pitrou commented on code in PR #36517:
URL: https://github.com/apache/arrow/pull/36517#discussion_r1265312125
##########
cpp/src/arrow/flight/flight_test.cc:
##########
@@ -1021,6 +1029,27 @@ TEST_F(TestFlightClient, RoundTripStatus) {
const auto descr = FlightDescriptor::Command("status-outofmemory");
const auto status = client_->GetFlightInfo(descr).status();
ASSERT_RAISES(OutOfMemory, status);
+
+ class Listener : public AsyncListener<FlightInfo> {
+ public:
+ void OnNext(FlightInfo info) override { info_ = std::move(info); }
Review Comment:
Should we also count the number of times that `OnNext` is called?
##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class ARROW_FLIGHT_EXPORT AsyncListenerBase {
Review Comment:
Should we mark these classes EXPERIMENTAL for now, perhaps?
##########
cpp/src/arrow/flight/serialization_internal.h:
##########
@@ -59,7 +59,7 @@ Status FromProto(const pb::FlightDescriptor& pb_descr,
FlightDescriptor* descr);
Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint*
endpoint);
Status FromProto(const pb::RenewFlightEndpointRequest& pb_request,
RenewFlightEndpointRequest* request);
-Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info);
+arrow::Result<FlightInfo> FromProto(const pb::FlightInfo& pb_info);
Review Comment:
Is there an issue open to tackle other `FromProto` helpers?
##########
cpp/src/arrow/flight/transport.h:
##########
@@ -64,8 +64,11 @@
#include <vector>
#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
#include "arrow/flight/visibility.h"
+#include "arrow/ipc/options.h"
#include "arrow/type_fwd.h"
+#include "arrow/util/future.h"
Review Comment:
Is this header unused?
##########
cpp/src/arrow/flight/transport.h:
##########
@@ -182,6 +185,9 @@ class ARROW_FLIGHT_EXPORT ClientTransport {
virtual Status GetFlightInfo(const FlightCallOptions& options,
const FlightDescriptor& descriptor,
std::unique_ptr<FlightInfo>* info);
+ virtual void GetFlightInfo(const FlightCallOptions& options,
Review Comment:
I would call it `GetFlightInfoAsync` for clarity, especially at call sites.
##########
cpp/src/arrow/flight/flight_test.cc:
##########
@@ -1021,6 +1029,27 @@ TEST_F(TestFlightClient, RoundTripStatus) {
const auto descr = FlightDescriptor::Command("status-outofmemory");
const auto status = client_->GetFlightInfo(descr).status();
ASSERT_RAISES(OutOfMemory, status);
+
+ class Listener : public AsyncListener<FlightInfo> {
+ public:
+ void OnNext(FlightInfo info) override { info_ = std::move(info); }
+
+ void OnFinish(TransportStatus status) override {
+ if (status.ok()) {
Review Comment:
Perhaps also `EXPECT_FALSE(future_)`?
##########
cpp/src/arrow/flight/test_definitions.cc:
##########
@@ -118,6 +123,23 @@ void ConnectivityTest::TestBrokenConnection() {
//------------------------------------------------------------
// Tests of data plane methods
+namespace {
+class GetFlightInfoListener : public AsyncListener<FlightInfo> {
+ public:
+ void OnNext(FlightInfo message) override { info = std::move(message); }
Review Comment:
We probably want to count the number of calls to `OnNext` and `OnFinish`.
##########
cpp/src/arrow/flight/client.h:
##########
@@ -301,6 +301,17 @@ class ARROW_FLIGHT_EXPORT FlightClient {
return GetFlightInfo({}, descriptor);
}
+ /// \brief Asynchronous GetFlightInfo.
+ /// \param[in] options Per-RPC options
+ /// \param[in] descriptor the dataset request
+ /// \param[in] listener Callbacks for response and RPC completion
+ void GetFlightInfo(const FlightCallOptions& options, const FlightDescriptor&
descriptor,
+ std::shared_ptr<AsyncListener<FlightInfo>> listener);
+ void GetFlightInfo(const FlightDescriptor& descriptor,
Review Comment:
Naming question: do we want to make sure that async methods stand out? This
could be achieved by naming this `GetFlightInfoAsync`.
@westonpace WDYT?
##########
cpp/src/arrow/flight/test_definitions.cc:
##########
@@ -670,7 +699,7 @@ void DoPutTest::TearDownTest() {
}
void DoPutTest::CheckBatches(const FlightDescriptor& expected_descriptor,
const RecordBatchVector& expected_batches) {
- auto* do_put_server = (DoPutTestServer*)server_.get();
+ auto* do_put_server = static_cast<DoPutTestServer*>(server_.get());
Review Comment:
`checked_cast` perhaps?
##########
cpp/src/arrow/flight/test_definitions.cc:
##########
@@ -145,6 +167,13 @@ void DataTest::CheckDoGet(
ASSERT_OK_AND_ASSIGN(auto info, client_->GetFlightInfo(descr));
check_endpoints(info->endpoints());
+ if (supports_async()) {
+ auto listener = std::make_shared<GetFlightInfoListener>();
+ client_->GetFlightInfo(descr, listener);
+ ASSERT_FINISHES_OK(listener->future);
Review Comment:
This raises the question: do we want a client `GetFlightInfoAsync` overload
that returns a `Future` directly?
I presume that, at some point, `AsyncListener` may grow other capabilities,
but simple uses might be content with getting a `Future`...
##########
cpp/src/arrow/flight/transport/grpc/util_internal.cc:
##########
@@ -20,7 +20,9 @@
#include <cstdlib>
#include <map>
#include <memory>
+#include <optional>
#include <string>
+#include "arrow/util/string_builder.h"
Review Comment:
Move this with other Arrow includes below.
##########
cpp/src/arrow/flight/test_definitions.cc:
##########
@@ -1469,6 +1518,17 @@ class ErrorHandlingTestServer : public FlightServerBase {
return MakeFlightError(FlightStatusCode::Unauthorized, "Unauthorized",
"extra info");
}
};
+
+class TransportStatusListener : public AsyncListener<FlightInfo> {
Review Comment:
Why not reuse `GetFlightInfoListener`?
##########
cpp/src/arrow/flight/transport/grpc/grpc_client.cc:
##########
@@ -559,6 +562,70 @@ class GrpcResultStream : public ResultStream {
std::unique_ptr<::grpc::ClientReader<pb::Result>> stream_;
};
+/// Force destruction to wait for RPC completion.
+class FinishedFlag {
+ public:
+ ~FinishedFlag() { Wait(); }
+
+ void Finish() {
+ std::lock_guard<std::mutex> guard(mutex_);
+ finished_ = true;
+ cv_.notify_all();
+ }
+ void Wait() const {
+ std::unique_lock<std::mutex> guard(mutex_);
+ cv_.wait(guard, [&]() { return finished_; });
+ }
+
+ private:
+ mutable std::mutex mutex_;
+ mutable std::condition_variable cv_;
+ bool finished_{false};
+};
+
+template <typename Result, typename Request, typename Response>
+class UnaryUnaryAsyncCall : public ::grpc::ClientUnaryReactor, public
internal::AsyncRpc {
+ public:
+ ClientRpc rpc;
+ std::shared_ptr<AsyncListener<Result>> listener;
+ FinishedFlag finished;
+ Request pb_request;
+ Response pb_response;
+ Status client_status;
+
+ explicit UnaryUnaryAsyncCall(const FlightCallOptions& options,
+ std::shared_ptr<AsyncListener<Result>> listener)
+ : rpc(options), listener(std::move(listener)) {}
+
+ void TryCancel() override { rpc.context.TryCancel(); }
+
+ void OnDone(const ::grpc::Status& status) override {
+ if (status.ok()) {
+ auto result = internal::FromProto(pb_response);
+ client_status = result.status();
+ if (client_status.ok()) {
+ listener->OnNext(std::move(result).MoveValueUnsafe());
+ }
+ }
+ Finish(status);
+ }
+
+ void Finish(const ::grpc::Status& status) {
+ finished.Finish();
Review Comment:
I think you want this to be the very last step here. Otherwise, as soon as the
flag is set, `this` may be destroyed, and the accesses below will be invalid.
##########
cpp/src/arrow/flight/transport/grpc/grpc_client.cc:
##########
@@ -806,12 +872,26 @@ class GrpcClientImpl : public internal::ClientTransport {
stub_->GetFlightInfo(&rpc.context, pb_descriptor, &pb_response),
&rpc.context);
RETURN_NOT_OK(s);
- FlightInfo::Data info_data;
- RETURN_NOT_OK(internal::FromProto(pb_response, &info_data));
+ ARROW_ASSIGN_OR_RAISE(auto info_data, internal::FromProto(pb_response));
+ *info = std::make_unique<FlightInfo>(std::move(info_data));
info->reset(new FlightInfo(std::move(info_data)));
return Status::OK();
}
+ void GetFlightInfo(const FlightCallOptions& options, const FlightDescriptor&
descriptor,
+ std::shared_ptr<AsyncListener<FlightInfo>> listener)
override {
+ using AsyncCall =
+ UnaryUnaryAsyncCall<FlightInfo, pb::FlightDescriptor, pb::FlightInfo>;
+ auto call = std::make_unique<AsyncCall>(options, listener);
+ LISTENER_NOT_OK(listener, internal::ToProto(descriptor,
&call->pb_request));
+ LISTENER_NOT_OK(listener, call->rpc.SetToken(auth_handler_.get()));
+
+ stub_->experimental_async()->GetFlightInfo(&call->rpc.context,
&call->pb_request,
+ &call->pb_response, call.get());
+ ClientTransport::SetAsyncRpc(listener.get(), std::move(call));
+
static_cast<AsyncCall*>(ClientTransport::GetAsyncRpc(listener.get()))->StartCall();
Review Comment:
`checked_cast`?
##########
cpp/src/arrow/flight/client.h:
##########
@@ -301,6 +301,17 @@ class ARROW_FLIGHT_EXPORT FlightClient {
return GetFlightInfo({}, descriptor);
}
+ /// \brief Asynchronous GetFlightInfo.
+ /// \param[in] options Per-RPC options
+ /// \param[in] descriptor the dataset request
+ /// \param[in] listener Callbacks for response and RPC completion
+ void GetFlightInfo(const FlightCallOptions& options, const FlightDescriptor&
descriptor,
+ std::shared_ptr<AsyncListener<FlightInfo>> listener);
+ void GetFlightInfo(const FlightDescriptor& descriptor,
Review Comment:
Also, mark these methods EXPERIMENTAL for now?
##########
cpp/src/arrow/flight/transport/grpc/grpc_client.cc:
##########
@@ -559,6 +562,70 @@ class GrpcResultStream : public ResultStream {
std::unique_ptr<::grpc::ClientReader<pb::Result>> stream_;
};
+/// Force destruction to wait for RPC completion.
+class FinishedFlag {
+ public:
+ ~FinishedFlag() { Wait(); }
+
+ void Finish() {
+ std::lock_guard<std::mutex> guard(mutex_);
+ finished_ = true;
+ cv_.notify_all();
+ }
+ void Wait() const {
+ std::unique_lock<std::mutex> guard(mutex_);
+ cv_.wait(guard, [&]() { return finished_; });
+ }
+
+ private:
+ mutable std::mutex mutex_;
+ mutable std::condition_variable cv_;
+ bool finished_{false};
+};
+
+template <typename Result, typename Request, typename Response>
+class UnaryUnaryAsyncCall : public ::grpc::ClientUnaryReactor, public
internal::AsyncRpc {
+ public:
+ ClientRpc rpc;
+ std::shared_ptr<AsyncListener<Result>> listener;
+ FinishedFlag finished;
Review Comment:
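You should probably make this the last member. Members are destroyed in reverse
declaration order, so `finished` (whose destructor waits) would then be destroyed
first, and the other members would be destroyed only after `finished` has been notified.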
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]