kou commented on code in PR #36517:
URL: https://github.com/apache/arrow/pull/36517#discussion_r1263349172


##########
cpp/src/arrow/flight/test_definitions.cc:
##########
@@ -118,6 +123,24 @@ void ConnectivityTest::TestBrokenConnection() {
 //------------------------------------------------------------
 // Tests of data plane methods
 
+namespace {
+class GetFlightInfoListener : public AsyncListener<FlightInfo> {
+ public:
+  void OnNext(FlightInfo message) override { info = std::move(message); }
+  void OnFinish(TransportStatus status) override {
+    if (status.ok()) {
+      future.MarkFinished(std::move(info));
+    } else {
+      std::cout << status.ToStatus().ToString() << std::endl;

Review Comment:
   Is this a debug print?



##########
cpp/src/arrow/flight/test_definitions.cc:
##########
@@ -1483,6 +1544,104 @@ void ErrorHandlingTest::TearDownTest() {
   ASSERT_OK(server_->Shutdown());
 }
 
+void ErrorHandlingTest::TestAsyncGetFlightInfo() {
+  if (!supports_async()) {
+    GTEST_SKIP() << "Transport does not support async";
+  }
+  // Server-side still does all the junk around trying to translate Arrow
+  // status codes, so this test is a little indirect
+
+  for (const auto code : kStatusCodes) {
+    ARROW_SCOPED_TRACE("C++ status code: ", static_cast<int>(code));

Review Comment:
   We may want to show string representation of the code too:
   
   ```suggestion
       ARROW_SCOPED_TRACE("C++ status code: ", static_cast<int>(code), ": ", 
Status::CodeAsString(code));
   ```



##########
cpp/src/arrow/flight/transport/grpc/grpc_client.cc:
##########
@@ -564,6 +568,78 @@ class GrpcResultStream : public ResultStream {
   std::unique_ptr<::grpc::ClientReader<pb::Result>> stream_;
 };
 
+// Helpers for implementing the async versions of the calls
+
+arrow::Result<FlightInfo> ToAsyncResult(const pb::FlightInfo& proto) {
+  FlightInfo::Data info_data;
+  RETURN_NOT_OK(internal::FromProto(proto, &info_data));
+  return FlightInfo(std::move(info_data));
+}

Review Comment:
   Can we share this code in existing sync version of `GetFlightInfo()`?



##########
cpp/src/arrow/flight/client.cc:
##########
@@ -620,6 +621,17 @@ Status FlightClient::GetFlightInfo(const 
FlightCallOptions& options,
   return GetFlightInfo(options, descriptor).Value(info);
 }
 
+void FlightClient::GetFlightInfo(const FlightCallOptions& options,
+                                 const FlightDescriptor& descriptor,
+                                 std::shared_ptr<AsyncListener<FlightInfo>> 
listener) {
+  if (auto status = CheckOpen(); !status.ok()) {
+    listener->OnFinish(
+        TransportStatus{TransportStatusCode::kInternal, status.ToString()});

Review Comment:
   I see. Then how about using `TransportStatus::FromStatus(status)` here?
   
   ```suggestion
       listener->OnFinish(TransportStatus::FromStatus(status));
   ```



##########
cpp/src/arrow/flight/transport/grpc/grpc_client.cc:
##########
@@ -564,6 +568,78 @@ class GrpcResultStream : public ResultStream {
   std::unique_ptr<::grpc::ClientReader<pb::Result>> stream_;
 };
 
+// Helpers for implementing the async versions of the calls
+
+arrow::Result<FlightInfo> ToAsyncResult(const pb::FlightInfo& proto) {
+  FlightInfo::Data info_data;
+  RETURN_NOT_OK(internal::FromProto(proto, &info_data));
+  return FlightInfo(std::move(info_data));
+}
+
+/// Force destruction to wait for RPC completion.
+class FinishedFlag {
+ public:
+  ~FinishedFlag() { Wait(); }
+
+  void Finish() {
+    std::lock_guard<std::mutex> guard(mutex_);
+    finished_ = true;
+    cv_.notify_all();
+  }
+  void Wait() const {
+    std::unique_lock<std::mutex> guard(mutex_);
+    cv_.wait(guard, [&]() { return finished_; });
+  }
+
+ private:
+  mutable std::mutex mutex_;
+  mutable std::condition_variable cv_;
+  bool finished_{false};
+};
+
+template <typename Result, typename Request, typename Response>
+class UnaryUnaryAsyncCall : public ::grpc::ClientUnaryReactor, public 
internal::AsyncRpc {
+ public:
+  ClientRpc rpc;
+  std::shared_ptr<AsyncListener<Result>> listener;
+  FinishedFlag finished;
+  Request pb_request;
+  Response pb_response;
+  Status client_status;
+
+  explicit UnaryUnaryAsyncCall(const FlightCallOptions& options,
+                               std::shared_ptr<AsyncListener<Result>> listener)
+      : rpc(options), listener(std::move(listener)) {}
+
+  void TryCancel() override { rpc.context.TryCancel(); }
+
+  void OnDone(const ::grpc::Status& status) override {
+    if (status.ok()) {
+      auto result = ToAsyncResult(pb_response);
+      client_status = result.status();
+      if (client_status.ok()) {
+        listener->OnNext(std::move(result).MoveValueUnsafe());
+      }
+    }
+    Finish(status);
+  }
+
+  void Finish(const ::grpc::Status& status) {
+    finished.Finish();
+    auto listener = std::move(this->listener);
+    listener->OnFinish(
+        CombinedTransportStatus(status, std::move(client_status), 
&rpc.context));
+    flight::internal::ClientTransport::SetAsyncRpc(listener.get(), nullptr);
+  }
+};
+
+#define LISTENER_NOT_OK(LISTENER, EXPR)                                        
    \
+  if (auto arrow_status = (EXPR); !arrow_status.ok()) {                        
    \
+    (LISTENER)->OnFinish(                                                      
    \
+        TransportStatus{TransportStatusCode::kInternal, 
arrow_status.ToString()}); \

Review Comment:
   ```suggestion
       (LISTENER)->OnFinish(TransportStatus::FromStatus(arrow_status));         
      \
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to