This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 2d98e75 [tools] --scan_batch_size for 'table scan' and 'perf table_scan'
2d98e75 is described below
commit 2d98e75ee62a89846c1dd29302ac590a1461c9de
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Jul 20 22:39:44 2021 -0700
[tools] --scan_batch_size for 'table scan' and 'perf table_scan'
This patch introduces a new command-line flag for the 'kudu table scan'
and the 'kudu perf table_scan' CLI tools: --scan_batch_size. The new
flag allows for customization of the batch size when running scan
operations. I also added a few test scenarios for basic coverage of
the newly introduced functionality.
Change-Id: I4fd5962eeda1d1d7921c6b70521f2d611ee1f3c7
Reviewed-on: http://gerrit.cloudera.org:8080/17708
Reviewed-by: Bankim Bhavsar <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
---
src/kudu/tools/kudu-tool-test.cc | 71 +++++++++++++++++++++++++++++++++++++
src/kudu/tools/table_scanner.cc | 11 ++++++
src/kudu/tools/table_scanner.h | 6 ++++
src/kudu/tools/tool_action_perf.cc | 3 ++
src/kudu/tools/tool_action_table.cc | 7 ++++
5 files changed, 98 insertions(+)
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index c0e0f42..d77f8bc 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2906,6 +2906,55 @@ TEST_F(ToolTest, PerfTableScanReplicaSelection) {
}
}
+TEST_F(ToolTest, PerfTableScanBatchSize) {
+ constexpr const char* const kTableName = "perf.table_scan.batch_size";
+ NO_FATALS(RunLoadgen(1,
+ {
+ "--num_threads=8",
+ "--num_rows_per_thread=111",
+ },
+ kTableName));
+ // Check that the special case of batch size of 0 works as well.
+ {
+ string out;
+ string err;
+ vector<string> out_lines;
+ const auto s = RunTool(
+ Substitute("perf table_scan $0 $1 --scan_batch_size=0",
+ cluster_->master()->bound_rpc_addr().ToString(), kTableName),
+ &out, &err, &out_lines);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_EQ(1, out_lines.size()) << out;
+ ASSERT_STR_CONTAINS(out, "Total count 888 ");
+ }
+ // Use default server-side scan batch size.
+ {
+ string out;
+ string err;
+ vector<string> out_lines;
+ const auto s = RunTool(
+ Substitute("perf table_scan $0 $1 --scan_batch_size=-1",
+ cluster_->master()->bound_rpc_addr().ToString(), kTableName),
+ &out, &err, &out_lines);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_EQ(1, out_lines.size()) << out;
+ ASSERT_STR_CONTAINS(out, "Total count 888 ");
+ }
+ // Use 2 MiByte scan batch size.
+ {
+ string out;
+ string err;
+ vector<string> out_lines;
+ const auto s = RunTool(
+ Substitute("perf table_scan $0 $1 --scan_batch_size=2097152",
+ cluster_->master()->bound_rpc_addr().ToString(), kTableName),
+ &out, &err, &out_lines);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_EQ(1, out_lines.size()) << out;
+ ASSERT_STR_CONTAINS(out, "Total count 888 ");
+ }
+}
+
TEST_F(ToolTest, TestPerfTabletScan) {
// Create a table.
constexpr const char* const kTableName = "perf.tablet_scan";
@@ -4011,6 +4060,28 @@ TEST_F(ToolTest, TableScanReplicaSelection) {
}
}
+TEST_F(ToolTest, TableScanCustomBatchSize) {
+ constexpr const char* const kTableName = "kudu.table.scan.batch_size";
+ NO_FATALS(RunLoadgen(1,
+ {
+ "--num_threads=5",
+ "--num_rows_per_thread=20000",
+ },
+ kTableName));
+ // Use 256 KiByte scan batch.
+ {
+ string out;
+ string err;
+ vector<string> out_lines;
+ const auto s = RunTool(
+ Substitute("table scan $0 $1 --scan_batch_size=262144",
+ cluster_->master()->bound_rpc_addr().ToString(), kTableName),
+ &out, &err, &out_lines);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_STR_CONTAINS(out, "Total count 100000 ");
+ }
+}
+
TEST_P(ToolTestCopyTableParameterized, TestCopyTable) {
for (const auto& arg : GenerateArgs()) {
NO_FATALS(RunCopyTableCheck(arg));
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index edd49e9..61ca244 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -487,6 +487,7 @@ TableScanner::TableScanner(
table_name_(std::move(table_name)),
dst_client_(std::move(dst_client)),
dst_table_name_(std::move(dst_table_name)),
+ scan_batch_size_(-1),
out_(nullptr) {
CHECK_OK(SetReplicaSelection(FLAGS_replica_selection));
}
@@ -580,6 +581,10 @@ Status TableScanner::SetReplicaSelection(const string& selection_str) {
return Status::OK();
}
+void TableScanner::SetScanBatchSize(int32_t scan_batch_size) {
+ scan_batch_size_ = scan_batch_size;
+}
+
Status TableScanner::StartWork(WorkType type) {
client::sp::shared_ptr<KuduTable> src_table;
RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table));
@@ -598,6 +603,12 @@ Status TableScanner::StartWork(WorkType type) {
if (mode_) {
RETURN_NOT_OK(builder.SetReadMode(mode_.get()));
}
+ if (scan_batch_size_ >= 0) {
+ // Batch size of 0 is valid and has special semantics: the server sends
+ // zero rows (i.e. no data) in the very first scan batch sent back to the
+ // client. See {KuduScanner,KuduScanTokenBuilder}::SetBatchSizeBytes().
+ RETURN_NOT_OK(builder.SetBatchSizeBytes(scan_batch_size_));
+ }
RETURN_NOT_OK(builder.SetSelection(replica_selection_));
RETURN_NOT_OK(builder.SetTimeoutMillis(FLAGS_timeout_ms));
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index 3330097..9cd8ced 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -61,6 +61,11 @@ class TableScanner {
// Set replica selection for scan operations.
Status SetReplicaSelection(const std::string& selection);
+ // Set the size for scan result batch size, in bytes. A negative value has
+ // the semantics of relying on the server-side default: see the
+ // --scanner_default_batch_size_bytes flag.
+ void SetScanBatchSize(int32_t scan_batch_size);
+
Status StartScan();
Status StartCopy();
@@ -101,6 +106,7 @@ class TableScanner {
client::KuduClient::ReplicaSelection replica_selection_;
boost::optional<client::sp::shared_ptr<client::KuduClient>> dst_client_;
boost::optional<std::string> dst_table_name_;
+ int32_t scan_batch_size_;
std::unique_ptr<ThreadPool> thread_pool_;
// Protects output to 'out_' so that rows don't get interleaved.
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 630f732..1b36bfa 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -382,6 +382,7 @@ DEFINE_bool(txn_rollback, false,
DECLARE_bool(show_values);
DECLARE_int32(num_threads);
+DECLARE_int32(scan_batch_size);
DECLARE_string(replica_selection);
DECLARE_string(table_name);
@@ -926,6 +927,7 @@ Status TableScan(const RunnerContext& context) {
FLAGS_show_values = false;
TableScanner scanner(client, table_name);
scanner.SetOutput(&cout);
+ scanner.SetScanBatchSize(FLAGS_scan_batch_size);
const auto& replica_selection_str = FLAGS_replica_selection;
if (!replica_selection_str.empty()) {
RETURN_NOT_OK(scanner.SetReplicaSelection(replica_selection_str));
@@ -1063,6 +1065,7 @@ unique_ptr<Mode> BuildPerfMode() {
.AddOptionalParameter("columns")
.AddOptionalParameter("row_count_only")
.AddOptionalParameter("report_scanner_stats")
+ .AddOptionalParameter("scan_batch_size")
.AddOptionalParameter("fill_cache")
.AddOptionalParameter("num_threads")
.AddOptionalParameter("predicates")
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index 90ae529..974014a 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -114,6 +114,11 @@ DEFINE_string(lower_bound_type, "INCLUSIVE_BOUND",
DEFINE_string(upper_bound_type, "EXCLUSIVE_BOUND",
"The type of the upper bound, either inclusive or exclusive. "
"Defaults to exclusive. This flag is case-insensitive.");
+DEFINE_int32(scan_batch_size, -1,
+ "The size for scan results batches, in bytes. A negative value "
+ "means the server-side default is used, where the server-side "
+ "default is controlled by the tablet server's "
+ "--scanner_default_batch_size_bytes flag.");
DECLARE_bool(row_count_only);
DECLARE_bool(show_scanner_stats);
@@ -490,6 +495,7 @@ Status ScanTable(const RunnerContext &context) {
}
TableScanner scanner(client, table_name);
scanner.SetOutput(&cout);
+ scanner.SetScanBatchSize(FLAGS_scan_batch_size);
const auto& replica_selection_str = FLAGS_replica_selection;
if (!replica_selection_str.empty()) {
RETURN_NOT_OK(scanner.SetReplicaSelection(replica_selection_str));
@@ -1323,6 +1329,7 @@ unique_ptr<Mode> BuildTableMode() {
.AddOptionalParameter("columns")
.AddOptionalParameter("row_count_only")
.AddOptionalParameter("report_scanner_stats")
+ .AddOptionalParameter("scan_batch_size")
.AddOptionalParameter("fill_cache")
.AddOptionalParameter("num_threads")
.AddOptionalParameter("predicates")