This is an automated email from the ASF dual-hosted git repository. raulcd pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-swift.git
commit 6344e1a443a91fd73f1578953455f9ced13faa09 Author: abandy <[email protected]> AuthorDate: Sat May 20 10:04:00 2023 -0400 GH-35659: [Swift] Initial Swift IPC writer (#35660) ### Rationale for this change This adds the initial IPC writer for swift in order to write the arrow data to memory or to a file. ### What changes are included in this PR? - IPC writer - bug fixes found during writer testing ### Are these changes tested? Writer unit test added. * Closes: #35659 Authored-by: Alva Bandy <[email protected]> Signed-off-by: Sutou Kouhei <[email protected]> --- Arrow/Sources/Arrow/ArrowBuffer.swift | 7 +- Arrow/Sources/Arrow/ArrowBufferBuilder.swift | 3 + Arrow/Sources/Arrow/ArrowReader.swift | 4 +- Arrow/Sources/Arrow/ArrowReaderHelper.swift | 21 +- Arrow/Sources/Arrow/ArrowTable.swift | 86 ++++++++- Arrow/Sources/Arrow/ArrowWriter.swift | 214 +++++++++++++++++++++ Arrow/Sources/Arrow/ArrowWriterHelper.swift | 97 ++++++++++ .../{ArrowTests.swift => ArrayTests.swift} | 66 ++----- Arrow/Tests/ArrowTests/IPCTests.swift | 136 ++++++++++--- Arrow/Tests/ArrowTests/RecordBatchTests.swift | 51 +++++ Arrow/Tests/ArrowTests/TableTests.swift | 103 ++++++++++ 11 files changed, 699 insertions(+), 89 deletions(-) diff --git a/Arrow/Sources/Arrow/ArrowBuffer.swift b/Arrow/Sources/Arrow/ArrowBuffer.swift index 6aa8f7b..d56279b 100644 --- a/Arrow/Sources/Arrow/ArrowBuffer.swift +++ b/Arrow/Sources/Arrow/ArrowBuffer.swift @@ -34,6 +34,11 @@ public class ArrowBuffer { self.rawPointer.deallocate() } + func append(to data: inout Data) { + let ptr = UnsafePointer(rawPointer.assumingMemoryBound(to: UInt8.self)) + data.append(ptr, count: Int(capacity)); + } + static func createBuffer(_ data: [UInt8], length: UInt) -> ArrowBuffer { let byteCount = UInt(data.count) let capacity = alignTo64(byteCount) @@ -54,7 +59,7 @@ public class ArrowBuffer { let memory = MemoryAllocator(64) let rawPointer = memory.allocateArray(Int(capacity)) rawPointer.initializeMemory(as: UInt8.self, repeating: 0, 
count: Int(capacity)) - return ArrowBuffer(length: actualLen, capacity: capacity, rawPointer: rawPointer) + return ArrowBuffer(length: length, capacity: capacity, rawPointer: rawPointer) } static func copyCurrent(_ from: ArrowBuffer, to: inout ArrowBuffer, len: UInt) { diff --git a/Arrow/Sources/Arrow/ArrowBufferBuilder.swift b/Arrow/Sources/Arrow/ArrowBufferBuilder.swift index 65ac44e..6dc5020 100644 --- a/Arrow/Sources/Arrow/ArrowBufferBuilder.swift +++ b/Arrow/Sources/Arrow/ArrowBufferBuilder.swift @@ -51,6 +51,9 @@ public class BaseBufferBuilder<T> { func resizeLength(_ data: ArrowBuffer, len: UInt = 0) -> UInt { if len == 0 || len < data.length * 2 { + if data.length == 0 || data.length * 2 < ArrowBuffer.min_length { + return ArrowBuffer.min_length + } return UInt(data.length * 2); } diff --git a/Arrow/Sources/Arrow/ArrowReader.swift b/Arrow/Sources/Arrow/ArrowReader.swift index 88ef4e1..68647ca 100644 --- a/Arrow/Sources/Arrow/ArrowReader.swift +++ b/Arrow/Sources/Arrow/ArrowReader.swift @@ -40,7 +40,7 @@ public class ArrowReader { for index in 0 ..< schema.fieldsCount { let field = schema.fields(at: index)! let arrowField = ArrowField(field.name!, type: findArrowType(field), isNullable: field.nullable) - builder.addField(arrowField) + let _ = builder.addField(arrowField) if field.typeType == .struct_ { throw ValidationError.unknownType } @@ -133,7 +133,7 @@ public class ArrowReader { let messageStartOffset = recordBatch.offset + (Int64(MemoryLayout<Int32>.size) * messageOffset) let messageEndOffset = messageStartOffset + Int64(messageLength) - let recordBatchData = fileData[messageStartOffset ... 
messageEndOffset] + let recordBatchData = fileData[messageStartOffset ..< messageEndOffset] let mbb = ByteBuffer(data: recordBatchData) let message = org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: mbb) switch message.headerType { diff --git a/Arrow/Sources/Arrow/ArrowReaderHelper.swift b/Arrow/Sources/Arrow/ArrowReaderHelper.swift index 6f306f7..c4e7b55 100644 --- a/Arrow/Sources/Arrow/ArrowReaderHelper.swift +++ b/Arrow/Sources/Arrow/ArrowReaderHelper.swift @@ -76,8 +76,18 @@ func makeArrayHolder(_ field: org_apache_arrow_flatbuf_Field, buffers: [ArrowBuf case .utf8: let arrowData = try ArrowData(ArrowType.ArrowString, buffers: buffers, nullCount: buffers[0].length, stride: MemoryLayout<Int8>.stride) - let chuckedArray = try ChunkedArray<String>([StringArray(arrowData)]) - return ChunkedArrayHolder(chuckedArray) + return ChunkedArrayHolder(try ChunkedArray<String>([StringArray(arrowData)])) + case .date: + let dateType = field.type(type: org_apache_arrow_flatbuf_Date.self)! + if dateType.unit == .day { + let arrowData = try ArrowData(ArrowType.ArrowString, buffers: buffers, + nullCount: buffers[0].length, stride: MemoryLayout<Date>.stride) + return ChunkedArrayHolder(try ChunkedArray<Date>([Date32Array(arrowData)])) + } + + let arrowData = try ArrowData(ArrowType.ArrowString, buffers: buffers, + nullCount: buffers[0].length, stride: MemoryLayout<Date>.stride) + return ChunkedArrayHolder(try ChunkedArray<Date>([Date64Array(arrowData)])) default: throw ValidationError.unknownType } @@ -125,6 +135,13 @@ func findArrowType(_ field: org_apache_arrow_flatbuf_Field) -> ArrowType.Info { } case .utf8: return ArrowType.ArrowString + case .date: + let dateType = field.type(type: org_apache_arrow_flatbuf_Date.self)! 
+ if dateType.unit == .day { + return ArrowType.ArrowDate32 + } + + return ArrowType.ArrowDate64 default: return ArrowType.ArrowUnknown } diff --git a/Arrow/Sources/Arrow/ArrowTable.swift b/Arrow/Sources/Arrow/ArrowTable.swift index e5eb0d7..b8b8707 100644 --- a/Arrow/Sources/Arrow/ArrowTable.swift +++ b/Arrow/Sources/Arrow/ArrowTable.swift @@ -22,18 +22,58 @@ public class ChunkedArrayHolder { public let length: UInt public let nullCount: UInt public let holder: Any - + public let getBufferData: () throws -> [Data] + public let getBufferDataSizes: () throws -> [Int] public init<T>(_ chunked: ChunkedArray<T>) { self.holder = chunked self.length = chunked.length self.type = chunked.type self.nullCount = chunked.nullCount + self.getBufferData = {() throws -> [Data] in + var bufferData = [Data]() + var numBuffers = 2; + if !isFixedPrimitive(try toFBTypeEnum(chunked.type)) { + numBuffers = 3 + } + + for _ in 0 ..< numBuffers { + bufferData.append(Data()) + } + + for arrow_data in chunked.arrays { + for index in 0 ..< numBuffers { + arrow_data.arrowData.buffers[index].append(to: &bufferData[index]) + } + } + + return bufferData; + } + + self.getBufferDataSizes = {() throws -> [Int] in + var bufferDataSizes = [Int]() + var numBuffers = 2; + if !isFixedPrimitive(try toFBTypeEnum(chunked.type)) { + numBuffers = 3 + } + for _ in 0 ..< numBuffers { + bufferDataSizes.append(Int(0)) + } + + for arrow_data in chunked.arrays { + for index in 0 ..< numBuffers { + bufferDataSizes[index] += Int(arrow_data.arrowData.buffers[index].capacity); + } + } + + return bufferDataSizes; + } + } } public class ArrowColumn { public let field: ArrowField - private let dataHolder: ChunkedArrayHolder + fileprivate let dataHolder: ChunkedArrayHolder public var type: ArrowType.Info {get{return self.dataHolder.type}} public var length: UInt {get{return self.dataHolder.length}} public var nullCount: UInt {get{return self.dataHolder.nullCount}} @@ -60,6 +100,15 @@ public class ArrowTable { self.rowCount 
= columns[0].length } + public func toRecordBatch() -> RecordBatch { + var rbColumns = [ChunkedArrayHolder]() + for column in self.columns { + rbColumns.append(column.dataHolder) + } + + return RecordBatch(schema, columns: rbColumns) + } + public class Builder { let schemaBuilder = ArrowSchema.Builder() var columns = [ArrowColumn]() @@ -70,19 +119,19 @@ public class ArrowTable { public func addColumn<T>(_ fieldName: String, chunked: ChunkedArray<T>) -> Builder { let field = ArrowField(fieldName, type: chunked.type, isNullable: chunked.nullCount != 0) - self.schemaBuilder.addField(field) + let _ = self.schemaBuilder.addField(field) self.columns.append(ArrowColumn(field, chunked: chunked)) return self } public func addColumn<T>(_ field: ArrowField, arrowArray: ArrowArray<T>) throws -> Builder { - self.schemaBuilder.addField(field) + let _ = self.schemaBuilder.addField(field) self.columns.append(ArrowColumn(field, chunked: try ChunkedArray([arrowArray]))) return self } public func addColumn<T>(_ field: ArrowField, chunked: ChunkedArray<T>) -> Builder { - self.schemaBuilder.addField(field) + let _ = self.schemaBuilder.addField(field) self.columns.append(ArrowColumn(field, chunked: chunked)) return self } @@ -109,6 +158,33 @@ public class RecordBatch { self.length = columns[0].length } + public class Builder { + let schemaBuilder = ArrowSchema.Builder() + var columns = [ChunkedArrayHolder]() + + public func addColumn(_ fieldName: String, chunked: ChunkedArrayHolder) -> Builder { + let field = ArrowField(fieldName, type: chunked.type, isNullable: chunked.nullCount != 0) + let _ = self.schemaBuilder.addField(field) + self.columns.append(chunked) + return self + } + + public func addColumn(_ field: ArrowField, chunked: ChunkedArrayHolder) -> Builder { + let _ = self.schemaBuilder.addField(field) + self.columns.append(chunked) + return self + } + + public func finish() -> RecordBatch { + return RecordBatch(self.schemaBuilder.finish(), columns: self.columns) + } + } + + 
public func data<T>(for columnIndex: Int) -> ChunkedArray<T> { + let arrayHolder = column(columnIndex) + return (arrayHolder.holder as! ChunkedArray<T>) + } + public func column(_ index: Int) -> ChunkedArrayHolder { return self.columns[index] } diff --git a/Arrow/Sources/Arrow/ArrowWriter.swift b/Arrow/Sources/Arrow/ArrowWriter.swift new file mode 100644 index 0000000..70b4d0e --- /dev/null +++ b/Arrow/Sources/Arrow/ArrowWriter.swift @@ -0,0 +1,214 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import Foundation +import FlatBuffers + +public protocol DataWriter { + var count: Int {get} + func append(_ data: Data) +} + +public class ArrowWriter { + public class InMemDataWriter : DataWriter { + public private(set) var data: Data; + public var count: Int { return data.count } + public init(_ data: Data) { + self.data = data + } + convenience init() { + self.init(Data()) + } + + public func append(_ data: Data) { + self.data.append(data) + } + } + + public class FileDataWriter : DataWriter { + private var handle: FileHandle + private var current_size: Int = 0 + public var count: Int { return current_size } + public init(_ handle: FileHandle) { + self.handle = handle + } + + public func append(_ data: Data) { + self.handle.write(data) + self.current_size += data.count + } + } + + private func writeField(_ fbb: inout FlatBufferBuilder, field: ArrowField) throws -> Offset { + let nameOffset = fbb.create(string: field.name) + let fieldTypeOfffset = try toFBType(&fbb, infoType: field.type) + let startOffset = org_apache_arrow_flatbuf_Field.startField(&fbb) + org_apache_arrow_flatbuf_Field.add(name: nameOffset, &fbb) + org_apache_arrow_flatbuf_Field.add(nullable: field.isNullable, &fbb) + org_apache_arrow_flatbuf_Field.add(typeType: try toFBTypeEnum(field.type), &fbb) + org_apache_arrow_flatbuf_Field.add(type: fieldTypeOfffset, &fbb) + return org_apache_arrow_flatbuf_Field.endField(&fbb, start: startOffset) + } + + private func writeSchema(_ fbb: inout FlatBufferBuilder, schema: ArrowSchema) throws -> Offset { + var fieldOffsets = [Offset]() + for field in schema.fields { + fieldOffsets.append(try writeField(&fbb, field: field)) + } + + let fieldsOffset: Offset = fbb.createVector(ofOffsets: fieldOffsets) + let schemaOffset = org_apache_arrow_flatbuf_Schema.createSchema(&fbb, endianness: .little, fieldsVectorOffset: fieldsOffset) + return schemaOffset + + } + + private func writeRecordBatches(_ writer: inout DataWriter, batches: [RecordBatch]) throws -> 
[org_apache_arrow_flatbuf_Block] { + var rbBlocks = [org_apache_arrow_flatbuf_Block]() + + for batch in batches { + let startIndex = writer.count + let rbResult = try writeRecordBatch(batch: batch) + withUnsafeBytes(of: rbResult.1.o.littleEndian) {writer.append(Data($0))} + writer.append(rbResult.0) + try writeRecordBatchData(&writer, batch: batch) + rbBlocks.append(org_apache_arrow_flatbuf_Block(offset: Int64(startIndex), metaDataLength: Int32(0), bodyLength: Int64(rbResult.1.o))) + } + + return rbBlocks + } + + private func writeRecordBatch(batch: RecordBatch) throws -> (Data, Offset) { + let schema = batch.schema + var output = Data() + var fbb = FlatBufferBuilder() + + // write out field nodes + var fieldNodeOffsets = [Offset]() + fbb.startVector(schema.fields.count, elementSize: MemoryLayout<org_apache_arrow_flatbuf_FieldNode>.size) + for index in (0 ..< schema.fields.count).reversed() { + let column = batch.column(index) + let fieldNode = org_apache_arrow_flatbuf_FieldNode(length: Int64(column.length), nullCount: Int64(column.nullCount)) + fieldNodeOffsets.append(fbb.create(struct: fieldNode)) + } + + let nodeOffset = fbb.endVector(len: schema.fields.count) + + // write out buffers + var buffers = [org_apache_arrow_flatbuf_Buffer]() + var bufferOffset = Int(0) + for index in 0 ..< batch.schema.fields.count { + let column = batch.column(index) + let colBufferDataSizes = try column.getBufferDataSizes() + for var bufferDataSize in colBufferDataSizes { + bufferDataSize = getPadForAlignment(bufferDataSize) + let buffer = org_apache_arrow_flatbuf_Buffer(offset: Int64(bufferOffset), length: Int64(bufferDataSize)) + buffers.append(buffer) + bufferOffset += bufferDataSize + } + } + + org_apache_arrow_flatbuf_RecordBatch.startVectorOfBuffers(batch.schema.fields.count, in: &fbb) + for buffer in buffers.reversed() { + fbb.create(struct: buffer) + } + + let batchBuffersOffset = fbb.endVector(len: buffers.count) + let startRb = 
org_apache_arrow_flatbuf_RecordBatch.startRecordBatch(&fbb); + org_apache_arrow_flatbuf_RecordBatch.addVectorOf(nodes:nodeOffset, &fbb) + org_apache_arrow_flatbuf_RecordBatch.addVectorOf(buffers:batchBuffersOffset, &fbb) + let recordBatchOffset = org_apache_arrow_flatbuf_RecordBatch.endRecordBatch(&fbb, start: startRb) + + let bodySize = Int64(bufferOffset); + let startMessage = org_apache_arrow_flatbuf_Message.startMessage(&fbb) + org_apache_arrow_flatbuf_Message.add(bodyLength: Int64(bodySize), &fbb) + org_apache_arrow_flatbuf_Message.add(headerType: .recordbatch, &fbb) + org_apache_arrow_flatbuf_Message.add(header: recordBatchOffset, &fbb) + let messageOffset = org_apache_arrow_flatbuf_Message.endMessage(&fbb, start: startMessage) + fbb.finish(offset: messageOffset) + output.append(fbb.data) + return (output, Offset(offset: UInt32(output.count))) + } + + private func writeRecordBatchData(_ writer: inout DataWriter, batch: RecordBatch) throws { + for index in 0 ..< batch.schema.fields.count { + let column = batch.column(index) + let colBufferData = try column.getBufferData() + for var bufferData in colBufferData { + addPadForAlignment(&bufferData) + writer.append(bufferData) + } + } + } + + private func writeFooter(schema: ArrowSchema, rbBlocks: [org_apache_arrow_flatbuf_Block]) throws -> Data { + var fbb: FlatBufferBuilder = FlatBufferBuilder() + let schemaOffset = try writeSchema(&fbb, schema: schema) + + let _ = fbb.startVector(rbBlocks.count, elementSize: MemoryLayout<org_apache_arrow_flatbuf_Block>.size) + for blkInfo in rbBlocks.reversed() { + fbb.create(struct: blkInfo) + } + + let rbBlkEnd = fbb.endVector(len: rbBlocks.count) + + + let footerStartOffset = org_apache_arrow_flatbuf_Footer.startFooter(&fbb) + org_apache_arrow_flatbuf_Footer.add(schema: schemaOffset, &fbb) + org_apache_arrow_flatbuf_Footer.addVectorOf(recordBatches: rbBlkEnd, &fbb) + let footerOffset = org_apache_arrow_flatbuf_Footer.endFooter(&fbb, start: footerStartOffset) + 
fbb.finish(offset: footerOffset) + return fbb.data + } + + private func writeStream(_ writer: inout DataWriter, schema: ArrowSchema, batches: [RecordBatch]) throws { + var fbb: FlatBufferBuilder = FlatBufferBuilder() + let schemaOffset = try writeSchema(&fbb, schema: schema) + fbb.finish(offset: schemaOffset) + writer.append(fbb.data) + + let rbBlocks = try writeRecordBatches(&writer, batches: batches) + let footerData = try writeFooter(schema: schema, rbBlocks: rbBlocks) + fbb.finish(offset: Offset(offset: fbb.buffer.size)) + let footerOffset = writer.count + writer.append(footerData) + addPadForAlignment(&writer) + + withUnsafeBytes(of: Int32(0).littleEndian) { writer.append(Data($0)) } + let footerDiff = (UInt32(writer.count) - UInt32(footerOffset)); + withUnsafeBytes(of: footerDiff.littleEndian) { writer.append(Data($0)) } + } + + public func toStream(_ schema: ArrowSchema, batches: [RecordBatch]) throws -> Data { + var writer: any DataWriter = InMemDataWriter() + try writeStream(&writer, schema: schema, batches: batches) + return (writer as! InMemDataWriter).data + } + + public func toFile(_ fileName: URL, schema: ArrowSchema, batches: [RecordBatch]) throws { + try Data().write(to: fileName) + let fileHandle = FileHandle(forUpdatingAtPath: fileName.path)! + defer { fileHandle.closeFile() } + + var markerData = FILEMARKER.data(using: .utf8)!; + addPadForAlignment(&markerData) + + var writer: any DataWriter = FileDataWriter(fileHandle) + writer.append(FILEMARKER.data(using: .utf8)!) + try writeStream(&writer, schema: schema, batches: batches) + writer.append(FILEMARKER.data(using: .utf8)!) + } +} diff --git a/Arrow/Sources/Arrow/ArrowWriterHelper.swift b/Arrow/Sources/Arrow/ArrowWriterHelper.swift new file mode 100644 index 0000000..520d4f0 --- /dev/null +++ b/Arrow/Sources/Arrow/ArrowWriterHelper.swift @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import Foundation +import FlatBuffers + +extension Data { + func hexEncodedString() -> String { + return map { String(format: "%02hhx", $0) }.joined() + } +} + +func toFBTypeEnum(_ infoType: ArrowType.Info) throws -> org_apache_arrow_flatbuf_Type_ { + if infoType == ArrowType.ArrowInt8 || infoType == ArrowType.ArrowInt16 || + infoType == ArrowType.ArrowInt64 || infoType == ArrowType.ArrowUInt8 || + infoType == ArrowType.ArrowUInt16 || infoType == ArrowType.ArrowUInt32 || + infoType == ArrowType.ArrowUInt64 || infoType == ArrowType.ArrowInt32 { + return org_apache_arrow_flatbuf_Type_.int + } else if infoType == ArrowType.ArrowFloat || infoType == ArrowType.ArrowDouble { + return org_apache_arrow_flatbuf_Type_.floatingpoint + } else if infoType == ArrowType.ArrowString { + return org_apache_arrow_flatbuf_Type_.utf8 + } else if infoType == ArrowType.ArrowBool { + return org_apache_arrow_flatbuf_Type_.bool + } else if infoType == ArrowType.ArrowDate32 || infoType == ArrowType.ArrowDate64 { + return org_apache_arrow_flatbuf_Type_.date + } + + throw ValidationError.unknownType +} + +func toFBType(_ fbb: inout FlatBufferBuilder, infoType: ArrowType.Info) throws -> Offset { + if infoType == ArrowType.ArrowInt8 || infoType == ArrowType.ArrowUInt8 { + return 
org_apache_arrow_flatbuf_Int.createInt(&fbb, bitWidth: 8, isSigned: infoType == ArrowType.ArrowInt8); + } else if infoType == ArrowType.ArrowInt16 || infoType == ArrowType.ArrowUInt16 { + return org_apache_arrow_flatbuf_Int.createInt(&fbb, bitWidth: 16, isSigned: infoType == ArrowType.ArrowInt16); + } else if infoType == ArrowType.ArrowInt32 || infoType == ArrowType.ArrowUInt32 { + return org_apache_arrow_flatbuf_Int.createInt(&fbb, bitWidth: 32, isSigned: infoType == ArrowType.ArrowInt32); + } else if infoType == ArrowType.ArrowInt64 || infoType == ArrowType.ArrowUInt64 { + return org_apache_arrow_flatbuf_Int.createInt(&fbb, bitWidth: 64, isSigned: infoType == ArrowType.ArrowInt64); + } else if infoType == ArrowType.ArrowFloat { + return org_apache_arrow_flatbuf_FloatingPoint.createFloatingPoint(&fbb, precision: .single) + } else if infoType == ArrowType.ArrowDouble { + return org_apache_arrow_flatbuf_FloatingPoint.createFloatingPoint(&fbb, precision: .double) + } else if infoType == ArrowType.ArrowString { + return org_apache_arrow_flatbuf_Utf8.endUtf8(&fbb, start: org_apache_arrow_flatbuf_Utf8.startUtf8(&fbb)) + } else if infoType == ArrowType.ArrowBool { + return org_apache_arrow_flatbuf_Bool.endBool(&fbb, start: org_apache_arrow_flatbuf_Bool.startBool(&fbb)) + } else if infoType == ArrowType.ArrowDate32 { + let startOffset = org_apache_arrow_flatbuf_Date.startDate(&fbb) + org_apache_arrow_flatbuf_Date.add(unit: .day, &fbb) + return org_apache_arrow_flatbuf_Date.endDate(&fbb, start: startOffset) + } else if infoType == ArrowType.ArrowDate64 { + let startOffset = org_apache_arrow_flatbuf_Date.startDate(&fbb) + org_apache_arrow_flatbuf_Date.add(unit: .millisecond, &fbb) + return org_apache_arrow_flatbuf_Date.endDate(&fbb, start: startOffset) + } + + throw ValidationError.unknownType +} + +func addPadForAlignment(_ data: inout Data, alignment: Int = 8) { + let padding = data.count % Int(alignment) + if padding > 0 { + data.append(Data([UInt8](repeating: 0, count: 
alignment - padding))) + } +} + +func addPadForAlignment(_ writer: inout DataWriter, alignment: Int = 8) { + let padding = writer.count % Int(alignment) + if padding > 0 { + writer.append(Data([UInt8](repeating: 0, count: alignment - padding))) + } +} + +func getPadForAlignment(_ count: Int, alignment: Int = 8) -> Int { + let padding = count % Int(alignment) + if padding > 0 { + return count + (alignment - padding) + } + + return count +} diff --git a/Arrow/Tests/ArrowTests/ArrowTests.swift b/Arrow/Tests/ArrowTests/ArrayTests.swift similarity index 67% rename from Arrow/Tests/ArrowTests/ArrowTests.swift rename to Arrow/Tests/ArrowTests/ArrayTests.swift index 8aa6a38..b20064d 100644 --- a/Arrow/Tests/ArrowTests/ArrowTests.swift +++ b/Arrow/Tests/ArrowTests/ArrayTests.swift @@ -18,7 +18,7 @@ import XCTest @testable import Arrow -final class ArrowTests: XCTestCase { +final class ArrayTests: XCTestCase { func testPrimitiveArray() throws { // This is an example of a functional test case. // Use XCTAssert and related functions to verify your tests produce the correct @@ -46,7 +46,7 @@ final class ArrowTests: XCTestCase { XCTAssertEqual(doubleBuilder.length, 2) XCTAssertEqual(doubleBuilder.capacity, 264) let doubleArray = try doubleBuilder.finish() - XCTAssertEqual(doubleArray.length, 32) + XCTAssertEqual(doubleArray.length, 2) XCTAssertEqual(doubleArray[0]!, 14) XCTAssertEqual(doubleArray[1]!, 40.4) } @@ -62,7 +62,7 @@ final class ArrowTests: XCTestCase { } XCTAssertEqual(stringBuilder.nullCount, 10) XCTAssertEqual(stringBuilder.length, 100) - XCTAssertEqual(stringBuilder.capacity, 1032) + XCTAssertEqual(stringBuilder.capacity, 648) let stringArray = try stringBuilder.finish() XCTAssertEqual(stringArray.length, 100) for i in 0..<stringArray.length { @@ -87,58 +87,17 @@ final class ArrowTests: XCTestCase { XCTAssertEqual(boolBuilder.length, 4) XCTAssertEqual(boolBuilder.capacity, 72) let boolArray = try boolBuilder.finish() - XCTAssertEqual(boolArray.length, 32) + 
XCTAssertEqual(boolArray.length, 4) XCTAssertEqual(boolArray[1], nil) XCTAssertEqual(boolArray[0]!, true) XCTAssertEqual(boolArray[2]!, false) - } - - func testSchema() throws { - let schemaBuilder = ArrowSchema.Builder(); - let schema = schemaBuilder.addField("col1", type: ArrowType.ArrowInt8, isNullable: true) - .addField("col2", type: ArrowType.ArrowBool, isNullable: false) - .finish() - XCTAssertEqual(schema.fields.count, 2) - XCTAssertEqual(schema.fields[0].name, "col1") - XCTAssertEqual(schema.fields[0].type, ArrowType.ArrowInt8) - XCTAssertEqual(schema.fields[0].isNullable, true) - XCTAssertEqual(schema.fields[1].name, "col2") - XCTAssertEqual(schema.fields[1].type, ArrowType.ArrowBool) - XCTAssertEqual(schema.fields[1].isNullable, false) - } - - func testTable() throws { - let uint8Builder: NumberArrayBuilder<UInt8> = try ArrowArrayBuilders.loadNumberArrayBuilder(); - uint8Builder.append(10) - uint8Builder.append(22) - let stringBuilder = try ArrowArrayBuilders.loadStringArrayBuilder(); - stringBuilder.append("test10") - stringBuilder.append("test22") - let table = try ArrowTable.Builder() - .addColumn("col1", arrowArray: uint8Builder.finish()) - .addColumn("col2", arrowArray: stringBuilder.finish()) - .finish(); - - let schema = table.schema - XCTAssertEqual(schema.fields.count, 2) - XCTAssertEqual(schema.fields[0].name, "col1") - XCTAssertEqual(schema.fields[0].type, ArrowType.ArrowUInt8) - XCTAssertEqual(schema.fields[0].isNullable, false) - XCTAssertEqual(schema.fields[1].name, "col2") - XCTAssertEqual(schema.fields[1].type, ArrowType.ArrowString) - XCTAssertEqual(schema.fields[1].isNullable, false) - XCTAssertEqual(table.columns.count, 2) - let col1: ChunkedArray<UInt8> = table.columns[0].data(); - let col2: ChunkedArray<String> = table.columns[1].data(); - XCTAssertEqual(col1.length, 32) - XCTAssertEqual(col2.length, 32) } - func testDate() throws { + func testDate32Array() throws { let date32Builder: Date32ArrayBuilder = try 
ArrowArrayBuilders.loadDate32ArrayBuilder(); let date2 = Date(timeIntervalSinceReferenceDate: 86400 * 1) - let date1 = Date(timeIntervalSinceReferenceDate: 86400 * 5000) + let date1 = Date(timeIntervalSinceReferenceDate: 86400 * 5000 + 352) date32Builder.append(date1) date32Builder.append(date2) date32Builder.append(nil) @@ -146,11 +105,16 @@ final class ArrowTests: XCTestCase { XCTAssertEqual(date32Builder.length, 3) XCTAssertEqual(date32Builder.capacity, 136) let date32Array = try date32Builder.finish() - XCTAssertEqual(date32Array.length, 32) + XCTAssertEqual(date32Array.length, 3) XCTAssertEqual(date32Array[1], date2) - XCTAssertEqual(date32Array[0]!, date1) - + let adjustedDate1 = Date(timeIntervalSince1970: date1.timeIntervalSince1970 - 352) + XCTAssertEqual(date32Array[0]!, adjustedDate1) + } + + func testDate64Array() throws { let date64Builder: Date64ArrayBuilder = try ArrowArrayBuilders.loadDate64ArrayBuilder(); + let date2 = Date(timeIntervalSinceReferenceDate: 86400 * 1) + let date1 = Date(timeIntervalSinceReferenceDate: 86400 * 5000 + 352) date64Builder.append(date1) date64Builder.append(date2) date64Builder.append(nil) @@ -158,7 +122,7 @@ final class ArrowTests: XCTestCase { XCTAssertEqual(date64Builder.length, 3) XCTAssertEqual(date64Builder.capacity, 264) let date64Array = try date64Builder.finish() - XCTAssertEqual(date64Array.length, 32) + XCTAssertEqual(date64Array.length, 3) XCTAssertEqual(date64Array[1], date2) XCTAssertEqual(date64Array[0]!, date1) diff --git a/Arrow/Tests/ArrowTests/IPCTests.swift b/Arrow/Tests/ArrowTests/IPCTests.swift index 9f876f8..f57f043 100644 --- a/Arrow/Tests/ArrowTests/IPCTests.swift +++ b/Arrow/Tests/ArrowTests/IPCTests.swift @@ -19,17 +19,42 @@ import XCTest import FlatBuffers @testable import Arrow -final class IPCTests: XCTestCase { - func currentDirectory(path: String = #file) -> URL { - return URL(fileURLWithPath: path).deletingLastPathComponent() +func checkBoolRecordBatch(_ recordBatches: [RecordBatch]) { + 
XCTAssertEqual(recordBatches.count, 1) + for recordBatch in recordBatches { + XCTAssertEqual(recordBatch.length, 5) + XCTAssertEqual(recordBatch.columns.count, 2) + XCTAssertEqual(recordBatch.schema.fields.count, 2) + XCTAssertEqual(recordBatch.schema.fields[0].name, "one") + XCTAssertEqual(recordBatch.schema.fields[0].type, ArrowType.ArrowBool) + XCTAssertEqual(recordBatch.schema.fields[1].name, "two") + XCTAssertEqual(recordBatch.schema.fields[1].type, ArrowType.ArrowString) + for index in 0..<recordBatch.length { + let column = recordBatch.columns[0] + let str = column.holder as! AsString + let val = "\(str.asString(index))" + if index == 0 || index == 4 { + XCTAssertEqual(val, "true") + } else if index == 2 { + XCTAssertEqual(val, "") + } else { + XCTAssertEqual(val, "false") + } + } } +} - func testFileReader() throws { +func currentDirectory(path: String = #file) -> URL { + return URL(fileURLWithPath: path).deletingLastPathComponent() +} + +final class IPCFileReaderTests: XCTestCase { + func testFileReader_double() throws { let fileURL = currentDirectory().appendingPathComponent("../../testdata_double.arrow") let arrowReader = ArrowReader() - let recordBatchs = try arrowReader.fromFile(fileURL) - XCTAssertEqual(recordBatchs.count, 1) - for recordBatch in recordBatchs { + let recordBatches = try arrowReader.fromFile(fileURL) + XCTAssertEqual(recordBatches.count, 1) + for recordBatch in recordBatches { XCTAssertEqual(recordBatch.length, 5) XCTAssertEqual(recordBatch.columns.count, 2) XCTAssertEqual(recordBatch.schema.fields.count, 2) @@ -53,28 +78,83 @@ final class IPCTests: XCTestCase { func testFileReader_bool() throws { let fileURL = currentDirectory().appendingPathComponent("../../testdata_bool.arrow") let arrowReader = ArrowReader() - let recordBatchs = try arrowReader.fromFile(fileURL) - XCTAssertEqual(recordBatchs.count, 1) - for recordBatch in recordBatchs { - XCTAssertEqual(recordBatch.length, 5) - XCTAssertEqual(recordBatch.columns.count, 2) - 
XCTAssertEqual(recordBatch.schema.fields.count, 2) - XCTAssertEqual(recordBatch.schema.fields[0].name, "one") - XCTAssertEqual(recordBatch.schema.fields[0].type, ArrowType.ArrowBool) - XCTAssertEqual(recordBatch.schema.fields[1].name, "two") + let fileRBs = try arrowReader.fromFile(fileURL) + checkBoolRecordBatch(fileRBs) + } + + func testFileWriter_bool() throws { + //read existing file + let fileURL = currentDirectory().appendingPathComponent("../../testdata_bool.arrow") + let arrowReader = ArrowReader() + let fileRBs = try arrowReader.fromFile(fileURL) + checkBoolRecordBatch(fileRBs) + let arrowWriter = ArrowWriter() + //write data from file to a stream + let writeData = try arrowWriter.toStream(fileRBs[0].schema, batches: fileRBs) + //read stream back into recordbatches + checkBoolRecordBatch(try arrowReader.fromStream(writeData)) + //write file record batches to another file + let outputUrl = currentDirectory().appendingPathComponent("../../testfilewriter_bool.arrow") + try arrowWriter.toFile(outputUrl, schema: fileRBs[0].schema, batches: fileRBs) + checkBoolRecordBatch(try arrowReader.fromFile(outputUrl)) + } + + func makeSchema() throws -> ArrowSchema { + let schemaBuilder = ArrowSchema.Builder(); + return schemaBuilder.addField("col1", type: ArrowType.ArrowUInt8, isNullable: true) + .addField("col2", type: ArrowType.ArrowString, isNullable: false) + .addField("col3", type: ArrowType.ArrowDate32, isNullable: false) + .finish() + } + + func makeRecordBatch() throws -> RecordBatch { + let uint8Builder: NumberArrayBuilder<UInt8> = try ArrowArrayBuilders.loadNumberArrayBuilder(); + uint8Builder.append(10) + uint8Builder.append(22) + let stringBuilder = try ArrowArrayBuilders.loadStringArrayBuilder(); + stringBuilder.append("test10") + stringBuilder.append("test22") + let date32Builder = try ArrowArrayBuilders.loadDate32ArrayBuilder(); + let date2 = Date(timeIntervalSinceReferenceDate: 86400 * 1) + let date1 = Date(timeIntervalSinceReferenceDate: 86400 * 5000 + 
352) + date32Builder.append(date1) + date32Builder.append(date2) + + let intHolder = ChunkedArrayHolder(try ChunkedArray([uint8Builder.finish()])) + let stringHolder = ChunkedArrayHolder(try ChunkedArray([stringBuilder.finish()])) + let date32Holder = ChunkedArrayHolder(try ChunkedArray([date32Builder.finish()])) + return RecordBatch.Builder() + .addColumn("col1", chunked: intHolder) + .addColumn("col2", chunked: stringHolder) + .addColumn("col3", chunked: date32Holder) + .finish() + } + + func testInMemoryToFromStream() throws { + //read existing file + let schema = try makeSchema() + let recordBatch = try makeRecordBatch() + let arrowWriter = ArrowWriter() + let writeData = try arrowWriter.toStream(schema, batches: [recordBatch]) + let arrowReader = ArrowReader() + let recordBatches = try arrowReader.fromStream(writeData) + XCTAssertEqual(recordBatches.count, 1) + for recordBatch in recordBatches { + XCTAssertEqual(recordBatch.length, 2) + XCTAssertEqual(recordBatch.columns.count, 3) + XCTAssertEqual(recordBatch.schema.fields.count, 3) + XCTAssertEqual(recordBatch.schema.fields[0].name, "col1") + XCTAssertEqual(recordBatch.schema.fields[0].type, ArrowType.ArrowUInt8) + XCTAssertEqual(recordBatch.schema.fields[1].name, "col2") XCTAssertEqual(recordBatch.schema.fields[1].type, ArrowType.ArrowString) - for index in 0..<recordBatch.length { - let column = recordBatch.columns[0] - let str = column.holder as! AsString - let val = "\(str.asString(index))" - if index == 0 || index == 4 { - XCTAssertEqual(val, "true") - } else if index == 2 { - XCTAssertEqual(val, "") - } else { - XCTAssertEqual(val, "false") - } - } + XCTAssertEqual(recordBatch.schema.fields[2].name, "col3") + XCTAssertEqual(recordBatch.schema.fields[2].type, ArrowType.ArrowDate32) + let dateVal = "\((recordBatch.columns[2].holder as! AsString).asString(0))" + XCTAssertEqual(dateVal, "2014-09-10 00:00:00 +0000") + let stringVal = "\((recordBatch.columns[1].holder as! 
AsString).asString(1))" + XCTAssertEqual(stringVal, "test22") + let uintVal = "\((recordBatch.columns[0].holder as! AsString).asString(0))" + XCTAssertEqual(uintVal, "10") } } } diff --git a/Arrow/Tests/ArrowTests/RecordBatchTests.swift b/Arrow/Tests/ArrowTests/RecordBatchTests.swift new file mode 100644 index 0000000..8af34d5 --- /dev/null +++ b/Arrow/Tests/ArrowTests/RecordBatchTests.swift @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
import XCTest
@testable import Arrow

// Verifies that RecordBatch.Builder assembles a schema and column data
// from chunked-array holders.
final class RecordBatchTests: XCTestCase {
    func testRecordBatch() throws {
        // Two single-chunk columns: UInt8 values and their string labels.
        let numberBuilder: NumberArrayBuilder<UInt8> = try ArrowArrayBuilders.loadNumberArrayBuilder()
        numberBuilder.append(10)
        numberBuilder.append(22)
        let labelBuilder = try ArrowArrayBuilders.loadStringArrayBuilder()
        labelBuilder.append("test10")
        labelBuilder.append("test22")

        let numberHolder = ChunkedArrayHolder(try ChunkedArray([numberBuilder.finish()]))
        let labelHolder = ChunkedArrayHolder(try ChunkedArray([labelBuilder.finish()]))
        let batch = RecordBatch.Builder()
            .addColumn("col1", chunked: numberHolder)
            .addColumn("col2", chunked: labelHolder)
            .finish()

        // Schema: both fields present, in insertion order, non-nullable.
        let schema = batch.schema
        XCTAssertEqual(schema.fields.count, 2)
        XCTAssertEqual(schema.fields[0].name, "col1")
        XCTAssertEqual(schema.fields[0].type, ArrowType.ArrowUInt8)
        XCTAssertEqual(schema.fields[0].isNullable, false)
        XCTAssertEqual(schema.fields[1].name, "col2")
        XCTAssertEqual(schema.fields[1].type, ArrowType.ArrowString)
        XCTAssertEqual(schema.fields[1].isNullable, false)
        // Data: every appended value is reachable through data(for:).
        XCTAssertEqual(batch.columns.count, 2)
        let col1: ChunkedArray<UInt8> = batch.data(for: 0)
        let col2: ChunkedArray<String> = batch.data(for: 1)
        XCTAssertEqual(col1.length, 2)
        XCTAssertEqual(col2.length, 2)
    }
}
diff --git a/Arrow/Tests/ArrowTests/TableTests.swift b/Arrow/Tests/ArrowTests/TableTests.swift
new file mode 100644
index 0000000..2d0a48e
--- /dev/null
+++ b/Arrow/Tests/ArrowTests/TableTests.swift
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import XCTest
@testable import Arrow

/// Unit tests for ArrowSchema.Builder, ArrowTable.Builder and the
/// ArrowTable -> RecordBatch conversion.
final class TableTests: XCTestCase {
    /// A schema built via ArrowSchema.Builder preserves field order,
    /// types and nullability.
    func testSchema() throws {
        let schemaBuilder = ArrowSchema.Builder()
        let schema = schemaBuilder.addField("col1", type: ArrowType.ArrowInt8, isNullable: true)
            .addField("col2", type: ArrowType.ArrowBool, isNullable: false)
            .finish()
        XCTAssertEqual(schema.fields.count, 2)
        XCTAssertEqual(schema.fields[0].name, "col1")
        XCTAssertEqual(schema.fields[0].type, ArrowType.ArrowInt8)
        XCTAssertEqual(schema.fields[0].isNullable, true)
        XCTAssertEqual(schema.fields[1].name, "col2")
        XCTAssertEqual(schema.fields[1].type, ArrowType.ArrowBool)
        XCTAssertEqual(schema.fields[1].isNullable, false)
    }

    /// A three-column (UInt8, String, Date32) table built via
    /// ArrowTable.Builder exposes the expected schema and column data.
    func testTable() throws {
        let uint8Builder: NumberArrayBuilder<UInt8> = try ArrowArrayBuilders.loadNumberArrayBuilder()
        uint8Builder.append(10)
        uint8Builder.append(22)
        let stringBuilder = try ArrowArrayBuilders.loadStringArrayBuilder()
        stringBuilder.append("test10")
        stringBuilder.append("test22")
        let date32Builder: Date32ArrayBuilder = try ArrowArrayBuilders.loadDate32ArrayBuilder()
        let date2 = Date(timeIntervalSinceReferenceDate: 86400 * 1)
        let date1 = Date(timeIntervalSinceReferenceDate: 86400 * 5000 + 352)
        date32Builder.append(date1)
        date32Builder.append(date2)

        let table = try ArrowTable.Builder()
            .addColumn("col1", arrowArray: uint8Builder.finish())
            .addColumn("col2", arrowArray: stringBuilder.finish())
            .addColumn("col3", arrowArray: date32Builder.finish())
            .finish()

        let schema = table.schema
        XCTAssertEqual(schema.fields.count, 3)
        XCTAssertEqual(schema.fields[0].name, "col1")
        XCTAssertEqual(schema.fields[0].type, ArrowType.ArrowUInt8)
        XCTAssertEqual(schema.fields[0].isNullable, false)
        XCTAssertEqual(schema.fields[1].name, "col2")
        XCTAssertEqual(schema.fields[1].type, ArrowType.ArrowString)
        XCTAssertEqual(schema.fields[1].isNullable, false)
        // BUGFIX: the original copy-pasted the fields[1] assertions a
        // second time and never verified the third (Date32) field.
        XCTAssertEqual(schema.fields[2].name, "col3")
        XCTAssertEqual(schema.fields[2].type, ArrowType.ArrowDate32)
        XCTAssertEqual(schema.fields[2].isNullable, false)
        XCTAssertEqual(table.columns.count, 3)
        let col1: ChunkedArray<UInt8> = table.columns[0].data()
        let col2: ChunkedArray<String> = table.columns[1].data()
        let col3: ChunkedArray<Date> = table.columns[2].data()
        XCTAssertEqual(col1.length, 2)
        XCTAssertEqual(col2.length, 2)
        XCTAssertEqual(col3.length, 2)
    }

    /// toRecordBatch() must carry the table's schema and column data
    /// over to the resulting RecordBatch unchanged.
    func testTableToRecordBatch() throws {
        let uint8Builder: NumberArrayBuilder<UInt8> = try ArrowArrayBuilders.loadNumberArrayBuilder()
        uint8Builder.append(10)
        uint8Builder.append(22)
        let stringBuilder = try ArrowArrayBuilders.loadStringArrayBuilder()
        stringBuilder.append("test10")
        stringBuilder.append("test22")

        let table = try ArrowTable.Builder()
            .addColumn("col1", arrowArray: uint8Builder.finish())
            .addColumn("col2", arrowArray: stringBuilder.finish())
            .finish()

        let recordBatch = table.toRecordBatch()
        let schema = recordBatch.schema
        XCTAssertEqual(schema.fields.count, 2)
        XCTAssertEqual(schema.fields[0].name, "col1")
        XCTAssertEqual(schema.fields[0].type, ArrowType.ArrowUInt8)
        XCTAssertEqual(schema.fields[0].isNullable, false)
        XCTAssertEqual(schema.fields[1].name, "col2")
        XCTAssertEqual(schema.fields[1].type, ArrowType.ArrowString)
        XCTAssertEqual(schema.fields[1].isNullable, false)
        XCTAssertEqual(recordBatch.columns.count, 2)
        let col1: ChunkedArray<UInt8> = recordBatch.data(for: 0)
        let col2: ChunkedArray<String> = recordBatch.data(for: 1)
        XCTAssertEqual(col1.length, 2)
        XCTAssertEqual(col2.length, 2)
    }
}