This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new ad63158e74 GH-37884: [Swift] allow reading of unaligned FlatBuffers 
buffers (#38635)
ad63158e74 is described below

commit ad63158e74d903c263c51cd0207cf77f8aa12ede
Author: abandy <[email protected]>
AuthorDate: Fri Dec 8 14:50:22 2023 -0700

    GH-37884: [Swift] allow reading of unaligned FlatBuffers buffers (#38635)
    
    The PR enables the swift readers to read from unaligned buffers (fix for 
issue: 37884)
    
    Enabling unaligned buffers incurs a performance penalty so the developer 
will need to consider this when enabling this feature.
    
    It is not currently possible to recover from a buffer unaligned error as 
this error is a fatalError so trying aligned and then falling back to unaligned 
is not an option.  Also, FlatBuffers has a verifier that should be able to 
catch this error but currently it seems to fail on both aligned and unaligned 
buffers (I tried verifying the example python server get return value but 
verification fails even though the buffers are able to be read successfully)
    * Closes: #37884
    
    Authored-by: Alva Bandy <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 swift/Arrow/Package.swift                              |  6 +++++-
 swift/Arrow/Sources/Arrow/ArrowReader.swift            | 18 +++++++++++++-----
 .../ArrowFlight/Sources/ArrowFlight/FlightClient.swift | 11 +++++++++--
 .../ArrowFlight/Sources/ArrowFlight/FlightServer.swift |  7 +++++++
 .../Sources/ArrowFlight/RecordBatchStreamReader.swift  | 11 +++++++++--
 5 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/swift/Arrow/Package.swift b/swift/Arrow/Package.swift
index 065afe6264..946eb999c7 100644
--- a/swift/Arrow/Package.swift
+++ b/swift/Arrow/Package.swift
@@ -32,7 +32,11 @@ let package = Package(
             targets: ["Arrow"]),
     ],
     dependencies: [
-        .package(url: "https://github.com/google/flatbuffers.git";, from: 
"23.3.3")
+        // The latest version of flatbuffers v23.5.26 was built in May 26, 2023
+        // and therefore doesn't include the unaligned buffer swift changes.
+        // This can be changed back to using the tag once a new version of
+        // flatbuffers has been released.
+        .package(url: "https://github.com/google/flatbuffers.git";, branch: 
"master")
     ],
     targets: [
         // Targets are the basic building blocks of a package. A target can 
define a module or a test suite.
diff --git a/swift/Arrow/Sources/Arrow/ArrowReader.swift 
b/swift/Arrow/Sources/Arrow/ArrowReader.swift
index ef995b1805..d9dc1bdb47 100644
--- a/swift/Arrow/Sources/Arrow/ArrowReader.swift
+++ b/swift/Arrow/Sources/Arrow/ArrowReader.swift
@@ -132,7 +132,8 @@ public class ArrowReader {
     }
 
     public func fromStream( // swiftlint:disable:this function_body_length
-        _ fileData: Data
+        _ fileData: Data,
+        useUnalignedBuffers: Bool = false
     ) -> Result<ArrowReaderResult, ArrowError> {
         let footerLength = fileData.withUnsafeBytes { rawBuffer in
             rawBuffer.loadUnaligned(fromByteOffset: fileData.count - 4, as: 
Int32.self)
@@ -141,7 +142,9 @@ public class ArrowReader {
         let result = ArrowReaderResult()
         let footerStartOffset = fileData.count - Int(footerLength + 4)
         let footerData = fileData[footerStartOffset...]
-        let footerBuffer = ByteBuffer(data: footerData)
+        let footerBuffer = ByteBuffer(
+            data: footerData,
+            allowReadingUnalignedBuffers: useUnalignedBuffers)
         let footer = org_apache_arrow_flatbuf_Footer.getRootAsFooter(bb: 
footerBuffer)
         let schemaResult = loadSchema(footer.schema!)
         switch schemaResult {
@@ -170,7 +173,9 @@ public class ArrowReader {
             let messageStartOffset = recordBatch.offset + 
(Int64(MemoryLayout<Int32>.size) * messageOffset)
             let messageEndOffset = messageStartOffset + Int64(messageLength)
             let recordBatchData = fileData[messageStartOffset ..< 
messageEndOffset]
-            let mbb = ByteBuffer(data: recordBatchData)
+            let mbb = ByteBuffer(
+                data: recordBatchData,
+                allowReadingUnalignedBuffers: useUnalignedBuffers)
             let message = 
org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: mbb)
             switch message.headerType {
             case .recordbatch:
@@ -219,9 +224,12 @@ public class ArrowReader {
     public func fromMessage(
         _ dataHeader: Data,
         dataBody: Data,
-        result: ArrowReaderResult
+        result: ArrowReaderResult,
+        useUnalignedBuffers: Bool = false
     ) -> Result<Void, ArrowError> {
-        let mbb = ByteBuffer(data: dataHeader)
+        let mbb = ByteBuffer(
+            data: dataHeader,
+            allowReadingUnalignedBuffers: useUnalignedBuffers)
         let message = org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: 
mbb)
         switch message.headerType {
         case .schema:
diff --git a/swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift 
b/swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift
index 7a572ceca5..ef3e4fa239 100644
--- a/swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift
+++ b/swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift
@@ -24,8 +24,11 @@ import Arrow
 
 public class FlightClient {
     let client: Arrow_Flight_Protocol_FlightServiceAsyncClient
-    public init(channel: GRPCChannel) {
+    let allowReadingUnalignedBuffers: Bool
+
+    public init(channel: GRPCChannel, allowReadingUnalignedBuffers: Bool = 
false ) {
         client = Arrow_Flight_Protocol_FlightServiceAsyncClient(channel: 
channel)
+        self.allowReadingUnalignedBuffers = allowReadingUnalignedBuffers
     }
 
     private func readMessages(
@@ -34,7 +37,11 @@ public class FlightClient {
         let reader = ArrowReader()
         let arrowResult = ArrowReader.makeArrowReaderResult()
         for try await data in responseStream {
-            switch reader.fromMessage(data.dataHeader, dataBody: 
data.dataBody, result: arrowResult) {
+            switch reader.fromMessage(
+                data.dataHeader,
+                dataBody: data.dataBody,
+                result: arrowResult,
+                useUnalignedBuffers: allowReadingUnalignedBuffers) {
             case .success:
                 continue
             case .failure(let error):
diff --git a/swift/ArrowFlight/Sources/ArrowFlight/FlightServer.swift 
b/swift/ArrowFlight/Sources/ArrowFlight/FlightServer.swift
index a34bf5c0ac..19644d632e 100644
--- a/swift/ArrowFlight/Sources/ArrowFlight/FlightServer.swift
+++ b/swift/ArrowFlight/Sources/ArrowFlight/FlightServer.swift
@@ -63,6 +63,7 @@ public func schemaFromMessage(_ schemaData: Data) -> 
ArrowSchema? {
 }
 
 public protocol ArrowFlightServer: Sendable {
+    var allowReadingUnalignedBuffers: Bool { get }
     func listFlights(_ criteria: FlightCriteria, writer: 
FlightInfoStreamWriter) async throws
     func getFlightInfo(_ request: FlightDescriptor) async throws -> FlightInfo
     func getSchema(_ request: FlightDescriptor) async throws -> 
ArrowFlight.FlightSchemaResult
@@ -73,6 +74,12 @@ public protocol ArrowFlightServer: Sendable {
     func doExchange(_ reader: RecordBatchStreamReader, writer: 
RecordBatchStreamWriter) async throws
 }
 
+extension ArrowFlightServer {
+    var allowReadingUnalignedBuffers: Bool {
+        return false
+    }
+}
+
 public func makeFlightServer(_ handler: ArrowFlightServer) -> 
CallHandlerProvider {
   return InternalFlightServer(handler)
 }
diff --git 
a/swift/ArrowFlight/Sources/ArrowFlight/RecordBatchStreamReader.swift 
b/swift/ArrowFlight/Sources/ArrowFlight/RecordBatchStreamReader.swift
index 972d19435d..464752dbcb 100644
--- a/swift/ArrowFlight/Sources/ArrowFlight/RecordBatchStreamReader.swift
+++ b/swift/ArrowFlight/Sources/ArrowFlight/RecordBatchStreamReader.swift
@@ -27,10 +27,13 @@ public class RecordBatchStreamReader: AsyncSequence, 
AsyncIteratorProtocol {
     var descriptor: FlightDescriptor?
     var batchIndex = 0
     var streamIterator: any AsyncIteratorProtocol
+    var useUnalignedBuffers: Bool
     let stream: GRPC.GRPCAsyncRequestStream<Arrow_Flight_Protocol_FlightData>
-    init(_ stream: 
GRPC.GRPCAsyncRequestStream<Arrow_Flight_Protocol_FlightData>) {
+    init(_ stream: 
GRPC.GRPCAsyncRequestStream<Arrow_Flight_Protocol_FlightData>,
+         useUnalignedBuffers: Bool = false) {
         self.stream = stream
         self.streamIterator = self.stream.makeAsyncIterator()
+        self.useUnalignedBuffers = useUnalignedBuffers
     }
 
     public func next() async throws -> (Arrow.RecordBatch?, 
FlightDescriptor?)? {
@@ -55,7 +58,11 @@ public class RecordBatchStreamReader: AsyncSequence, 
AsyncIteratorProtocol {
             let dataBody = flightData.dataBody
             let dataHeader = flightData.dataHeader
             descriptor = FlightDescriptor(flightData.flightDescriptor)
-            switch reader.fromMessage(dataHeader, dataBody: dataBody, result: 
result) {
+            switch reader.fromMessage(
+                dataHeader,
+                dataBody: dataBody,
+                result: result,
+                useUnalignedBuffers: useUnalignedBuffers) {
             case .success(()):
                 if result.batches.count > 0 {
                     batches = result.batches

Reply via email to