lidavidm commented on code in PR #36517:
URL: https://github.com/apache/arrow/pull/36517#discussion_r1287201913


##########
cpp/src/arrow/flight/test_definitions.cc:
##########
@@ -1656,5 +1812,107 @@ void ErrorHandlingTest::TestDoExchange() {
   reader_thread.join();
 }
 
+//------------------------------------------------------------
+// Test async clients
+
+void AsyncClientTest::SetUpTest() {
+  if (!supports_async()) {
+    GTEST_SKIP() << "async is not supported";
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto location, Location::ForScheme(transport(), 
"127.0.0.1", 0));
+
+  server_ = ExampleTestServer();
+  FlightServerOptions server_options(location);
+  ASSERT_OK(server_->Init(server_options));
+
+  std::string uri = location.scheme() + "://127.0.0.1:" + 
std::to_string(server_->port());
+  ASSERT_OK_AND_ASSIGN(auto real_location, Location::Parse(uri));
+  FlightClientOptions client_options = FlightClientOptions::Defaults();
+  ASSERT_OK_AND_ASSIGN(client_, FlightClient::Connect(real_location, 
client_options));
+
+  ASSERT_TRUE(client_->supports_async());
+}
+void AsyncClientTest::TearDownTest() {
+  if (supports_async()) {
+    ASSERT_OK(client_->Close());
+    ASSERT_OK(server_->Shutdown());
+  }
+}
+
+void AsyncClientTest::TestGetFlightInfo() {
+  class Listener : public AsyncListener<FlightInfo> {
+   public:
+    void OnNext(FlightInfo info) override {
+      info_ = std::move(info);
+      counter_++;
+    }
+
+    void OnFinish(Status status) override {
+      ASSERT_FALSE(future_.is_finished());
+      if (status.ok()) {
+        future_.MarkFinished(std::move(info_));
+      } else {
+        future_.MarkFinished(std::move(status));
+      }
+    }
+
+    int counter_ = 0;
+    FlightInfo info_ = FlightInfo(FlightInfo::Data());
+    arrow::Future<FlightInfo> future_ = arrow::Future<FlightInfo>::Make();
+  };
+
+  auto descr = FlightDescriptor::Command("status-outofmemory");
+  auto listener = std::make_shared<Listener>();
+  client_->GetFlightInfoAsync(descr, listener);
+
+  ASSERT_FINISHES_AND_RAISES(UnknownError, listener->future_);
+  ASSERT_THAT(listener->future_.status().ToString(), 
::testing::HasSubstr("Sentinel"));
+  ASSERT_EQ(0, listener->counter_);
+}
+void AsyncClientTest::TestGetFlightInfoFuture() {
+  auto descr = FlightDescriptor::Command("status-outofmemory");
+  auto future = client_->GetFlightInfoAsync(descr);
+  ASSERT_FINISHES_AND_RAISES(UnknownError, future);
+  ASSERT_THAT(future.status().ToString(), ::testing::HasSubstr("Sentinel"));
+
+  descr = FlightDescriptor::Command("my_command");
+  future = client_->GetFlightInfoAsync(descr);
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto info, future);
+  // See test_util.cc:ExampleFlightInfo
+  ASSERT_EQ(descr, info.descriptor());
+  ASSERT_EQ(1000, info.total_records());
+  ASSERT_EQ(100000, info.total_bytes());
+}
+void AsyncClientTest::TestListenerLifetime() {
+  arrow::Future<FlightInfo> future = arrow::Future<FlightInfo>::Make();
+
+  class Listener : public AsyncListener<FlightInfo> {
+   public:
+    void OnNext(FlightInfo info) override { info_ = std::move(info); }
+
+    void OnFinish(Status status) override {
+      if (status.ok()) {
+        future_.MarkFinished(std::move(info_));
+      } else {
+        future_.MarkFinished(std::move(status));
+      }
+    }
+
+    FlightInfo info_ = FlightInfo(FlightInfo::Data());
+    arrow::Future<FlightInfo> future_;
+  };
+
+  // Bad client code: don't retain a reference to the listener
+  {
+    auto descr = FlightDescriptor::Command("my_command");
+    auto listener = std::make_shared<Listener>();
+    listener->future_ = future;
+    client_->GetFlightInfoAsync(descr, std::move(listener));
+  }
+
+  ASSERT_FINISHES_OK(future);
+}
+

Review Comment:
   We cannot handle closing the client, as explained in the comment below: 
_something_ other than the Future itself has to own the resources at some point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to