This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 6bcda5eff KUDU-3524 Fix crash when sending periodic keep-alive requests
6bcda5eff is described below
commit 6bcda5eff94ea7c7f96c38d67ade3f83111e6743
Author: xinghuayu007 <[email protected]>
AuthorDate: Wed Nov 29 15:45:16 2023 +0800
KUDU-3524 Fix crash when sending periodic keep-alive requests
Currently, Kudu client applications on macOS crash upon calling
StartKeepAlivePeriodically(), see KUDU-3524 for details. That's
because a PeriodicTimer was used to send keep-alive requests in
a synchronous manner, while attempting to wait for the response
on a reactor thread. However, reactor threads do not allow for
waiting.
This patch uses 'ScannerKeepAliveAysnc()', an asynchronous
interface to send keep-alive requests to avoid this problem.
Change-Id: I130db970a091cdf7689245a79dc4ea445d1f739f
Reviewed-on: http://gerrit.cloudera.org:8080/20739
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/client/scanner-internal.cc | 26 ++++++++++++++++++++++++--
src/kudu/client/scanner-internal.h | 16 ++++++++++++++++
2 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/src/kudu/client/scanner-internal.cc
b/src/kudu/client/scanner-internal.cc
index 9d4294798..23754e709 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -100,6 +100,14 @@ KuduScanner::Data::~Data() {
}
}
+void KuduScanner::Data::KeepAliveResponseCallback::Run() {
+ if (!controller.status().ok()) {
+ LOG(WARNING) << Substitute("Failed to send keep-alive request: $0",
+ controller.status().ToString());
+ }
+ delete this;
+}
+
Status KuduScanner::Data::StartKeepAlivePeriodically(uint64_t
keep_alive_interval_ms,
std::shared_ptr<Messenger> messenger) {
if (!open_) return Status::IllegalState("Scanner was not open.");
@@ -107,9 +115,23 @@ Status
KuduScanner::Data::StartKeepAlivePeriodically(uint64_t keep_alive_interva
return Status::OK();
}
keep_alive_timer_ = PeriodicTimer::Create(
- messenger,
+ std::move(messenger),
[&]() {
- return KeepAlive();
+ if (!open_) return Status::IllegalState("Scanner was not open.");
+ // If there is no scanner to keep alive, we still return Status::OK().
+ if (!last_response_.IsInitialized() ||
!last_response_.has_more_results() ||
+ !next_req_.has_scanner_id()) {
+ return Status::OK();
+ }
+ // 'cb' deletes itself upon completion.
+ auto* cb = new KeepAliveResponseCallback();
+ cb->request.set_scanner_id(next_req_.scanner_id());
+ cb->controller.set_timeout(configuration_.timeout());
+ // PeriodicTimer does not allow waiting, so calling
ScannerKeepAliveAsync()
+ // instead of ScannerKeepAlive().
+ proxy_->ScannerKeepAliveAsync(cb->request, &cb->response,
&cb->controller,
+ [cb]() { cb->Run(); });
+ return Status::OK();
},
MonoDelta::FromMilliseconds(keep_alive_interval_ms));
keep_alive_timer_->Start();
diff --git a/src/kudu/client/scanner-internal.h
b/src/kudu/client/scanner-internal.h
index f647c6470..4dd9334ca 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -284,6 +284,22 @@ class KuduScanner::Data {
std::string DebugString() const;
private:
+ // Callback for sending keep alive requests asynchronously.
+ struct KeepAliveResponseCallback {
+ public:
+ KeepAliveResponseCallback() = default;
+
+ void Run();
+
+ tserver::ScannerKeepAliveRequestPB request;
+ tserver::ScannerKeepAliveResponsePB response;
+ rpc::RpcController controller;
+
+ private:
+ // Prevent instances of this class from being allocated on the stack.
+ ~KeepAliveResponseCallback() = default;
+ };
+
// Analyze the response of the last Scan RPC made by this scanner.
//
// The error handling of a scan RPC is fairly complex, since we have to
handle