This is an automated email from the ASF dual-hosted git repository.
zhangyifan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 20ae0e5df [Tool] Limit table copying speed
20ae0e5df is described below
commit 20ae0e5dfbdf98a2f21cf029cb3501c64fc02108
Author: xinghuayu007 <[email protected]>
AuthorDate: Tue Jun 18 16:53:46 2024 +0800
[Tool] Limit table copying speed
Migrating data to another Kudu cluster using the 'kudu table copy'
CLI command when the data is very large may cause memory and/or
network bandwidth pressure. To reduce the effect on other services,
it is better to limit the copying speed.
This patch introduces two parameters,
--table_copy_throttler_bytes_per_sec and
--table_copy_throttler_burst_factor, to limit the table copying
speed.
Change-Id: I37d23f6f5158618f91b67528e152cf2ff4cf38f3
Reviewed-on: http://gerrit.cloudera.org:8080/21527
Reviewed-by: Zoltan Chovan <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
Tested-by: Yifan Zhang <[email protected]>
Reviewed-by: Yifan Zhang <[email protected]>
---
src/kudu/tools/kudu-tool-test.cc | 53 +++++++++++++++++++++++++++++++++++++
src/kudu/tools/table_scanner.cc | 23 ++++++++++++++++
src/kudu/tools/table_scanner.h | 3 +++
src/kudu/tools/tool_action_table.cc | 2 ++
4 files changed, 81 insertions(+)
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index dde8c65c3..8d97f4047 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -52,6 +52,7 @@
#include "kudu/cfile/cfile_writer.h"
#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
+#include "kudu/client/scan_batch.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/value.h"
@@ -77,6 +78,7 @@
#include "kudu/fs/fs_manager.h"
#include "kudu/fs/fs_report.h"
#include "kudu/fs/log_block_manager.h"
+#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
@@ -181,6 +183,8 @@ using kudu::client::KuduScanToken;
using kudu::client::KuduScanTokenBuilder;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduScanBatch;
+using kudu::client::KuduScanner;
using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::KuduTableAlterer;
@@ -5986,6 +5990,55 @@ TEST_F(ToolTest, TableScanFaultTolerant) {
}
}
+// Verifies that 'kudu table copy' honors --table_copy_throttler_bytes_per_sec:
+// the number of bytes found in the destination table, divided by the time the
+// copy tool took to run, must not exceed the configured limit.
+TEST_F(ToolTest, TableCopyLimitSpeed) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  NO_FATALS(StartExternalMiniCluster());
+  constexpr const char* const kTableName = "kudu.table.copy.limit_speed.from";
+  constexpr const char* const kNewTableName = "kudu.table.copy.limit_speed.to";
+
+  // Populate the source table with at least 20000 rows.
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableName);
+  workload.set_num_tablets(1);
+  workload.set_num_replicas(1);
+  workload.Setup();
+  workload.Start();
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_GE(workload.rows_inserted(), 20000);
+  });
+  workload.StopAndJoin();
+
+  // Run the throttled copy and measure how long the tool takes.
+  const string master_addr = cluster_->master()->bound_rpc_addr().ToString();
+  const int64 table_copy_throttler_bytes_per_sec = 10240;
+  const MonoTime start_time = MonoTime::Now();
+  NO_FATALS(RunTool(
+      Substitute("table copy $0 $1 $2 --dst_table=$3 --write_type=upsert "
+                 "-scan_batch_size=1024 "
+                 "--table_copy_throttler_bytes_per_sec=$4 "
+                 "--table_copy_throttler_burst_factor=100",
+                 master_addr, kTableName,
+                 master_addr, kNewTableName,
+                 table_copy_throttler_bytes_per_sec),
+      nullptr, nullptr));
+  const MonoTime end_time = MonoTime::Now();
+
+  // Scan the destination table to find out how many bytes were copied.
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(KuduClientBuilder()
+                .add_master_server_addr(master_addr)
+                .Build(&client));
+  shared_ptr<KuduTable> table;
+  // Fail right here if the destination table cannot be opened, instead of
+  // dereferencing a null table below.
+  ASSERT_OK(client->OpenTable(kNewTableName, &table));
+  KuduScanner scanner(table.get());
+  ASSERT_OK(scanner.Open());
+  KuduScanBatch batch;
+  int64_t data_size = 0;
+  while (scanner.HasMoreRows()) {
+    ASSERT_OK(scanner.NextBatch(&batch));
+    // Accumulate over all batches: the rate check below needs the total
+    // amount of copied data, not just the size of the last batch.
+    data_size += batch.direct_data().size() + batch.indirect_data().size();
+  }
+  // The average table copy speed must not exceed
+  // table_copy_throttler_bytes_per_sec.
+  ASSERT_LE(data_size / (end_time - start_time).ToSeconds(),
+            table_copy_throttler_bytes_per_sec);
+}
+
TEST_F(ToolTest, TableCopyFaultTolerant) {
constexpr const char* const kTableName =
"kudu.table.copy.fault_tolerant.from";
constexpr const char* const kNewTableName =
"kudu.table.copy.fault_tolerant.to";
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index 5e33c8182..eebb94a41 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -61,6 +61,7 @@
#include "kudu/util/slice.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/string_case.h"
+#include "kudu/util/throttler.h"
using kudu::client::KuduClient;
using kudu::client::KuduColumnSchema;
@@ -139,6 +140,15 @@ DEFINE_string(replica_selection, "CLOSEST",
"Replica selection for scan operations. Acceptable values are: "
"CLOSEST, LEADER (maps into KuduClient::CLOSEST_REPLICA and "
"KuduClient::LEADER_ONLY correspondingly).");
+// Flags controlling the optional rate limiting of 'kudu table copy'. They are
+// read when a TableScanner is constructed: a throttler is created only if
+// --table_copy_throttler_bytes_per_sec is greater than zero.
+DEFINE_int64(table_copy_throttler_bytes_per_sec, 0,
+             "Limit table copying speed. It limits the copying speed of all the tablets "
+             "in one table for one session. The default value is 0, which means not limiting "
+             "the speed. The unit is bytes/second");
+DEFINE_double(table_copy_throttler_burst_factor, 1.0,
+              "Burst factor for table copy throttling. The maximum rate the throttler "
+              "allows within a token refill period (100ms) equals burst factor multiplied "
+              "base rate (--table_copy_throttler_bytes_per_sec). The default value is 1.0, "
+              "which means the maximum rate is equal to --table_copy_throttler_bytes_per_sec.");
DECLARE_bool(row_count_only);
DECLARE_int32(num_threads);
@@ -571,6 +581,11 @@ TableScanner::TableScanner(
scan_batch_size_(-1),
out_(nullptr) {
CHECK_OK(SetReplicaSelection(FLAGS_replica_selection));
+ if (FLAGS_table_copy_throttler_bytes_per_sec > 0) {
+ throttler_ = std::make_shared<Throttler>(MonoTime::Now(), 0,
+
FLAGS_table_copy_throttler_bytes_per_sec,
+
FLAGS_table_copy_throttler_burst_factor);
+ }
}
Status TableScanner::ScanData(const vector<KuduScanToken*>& tokens,
@@ -593,6 +608,14 @@ Status TableScanner::ScanData(const vector<KuduScanToken*>& tokens,
count += batch.NumRows();
total_count_ += batch.NumRows();
++next_batch_calls;
+ // Limit table copy speed.
+ if (throttler_) {
+ SCOPED_LOG_SLOW_EXECUTION(WARNING, 1000, "Table copy throttler");
+ while (!throttler_->Take(MonoTime::Now(), 0,
+ batch.direct_data().size() +
batch.indirect_data().size())) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+ }
RETURN_NOT_OK(cb(batch));
}
sw.stop();
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index 13dd39d39..1eba9f295 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -35,6 +35,8 @@
#include "kudu/util/threadpool.h"
namespace kudu {
+class Throttler;
+
namespace tools {
// This class is not thread-safe.
@@ -102,6 +104,7 @@ class TableScanner {
std::optional<std::string> dst_table_name_;
int32_t scan_batch_size_;
std::unique_ptr<ThreadPool> thread_pool_;
+ std::shared_ptr<Throttler> throttler_;
// Protects output to 'out_' so that rows don't get interleaved.
Mutex output_lock_;
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index b6901d37a..21b8c04e8 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -2029,6 +2029,8 @@ unique_ptr<Mode> BuildTableMode() {
.AddOptionalParameter("scan_batch_size")
.AddOptionalParameter("tablets")
.AddOptionalParameter("write_type")
+ .AddOptionalParameter("table_copy_throttler_bytes_per_sec")
+ .AddOptionalParameter("table_copy_throttler_burst_factor")
.Build();
unique_ptr<Action> set_extra_config =