This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 8644d88da KUDU-3498 Scanner keeps alive in periodically
8644d88da is described below
commit 8644d88dae6a76c5df3595b8a5aeb13df4d6ab5c
Author: xinghuayu007 <[email protected]>
AuthorDate: Mon Jul 31 23:04:21 2023 +0800
KUDU-3498 Scanner keeps alive in periodically
Kudu caches the scanner in the tablet server so that reading can
continue. The scanner expires if its idle time exceeds the configured
scanner TTL. Sometimes the client reads a batch of data and,
if the data is very large, takes a long time to handle it.
When the client then reads the next batch using the same scanner, the
scanner will have expired even if it has sent a keep-alive request.
This patch adds support for keeping a scanner alive periodically.
It uses a timer to send keep-alive requests in the background, so
the scanner never expires while it is in use.
Change-Id: I1165d96814eb4bcd5db9b5cb60403fffc5b18c81
Reviewed-on: http://gerrit.cloudera.org:8080/20282
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/client/client-test.cc | 182 ++++++++++++++++++++++++++++++++++++
src/kudu/client/client.cc | 15 ++-
src/kudu/client/client.h | 30 ++++++
src/kudu/client/scanner-internal.cc | 30 ++++++
src/kudu/client/scanner-internal.h | 20 ++++
5 files changed, 276 insertions(+), 1 deletion(-)
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 25f6b2731..0168ddc95 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -36,6 +36,7 @@
#include <thread>
#include <tuple>
#include <type_traits>
+#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
@@ -99,6 +100,7 @@
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/mini-cluster/mini_cluster.h"
#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/periodic.h" // IWYU pragma: keep
#include "kudu/rpc/service_pool.h"
#include "kudu/security/tls_context.h"
#include "kudu/security/token.pb.h"
@@ -117,6 +119,7 @@
#include "kudu/util/async_util.h"
#include "kudu/util/barrier.h"
#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
#include "kudu/util/locks.h" // IWYU pragma: keep
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
@@ -231,6 +234,7 @@ using std::set;
using std::string;
using std::thread;
using std::unique_ptr;
+using std::unordered_map;
using std::unordered_set;
using std::vector;
using strings::Substitute;
@@ -2980,6 +2984,184 @@ TEST_F(ClientTest, TestScannerKeepAlive) {
ASSERT_EQ(sum, 499500);
}
+class KeepAlivePeriodicallyTest :
+ public ClientTest,
+ public ::testing::WithParamInterface<bool> {
+ public:
+ void SetUp() override {
+ ClientTest::SetUp();
+ // Create a new cluster with 3 tablet servers.
+ InternalMiniClusterOptions options;
+ options.num_tablet_servers = 3;
+ cluster_->Shutdown();
+ env_->DeleteRecursively(test_dir_);
+ cluster_.reset(new InternalMiniCluster(env_, options));
+ ASSERT_OK(cluster_->Start());
+ ASSERT_OK(KuduClientBuilder()
+
.add_master_server_addr(cluster_->mini_master()->bound_rpc_addr().ToString())
+ .Build(&client_));
+
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ // Create a table with 3 hash partitions.
+ ASSERT_OK(table_creator->table_name(kTableName)
+ .schema(&schema_)
+ .num_replicas(3)
+ .add_hash_partitions({ "key" }, 3)
+ .timeout(MonoDelta::FromSeconds(60))
+ .Create());
+
+ ASSERT_OK(client_->OpenTable(kTableName, &test_table_));
+ NO_FATALS(InsertTestRows(test_table_.get(), 3000));
+
+ // Every tablet server owns 3 tablet replicas.
+ for (int i = 0; i < 3; i++) {
+ vector<string> tablet_ids =
cluster_->mini_tablet_server(i)->ListTablets();
+ ASSERT_EQ(3, tablet_ids.size());
+ }
+ }
+
+ shared_ptr<KuduTable> test_table_;
+};
+
+INSTANTIATE_TEST_SUITE_P(KeepAlivePeriodically, KeepAlivePeriodicallyTest,
::testing::Bool());
+
+// Test case 1: 3 tablets are distributed across different tablet servers.
+// When the scanner opens the next tablet, keepalive requests are sent
+// to the other tablet server automatically.
+TEST_P(KeepAlivePeriodicallyTest,
TestScannerKeepAlivePeriodicallyCrossServers) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ const auto keep_alive_periodically = GetParam();
+ // Set the scanner TTL low.
+ FLAGS_scanner_ttl_ms = 500;
+ KuduScanner scanner(test_table_.get());
+ ASSERT_OK(scanner.SetBatchSizeBytes(100));
+ ASSERT_OK(scanner.Open());
+
+ // Start the keepalive timer.
+ if (keep_alive_periodically) {
+ ASSERT_OK(scanner.StartKeepAlivePeriodically(FLAGS_scanner_ttl_ms/10));
+ }
+
+ KuduScanBatch batch;
+ bool has_expired_scan = false;
+ while (scanner.HasMoreRows()) {
+ // Sleep 1s so that the scanner expires when periodic keepalive is not enabled.
+ SleepFor(MonoDelta::FromSeconds(1));
+ Status s = scanner.NextBatch(&batch);
+ if (keep_alive_periodically) {
+ // It is OK, the scanner is not expired.
+ ASSERT_OK(s);
+ } else if (s.IsNotFound()) {
+ // It is expired.
+ has_expired_scan = true;
+ break;
+ }
+ }
+ if (keep_alive_periodically) {
+ // Verify that the keepalive timer is stopped automatically once there are no more rows to read.
+ ASSERT_FALSE(scanner.data_->keep_alive_timer_->started());
+ ASSERT_FALSE(has_expired_scan);
+ } else {
+ ASSERT_TRUE(has_expired_scan);
+ }
+}
+
+// Test case 2: The scanner is reading a tablet replica in a tablet
+// server. If the tablet server is stopped, the client automatically
+// restarts the scan at some other tablet replica hosted by a different
+// tablet server, and keepalive requests are sent to the new tablet server.
+TEST_P(KeepAlivePeriodicallyTest,
TestScannerKeepAlivePeriodicallyScannerTolerate) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ const auto keep_alive_periodically = GetParam();
+
+ // Set the scanner TTL low.
+ FLAGS_scanner_ttl_ms = 500;
+ KuduScanner scanner(test_table_.get());
+ ASSERT_OK(scanner.SetFaultTolerant());
+ ASSERT_OK(scanner.SetBatchSizeBytes(100));
+ ASSERT_OK(scanner.Open());
+
+ // Start the keepalive timer.
+ if (keep_alive_periodically) {
+ ASSERT_OK(scanner.StartKeepAlivePeriodically(FLAGS_scanner_ttl_ms/2));
+ }
+
+ // Do a first scan.
+ ASSERT_TRUE(scanner.HasMoreRows());
+ KuduScanBatch batch;
+ scanner.NextBatch(&batch);
+ ASSERT_TRUE(scanner.HasMoreRows());
+
+ // Stop the current tablet server.
+ KuduTabletServer* kts_ptr;
+ ASSERT_OK(scanner.GetCurrentServer(&kts_ptr));
+ unique_ptr<KuduTabletServer> kts(kts_ptr);
+ RestartTServerAndWait(kts->uuid());
+
+ bool has_expired_scan = false;
+ while (scanner.HasMoreRows()) {
+ SleepFor(MonoDelta::FromSeconds(1));
+ Status s = scanner.NextBatch(&batch);
+ // Set fault tolerance to false to allow the scanner to expire.
+ ASSERT_OK(scanner.data_->mutable_configuration()->SetFaultTolerant(false));
+ if (keep_alive_periodically) {
+ // It is OK, the scanner is not expired.
+ ASSERT_OK(s);
+ } else if (s.IsNotFound()) {
+ // It is expired.
+ has_expired_scan = true;
+ break;
+ }
+ }
+ if (keep_alive_periodically) {
+ ASSERT_FALSE(has_expired_scan);
+ } else {
+ ASSERT_TRUE(has_expired_scan);
+ }
+}
+
+TEST_P(KeepAlivePeriodicallyTest, TestStopKeepAlivePeriodically) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ const auto stop_keepalive_periodcally = GetParam();
+
+ // Set the scanner TTL low.
+ FLAGS_scanner_ttl_ms = 500;
+ KuduScanner scanner(test_table_.get());
+ ASSERT_OK(scanner.SetBatchSizeBytes(100));
+ ASSERT_OK(scanner.Open());
+
+ ASSERT_OK(scanner.StartKeepAlivePeriodically(FLAGS_scanner_ttl_ms/2));
+
+ // Stop the keepalive timer.
+ if (stop_keepalive_periodcally) {
+ scanner.StopKeepAlivePeriodically();
+ }
+
+ KuduScanBatch batch;
+ bool has_expired_scan = false;
+ while (scanner.HasMoreRows()) {
+ // Sleep 1s so that the scanner expires when periodic keepalive is not active.
+ SleepFor(MonoDelta::FromSeconds(1));
+ Status s = scanner.NextBatch(&batch);
+ if (!stop_keepalive_periodcally) {
+ // It is OK, the scanner is not expired.
+ ASSERT_OK(s);
+ } else if (s.IsNotFound()) {
+ // It is expired.
+ has_expired_scan = true;
+ break;
+ }
+ }
+ if (stop_keepalive_periodcally) {
+ ASSERT_TRUE(has_expired_scan);
+ } else {
+ ASSERT_FALSE(has_expired_scan);
+ }
+}
+
// Test cleanup of scanners on the server side when closed.
TEST_F(ClientTest, TestCloseScanner) {
NO_FATALS(InsertTestRows(client_table_.get(), 10));
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 0c8c2c78c..d5b5740cc 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -2090,6 +2090,15 @@ Status KuduScanner::KeepAlive() {
return data_->KeepAlive();
}
+Status KuduScanner::StartKeepAlivePeriodically(uint64_t
keep_alive_interval_ms) {
+ return data_->StartKeepAlivePeriodically(keep_alive_interval_ms,
+
data_->table_->client()->data_->messenger_);
+}
+
+void KuduScanner::StopKeepAlivePeriodically() {
+ data_->StopKeepAlivePeriodically();
+}
+
void KuduScanner::Close() {
if (!data_->open_) return;
@@ -2119,10 +2128,14 @@ void KuduScanner::Close() {
bool KuduScanner::HasMoreRows() const {
CHECK(data_->open_);
- return !data_->short_circuit_ && // The scan is not short
circuited
+ bool has_more = !data_->short_circuit_ && // The scan is not short
circuited
(data_->data_in_open_ || // more data in hand
data_->last_response_.has_more_results() || // more data in this tablet
data_->MoreTablets()); // more tablets to scan,
possibly with more data
+ if (!has_more) {
+ data_->StopKeepAlivePeriodically();
+ }
+ return has_more;
}
Status KuduScanner::NextBatch(vector<KuduRowResult>* rows) {
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index ceb888e45..1e28d29a8 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1088,6 +1088,7 @@ class KUDU_EXPORT KuduClient : public
sp::enable_shared_from_this<KuduClient> {
friend class tools::LeaderMasterProxy;
friend class tools::RemoteKsckCluster;
friend class tools::TableLister;
+ friend class KeepAlivePeriodicallyTest;
friend class ScanTokenTest;
friend Status tools::ShowTabletInfo(const std::vector<std::string>&
master_addresses,
const std::vector<std::string>&
tablet_id);
@@ -2949,6 +2950,32 @@ class KUDU_EXPORT KuduScanner {
/// fault tolerant.
Status KeepAlive();
+ /// Keep the current remote scanner alive by sending keep-alive requests
periodically.
+ ///
+ /// This function uses a timer to call KeepAlive() periodically, at the
+ /// interval given by keep_alive_interval_ms. The keep-alive requests are
+ /// sent to the server from a separate thread. This is useful if the client
+ /// takes a long time to handle the fetched data before having the chance to
+ /// call KeepAlive(). This can be called after the scanner is opened, and the
+ /// timer can be stopped by calling StopKeepAlivePeriodically().
+ ///
+ /// @note This method isn't thread-safe.
+ ///
+ /// @param [in] keep_alive_interval_ms
+ /// The interval at which keep-alive requests are sent. The default value is 30000 ms,
+ /// which is half of the default setting for the --scanner_ttl_ms flag.
+ /// @return A non-OK status if the scanner is not opened.
+ Status StartKeepAlivePeriodically(uint64_t keep_alive_interval_ms = 30000);
+
+ /// Stop keeping the current remote scanner alive periodically.
+ ///
+ /// This function stops the periodic sending of keep-alive requests to the
+ /// server. After StartKeepAlivePeriodically() has been called, this function
+ /// can be used to stop the keep-alive timer at any time. The timer is stopped
+ /// automatically once scanning has finished, but it can also be stopped
+ /// manually by calling this function.
+ void StopKeepAlivePeriodically();
+
/// Close the scanner.
///
/// Closing the scanner releases resources on the server. This call does not
@@ -3207,6 +3234,9 @@ class KUDU_EXPORT KuduScanner {
FRIEND_TEST(ClientTest, TestScanWithQueryId);
FRIEND_TEST(ClientTest, TestReadAtSnapshotNoTimestampSet);
FRIEND_TEST(ConsistencyITest, TestSnapshotScanTimestampReuse);
+ FRIEND_TEST(KeepAlivePeriodicallyTest,
TestScannerKeepAlivePeriodicallyCrossServers);
+ FRIEND_TEST(KeepAlivePeriodicallyTest,
TestScannerKeepAlivePeriodicallyScannerTolerate);
+ FRIEND_TEST(KeepAlivePeriodicallyTest, TestStopKeepAlivePeriodically);
FRIEND_TEST(ScanTokenTest, TestScanTokens);
FRIEND_TEST(ScanTokenTest, TestScanTokens_NonUniquePrimaryKey);
FRIEND_TEST(ScanTokenTest, TestScanTokensWithQueryId);
diff --git a/src/kudu/client/scanner-internal.cc
b/src/kudu/client/scanner-internal.cc
index 297c6009e..9d4294798 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -19,6 +19,7 @@
#include <algorithm>
#include <cstdint>
+#include <functional>
#include <ostream>
#include <string>
#include <type_traits>
@@ -45,6 +46,8 @@
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/connection.h"
+#include "kudu/rpc/messenger.h" // IWYU pragma: keep
+#include "kudu/rpc/periodic.h"
#include "kudu/rpc/rpc.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
@@ -61,7 +64,9 @@ using google::protobuf::FieldDescriptor;
using google::protobuf::Reflection;
using kudu::rpc::ComputeExponentialBackoff;
using kudu::rpc::CredentialsPolicy;
+using kudu::rpc::Messenger;
using kudu::rpc::RpcController;
+using kudu::rpc::PeriodicTimer;
using kudu::security::SignedTokenPB;
using kudu::tserver::NewScanRequestPB;
using kudu::tserver::RowFormatFlags;
@@ -90,6 +95,31 @@ KuduScanner::Data::Data(KuduTable* table)
}
KuduScanner::Data::~Data() {
+ if (keep_alive_timer_) {
+ keep_alive_timer_->Stop();
+ }
+}
+
+Status KuduScanner::Data::StartKeepAlivePeriodically(uint64_t
keep_alive_interval_ms,
+
std::shared_ptr<Messenger> messenger) {
+ if (!open_) return Status::IllegalState("Scanner was not open.");
+ if (keep_alive_timer_ && keep_alive_timer_->started()) {
+ return Status::OK();
+ }
+ keep_alive_timer_ = PeriodicTimer::Create(
+ messenger,
+ [&]() {
+ return KeepAlive();
+ },
+ MonoDelta::FromMilliseconds(keep_alive_interval_ms));
+ keep_alive_timer_->Start();
+ return Status::OK();
+}
+
+void KuduScanner::Data::StopKeepAlivePeriodically() {
+ if (keep_alive_timer_) {
+ keep_alive_timer_->Stop();
+ }
}
Status KuduScanner::Data::EnrichStatusMessage(Status s) const {
diff --git a/src/kudu/client/scanner-internal.h
b/src/kudu/client/scanner-internal.h
index a4c8901ff..f647c6470 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -26,6 +26,7 @@
#include <vector>
#include <glog/logging.h>
+#include <gtest/gtest_prod.h>
#include "kudu/client/client.h"
#include "kudu/client/columnar_scan_batch.h"
@@ -49,6 +50,11 @@ class MonoTime;
class PartitionKey;
class Schema;
+namespace rpc {
+class Messenger;
+class PeriodicTimer;
+} // rpc
+
namespace tserver {
class TabletServerServiceProxy;
} // tserver
@@ -169,6 +175,13 @@ class KuduScanner::Data {
Status KeepAlive();
+ // Send keep alive requests periodically in an independent thread.
+ Status StartKeepAlivePeriodically(uint64_t keep_alive_interval_ms,
+ std::shared_ptr<rpc::Messenger> messenger);
+
+ // Stop sending keep-alive requests periodically.
+ void StopKeepAlivePeriodically();
+
// Returns whether there may exist more tablets to scan.
//
// This method does not take into account any non-covered range partitions
@@ -293,6 +306,13 @@ class KuduScanner::Data {
void UpdateResourceMetrics();
+ // This is used to send keep-alive requests periodically.
+ // When the scanner calls StartKeepAlivePeriodically(),
+ // it will be initialized.
+ std::shared_ptr<rpc::PeriodicTimer> keep_alive_timer_;
+
+ FRIEND_TEST(KeepAlivePeriodicallyTest,
TestScannerKeepAlivePeriodicallyCrossServers);
+
DISALLOW_COPY_AND_ASSIGN(Data);
};