This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bcda5eff KUDU-3524 Fix crash when sending periodic keep-alive requests
6bcda5eff is described below

commit 6bcda5eff94ea7c7f96c38d67ade3f83111e6743
Author: xinghuayu007 <[email protected]>
AuthorDate: Wed Nov 29 15:45:16 2023 +0800

    KUDU-3524 Fix crash when sending periodic keep-alive requests
    
    Currently, Kudu client applications on macOS crash upon calling
    StartKeepAlivePeriodically(), see KUDU-3524 for details. That's
    because a PeriodicTimer was used to send keep-alive requests in
    a synchronous manner, while attempting to wait for the response
    on a reactor thread. However, reactor threads do not allow for
    waiting.
    
    This patch uses 'ScannerKeepAliveAsync()', an asynchronous
    interface for sending keep-alive requests, to avoid this problem.
    
    Change-Id: I130db970a091cdf7689245a79dc4ea445d1f739f
    Reviewed-on: http://gerrit.cloudera.org:8080/20739
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/client/scanner-internal.cc | 26 ++++++++++++++++++++++++--
 src/kudu/client/scanner-internal.h  | 16 ++++++++++++++++
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 9d4294798..23754e709 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -100,6 +100,14 @@ KuduScanner::Data::~Data() {
   }
 }
 
+void KuduScanner::Data::KeepAliveResponseCallback::Run() {  // RPC completion hook.
+  if (!controller.status().ok()) {  // Best-effort: failures are only logged.
+    LOG(WARNING) << Substitute("Failed to send keep-alive request: $0",
+                               controller.status().ToString());
+  }
+  delete this;  // Allocated with 'new' by the keep-alive task; frees itself.
+}
+
 Status KuduScanner::Data::StartKeepAlivePeriodically(uint64_t 
keep_alive_interval_ms,
                                                      
std::shared_ptr<Messenger> messenger) {
   if (!open_) return Status::IllegalState("Scanner was not open.");
@@ -107,9 +115,23 @@ Status 
KuduScanner::Data::StartKeepAlivePeriodically(uint64_t keep_alive_interva
     return Status::OK();
   }
   keep_alive_timer_ = PeriodicTimer::Create(
-      messenger,
+      std::move(messenger),
       [&]() {
-        return KeepAlive();
+        if (!open_) return Status::IllegalState("Scanner was not open.");
+        // If there is no scanner to keep alive, we still return Status::OK().
+        if (!last_response_.IsInitialized() || 
!last_response_.has_more_results() ||
+            !next_req_.has_scanner_id()) {
+          return Status::OK();
+        }
+        // 'cb' deletes itself upon completion.
+        auto* cb = new KeepAliveResponseCallback();
+        cb->request.set_scanner_id(next_req_.scanner_id());
+        cb->controller.set_timeout(configuration_.timeout());
+        // PeriodicTimer does not allow waiting, so calling 
ScannerKeepAliveAsync()
+        // instead of ScannerKeepAlive().
+        proxy_->ScannerKeepAliveAsync(cb->request, &cb->response, 
&cb->controller,
+                                      [cb]() { cb->Run(); });
+        return Status::OK();
       },
       MonoDelta::FromMilliseconds(keep_alive_interval_ms));
   keep_alive_timer_->Start();
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index f647c6470..4dd9334ca 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -284,6 +284,22 @@ class KuduScanner::Data {
   std::string DebugString() const;
 
  private:
+  // State for a single asynchronous keep-alive RPC; freed by Run().
+  struct KeepAliveResponseCallback {
+   public:
+    KeepAliveResponseCallback() = default;
+
+    void Run();  // Logs a non-OK RPC status, then does 'delete this'.
+
+    tserver::ScannerKeepAliveRequestPB request;  // Carries the scanner ID.
+    tserver::ScannerKeepAliveResponsePB response;  // Out-param of the async RPC.
+    rpc::RpcController controller;  // Holds the RPC timeout and final status.
+
+   private:
+    // Private so objects must be heap-allocated and are freed only by Run().
+    ~KeepAliveResponseCallback() = default;
+  };
+
   // Analyze the response of the last Scan RPC made by this scanner.
   //
   // The error handling of a scan RPC is fairly complex, since we have to 
handle

Reply via email to