kou commented on code in PR #45029:
URL: https://github.com/apache/arrow/pull/45029#discussion_r2071012855
##########
swift/Arrow/Sources/Arrow/ArrowReader.swift:
##########
@@ -216,7 +216,67 @@ public class ArrowReader { // swiftlint:disable:this
type_body_length
return .success(RecordBatch(arrowSchema, columns: columns))
}
- public func fromStream( // swiftlint:disable:this function_body_length
+ public func fromMemoryStream( // swiftlint:disable:this
function_body_length
+ _ fileData: Data,
+ useUnalignedBuffers: Bool = false
+ ) -> Result<ArrowReaderResult, ArrowError> {
+ let result = ArrowReaderResult()
+ var offset: Int = 0
+ var length = getUInt32(fileData, offset: offset)
+ var streamData = fileData
+ var schemaMessage: org_apache_arrow_flatbuf_Schema?
+ while length != 0 {
+ if length == CONTINUATIONMARKER {
+ offset += Int(MemoryLayout<Int32>.size)
Review Comment:
Sorry. I don't remember this exactly, but I think that I was referring to
`var offset: Int = 0`...
##########
swift/Arrow/Sources/Arrow/ArrowReader.swift:
##########
@@ -216,7 +216,76 @@ public class ArrowReader { // swiftlint:disable:this
type_body_length
return .success(RecordBatch(arrowSchema, columns: columns))
}
- public func fromStream( // swiftlint:disable:this function_body_length
+ /*
+ The Memory stream format is for reading the arrow streaming protocol.
This
+ format is slightly different from the File format protocol as it doesn't
contain
+ a header and footer
+ */
+ public func fromMemoryStream( // swiftlint:disable:this
function_body_length
+ _ fileData: Data,
+ useUnalignedBuffers: Bool = false
+ ) -> Result<ArrowReaderResult, ArrowError> {
+ let result = ArrowReaderResult()
+ var offset: Int = 0
+ var length = getUInt32(fileData, offset: offset)
+ var streamData = fileData
+ var schemaMessage: org_apache_arrow_flatbuf_Schema?
+ while length != 0 {
+ if length == CONTINUATIONMARKER {
+ offset += Int(MemoryLayout<Int32>.size)
+ length = getUInt32(fileData, offset: offset)
+ if length == 0 {
+ return .success(result)
+ }
+ }
+
+ offset += Int(MemoryLayout<Int32>.size)
+ streamData = fileData[offset...]
+ let dataBuffer = ByteBuffer(
+ data: streamData,
+ allowReadingUnalignedBuffers: true)
+ let message =
org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: dataBuffer)
+ switch message.headerType {
+ case .recordbatch:
+ do {
+ let rbMessage = message.header(type:
org_apache_arrow_flatbuf_RecordBatch.self)!
+ offset += Int(message.bodyLength + Int64(length))
+ let recordBatch = try loadRecordBatch(
+ rbMessage,
+ schema: schemaMessage!,
+ arrowSchema: result.schema!,
+ data: fileData,
+ messageEndOffset: (message.bodyLength +
Int64(length))).get()
+ result.batches.append(recordBatch)
+ length = getUInt32(fileData, offset: offset)
+ } catch let error as ArrowError {
+ return .failure(error)
+ } catch {
+ return .failure(.unknownError("Unexpected error:
\(error)"))
+ }
+ case .schema:
+ schemaMessage = message.header(type:
org_apache_arrow_flatbuf_Schema.self)!
+ let schemaResult = loadSchema(schemaMessage!)
+ switch schemaResult {
+ case .success(let schema):
+ result.schema = schema
+ case .failure(let error):
+ return .failure(error)
+ }
+ offset += Int(message.bodyLength + Int64(length))
+ length = getUInt32(fileData, offset: offset)
+ default:
+ return .failure(.unknownError("Unhandled header type:
\(message.headerType)"))
+ }
+ }
+ return .success(result)
+ }
+
+ /*
+ The File stream format supports random accessing the data. This format
contains
+ a header and footer around the streaming format.
+ */
+ public func fromFileStream( // swiftlint:disable:this function_body_length
Review Comment:
Can we use a different name for this? The current name may be confusing because
the Apache Arrow specification uses:
* "IPC Streaming Format"
https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
* "IPC File Format"
https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
If we use "File" and "Stream" in this method name, users may think that this
is for the "IPC Streaming Format" that is stored in a file.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]