lidavidm commented on code in PR #36517:
URL: https://github.com/apache/arrow/pull/36517#discussion_r1265722198
##########
cpp/src/arrow/flight/transport/grpc/grpc_client.cc:
##########
@@ -559,6 +562,70 @@ class GrpcResultStream : public ResultStream {
std::unique_ptr<::grpc::ClientReader<pb::Result>> stream_;
};
+/// Force destruction to wait for RPC completion.
+class FinishedFlag {
+ public:
+ ~FinishedFlag() { Wait(); }
+
+ void Finish() {
+ std::lock_guard<std::mutex> guard(mutex_);
+ finished_ = true;
+ cv_.notify_all();
+ }
+ void Wait() const {
+ std::unique_lock<std::mutex> guard(mutex_);
+ cv_.wait(guard, [&]() { return finished_; });
+ }
+
+ private:
+ mutable std::mutex mutex_;
+ mutable std::condition_variable cv_;
+ bool finished_{false};
+};
+
+template <typename Result, typename Request, typename Response>
+class UnaryUnaryAsyncCall : public ::grpc::ClientUnaryReactor, public
internal::AsyncRpc {
+ public:
+ ClientRpc rpc;
+ std::shared_ptr<AsyncListener<Result>> listener;
+ FinishedFlag finished;
+ Request pb_request;
+ Response pb_response;
+ Status client_status;
+
+ explicit UnaryUnaryAsyncCall(const FlightCallOptions& options,
+ std::shared_ptr<AsyncListener<Result>> listener)
+ : rpc(options), listener(std::move(listener)) {}
+
+ void TryCancel() override { rpc.context.TryCancel(); }
+
+ void OnDone(const ::grpc::Status& status) override {
+ if (status.ok()) {
+ auto result = internal::FromProto(pb_response);
+ client_status = result.status();
+ if (client_status.ok()) {
+ listener->OnNext(std::move(result).MoveValueUnsafe());
+ }
+ }
+ Finish(status);
+ }
+
+ void Finish(const ::grpc::Status& status) {
+ finished.Finish();
Review Comment:
It has to be the second-to-last step since the explicit `SetAsyncRpc` might
trigger destruction, but otherwise fair.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]