kou commented on code in PR #37764:
URL: https://github.com/apache/arrow/pull/37764#discussion_r1330664657


##########
swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift:
##########
@@ -28,6 +28,40 @@ public class FlightClient {
         client = Arrow_Flight_Protocol_FlightServiceAsyncClient(channel: 
channel)
     }
     
+    private func readMessages(_ responseStream: 
GRPCAsyncResponseStream<Arrow_Flight_Protocol_FlightData>) async throws -> 
ArrowReader.ArrowReaderResult {
+        let reader = ArrowReader()
+        let arrowResult = ArrowReader.MakeArrowReaderResult();
+        for try await data in responseStream {
+            switch reader.fromMessage(data.dataHeader, dataBody: 
data.dataBody, result: arrowResult) {
+            case .success(_):
+                continue;
+            case .failure(let error):
+                throw error
+            }
+        }
+        
+        return arrowResult;
+    }
+    
+    private func writeBatches(_ requestStream: 
GRPCAsyncRequestStreamWriter<Arrow_Flight_Protocol_FlightData>, descriptor: 
FlightDescriptor, recordBatchs: [RecordBatch]) async throws {
+        let writer = ArrowWriter()
+        switch writer.toMessage(recordBatchs[0].schema) {
+        case .success(let schemaData):
+            try await requestStream.send(FlightData(schemaData, dataBody: 
Data(), flightDescriptor: descriptor).toProtocol())
+            for rc in recordBatchs {
+                switch writer.toMessage(rc) {

Review Comment:
   `rc` -> `rb` ("R"ecord "B"atch) or `rc` -> `recordBatch` may be better.



##########
swift/Arrow/Sources/Arrow/ArrowWriter.swift:
##########
@@ -38,7 +38,7 @@ public class ArrowWriter {
             self.data.append(data)
         }
     }
-
+    

Review Comment:
   Are this trailing spaces change and other similar changes intentional?



##########
swift/ArrowFlight/Sources/ArrowFlight/RecordBatchStreamReader.swift:
##########
@@ -32,29 +33,35 @@ public class RecordBatchStreamReader: AsyncSequence, 
AsyncIteratorProtocol {
         self.streamIterator = self.stream.makeAsyncIterator()
     }
     
-    public func next() async throws -> Arrow.RecordBatch? {
+    public func next() async throws -> (Arrow.RecordBatch?, 
FlightDescriptor?)? {
         guard !Task.isCancelled else {
             return nil
         }
 
         if batchIndex < batches.count {
             let batch = batches[batchIndex]
             batchIndex += 1
-            return batch
+            return (batch, descriptor)
         }
          
+        let result = ArrowReader.MakeArrowReaderResult()
         while true {
-            let flightData = try await self.streamIterator.next()
-            if flightData == nil {
+            let streamData = try await self.streamIterator.next()
+            if streamData == nil {
                 return nil
             }
             
-            let data = (flightData as! 
Arrow_Flight_Protocol_FlightData).dataBody
-            switch reader.fromStream(data) {
-            case .success(let rbResult):
-                batches = rbResult.batches
-                batchIndex = 1
-                return batches[0]
+            let flightData = (streamData as! Arrow_Flight_Protocol_FlightData)
+            let dataBody = flightData.dataBody
+            let dataHeader = flightData.dataHeader
+            descriptor = FlightDescriptor(flightData.flightDescriptor)
+            switch reader.fromMessage(dataHeader, dataBody: dataBody, result: 
result) {
+            case .success(()):
+                if(result.batches.count > 0 ) {

Review Comment:
   ```suggestion
                   if result.batches.count > 0 {
   ```



##########
swift/ArrowFlight/Tests/ArrowFlightTests/FlightTest.swift:
##########
@@ -97,28 +102,33 @@ final class MyFlightServer : ArrowFlightServer {
     func getSchema(_ request: ArrowFlight.FlightDescriptor) async throws -> 
ArrowFlight.FlightSchemaResult {
         XCTAssertEqual(String(bytes: request.cmd, encoding: .utf8)!, "schema 
info")
         XCTAssertEqual(request.type, .cmd)
-        return try 
ArrowFlight.FlightSchemaResult(schemaToArrowStream(makeSchema()))
+        return try 
ArrowFlight.FlightSchemaResult(schemaToMessage(makeSchema()))
     }
     
     func getFlightInfo(_ request: ArrowFlight.FlightDescriptor) async throws 
-> ArrowFlight.FlightInfo {
-        return ArrowFlight.FlightInfo(Data())
+        let key = String(decoding: request.cmd, as: UTF8.self)
+        if(flights[key] != nil) {

Review Comment:
   ```suggestion
           if flights[key] != nil {
   ```



##########
swift/Arrow/Sources/Arrow/ArrowReader.swift:
##########
@@ -200,4 +201,41 @@ public class ArrowReader {
             return .failure(.unknownError("Error loading file: \(error)"))
         }
     }
+
+    static public func MakeArrowReaderResult() -> ArrowReaderResult{

Review Comment:
   ```suggestion
       static public func MakeArrowReaderResult() -> ArrowReaderResult {
   ```



##########
swift/ArrowFlight/Tests/ArrowFlightTests/FlightTest.swift:
##########
@@ -177,87 +187,104 @@ public class FlightClientTester {
         })
         
         XCTAssertEqual(actionTypes.count, 2)
-        XCTAssertEqual(actionTypes[0].type, "type1")
-        XCTAssertEqual(actionTypes[0].description, "desc1")
-        XCTAssertEqual(actionTypes[1].type, "type2")
-        XCTAssertEqual(actionTypes[1].description, "desc2")
+        
+        XCTAssertEqual(actionTypes[0].type, "clear")
+        XCTAssertEqual(actionTypes[0].description, "Clear the stored flights.")
+        XCTAssertEqual(actionTypes[1].type, "shutdown")
+        XCTAssertEqual(actionTypes[1].description, "Shut down this server.")
     }
     
     func listFlightsTest() async throws {
         let flightCriteria = FlightCriteria("flight criteria 
expression".data(using: .utf8)!)
         var num_calls = 0
         try await client?.listFlights(flightCriteria, closure: { data in
-            num_calls += 1
-            let schema = try streamToArrowSchema(data.schema)
-            XCTAssertEqual(schema.fields.count, 3)
+            if let schema = data.schema {
+                XCTAssertGreaterThanOrEqual(schema.fields.count, 0)
+                num_calls += 1
+            }
         })
         
-        XCTAssertEqual(num_calls, 1)
+        XCTAssertEqual(num_calls, 2)
     }
     
-    func doActionTest() async throws {
-        let action = FlightAction("test_action", body: "test_action 
body".data(using: .utf8)!)
+    func doActionTest(_ type: String, actionBody: Data) async throws {
+        let action = FlightAction(type, body: actionBody)
         var actionResults = [FlightResult]()
         try await client?.doAction(action, closure: { result in
             actionResults.append(result)
         })
         
-        XCTAssertEqual(actionResults.count, 1)
-        XCTAssertEqual(String(bytes:actionResults[0].body, encoding: .utf8), 
"test_action result")
+        XCTAssertEqual(actionResults.count, 0)

Review Comment:
   It seems that this removes a test case for a `DoAction` that returns a 
result.
   Is it intentional?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to