This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d98e75  [tools] --scan_batch_size for 'table scan' and 'perf table_scan'
2d98e75 is described below

commit 2d98e75ee62a89846c1dd29302ac590a1461c9de
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Jul 20 22:39:44 2021 -0700

    [tools] --scan_batch_size for 'table scan' and 'perf table_scan'
    
    This patch introduces a new command-line flag for the 'kudu table scan'
    and the 'kudu perf table_scan' CLI tools: --scan_batch_size.  The new
    flag allows for customization of the batch size when running scan
    operations.  I also added a few test scenarios for basic coverage of
    the newly introduced functionality.
    
    Change-Id: I4fd5962eeda1d1d7921c6b70521f2d611ee1f3c7
    Reviewed-on: http://gerrit.cloudera.org:8080/17708
    Reviewed-by: Bankim Bhavsar <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
---
 src/kudu/tools/kudu-tool-test.cc    | 71 +++++++++++++++++++++++++++++++++++++
 src/kudu/tools/table_scanner.cc     | 11 ++++++
 src/kudu/tools/table_scanner.h      |  6 ++++
 src/kudu/tools/tool_action_perf.cc  |  3 ++
 src/kudu/tools/tool_action_table.cc |  7 ++++
 5 files changed, 98 insertions(+)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index c0e0f42..d77f8bc 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2906,6 +2906,55 @@ TEST_F(ToolTest, PerfTableScanReplicaSelection) {
   }
 }
 
+TEST_F(ToolTest, PerfTableScanBatchSize) {
+  constexpr const char* const kTableName = "perf.table_scan.batch_size";
+  NO_FATALS(RunLoadgen(1,
+                       {
+                         "--num_threads=8",
+                         "--num_rows_per_thread=111",
+                       },
+                       kTableName));
+  // Check that the special case of batch size of 0 works as well.
+  {
+    string out;
+    string err;
+    vector<string> out_lines;
+    const auto s = RunTool(
+        Substitute("perf table_scan $0 $1 --scan_batch_size=0",
+                   cluster_->master()->bound_rpc_addr().ToString(), kTableName),
+        &out, &err, &out_lines);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_EQ(1, out_lines.size()) << out;
+    ASSERT_STR_CONTAINS(out, "Total count 888 ");
+  }
+  // Use default server-side scan batch size.
+  {
+    string out;
+    string err;
+    vector<string> out_lines;
+    const auto s = RunTool(
+        Substitute("perf table_scan $0 $1 --scan_batch_size=-1",
+                   cluster_->master()->bound_rpc_addr().ToString(), kTableName),
+        &out, &err, &out_lines);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_EQ(1, out_lines.size()) << out;
+    ASSERT_STR_CONTAINS(out, "Total count 888 ");
+  }
+  // Use 2 MiByte scan batch size.
+  {
+    string out;
+    string err;
+    vector<string> out_lines;
+    const auto s = RunTool(
+        Substitute("perf table_scan $0 $1 --scan_batch_size=2097152",
+                   cluster_->master()->bound_rpc_addr().ToString(), kTableName),
+        &out, &err, &out_lines);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_EQ(1, out_lines.size()) << out;
+    ASSERT_STR_CONTAINS(out, "Total count 888 ");
+  }
+}
+
 TEST_F(ToolTest, TestPerfTabletScan) {
   // Create a table.
   constexpr const char* const kTableName = "perf.tablet_scan";
@@ -4011,6 +4060,28 @@ TEST_F(ToolTest, TableScanReplicaSelection) {
   }
 }
 
+TEST_F(ToolTest, TableScanCustomBatchSize) {
+  constexpr const char* const kTableName = "kudu.table.scan.batch_size";
+  NO_FATALS(RunLoadgen(1,
+                       {
+                         "--num_threads=5",
+                         "--num_rows_per_thread=20000",
+                       },
+                       kTableName));
+  // Use 256 KiByte scan batch.
+  {
+    string out;
+    string err;
+    vector<string> out_lines;
+    const auto s = RunTool(
+        Substitute("table scan $0 $1 --scan_batch_size=262144",
+                   cluster_->master()->bound_rpc_addr().ToString(), kTableName),
+        &out, &err, &out_lines);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_STR_CONTAINS(out, "Total count 100000 ");
+  }
+}
+
 TEST_P(ToolTestCopyTableParameterized, TestCopyTable) {
   for (const auto& arg : GenerateArgs()) {
     NO_FATALS(RunCopyTableCheck(arg));
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index edd49e9..61ca244 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -487,6 +487,7 @@ TableScanner::TableScanner(
       table_name_(std::move(table_name)),
       dst_client_(std::move(dst_client)),
       dst_table_name_(std::move(dst_table_name)),
+      scan_batch_size_(-1),
       out_(nullptr) {
   CHECK_OK(SetReplicaSelection(FLAGS_replica_selection));
 }
@@ -580,6 +581,10 @@ Status TableScanner::SetReplicaSelection(const string& selection_str) {
   return Status::OK();
 }
 
+void TableScanner::SetScanBatchSize(int32_t scan_batch_size) {
+  scan_batch_size_ = scan_batch_size;
+}
+
 Status TableScanner::StartWork(WorkType type) {
   client::sp::shared_ptr<KuduTable> src_table;
   RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table));
@@ -598,6 +603,12 @@ Status TableScanner::StartWork(WorkType type) {
   if (mode_) {
     RETURN_NOT_OK(builder.SetReadMode(mode_.get()));
   }
+  if (scan_batch_size_ >= 0) {
+    // Batch size of 0 is valid and has special semantics: the server sends
+    // zero rows (i.e. no data) in the very first scan batch sent back to the
+    // client. See {KuduScanner,KuduScanTokenBuilder}::SetBatchSizeBytes().
+    RETURN_NOT_OK(builder.SetBatchSizeBytes(scan_batch_size_));
+  }
   RETURN_NOT_OK(builder.SetSelection(replica_selection_));
   RETURN_NOT_OK(builder.SetTimeoutMillis(FLAGS_timeout_ms));
 
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index 3330097..9cd8ced 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -61,6 +61,11 @@ class TableScanner {
   // Set replica selection for scan operations.
   Status SetReplicaSelection(const std::string& selection);
 
+  // Set the size for scan result batch size, in bytes. A negative value has
+  // the semantics of relying on the server-side default: see the
+  // --scanner_default_batch_size_bytes flag.
+  void SetScanBatchSize(int32_t scan_batch_size);
+
   Status StartScan();
   Status StartCopy();
 
@@ -101,6 +106,7 @@ class TableScanner {
   client::KuduClient::ReplicaSelection replica_selection_;
   boost::optional<client::sp::shared_ptr<client::KuduClient>> dst_client_;
   boost::optional<std::string> dst_table_name_;
+  int32_t scan_batch_size_;
   std::unique_ptr<ThreadPool> thread_pool_;
 
   // Protects output to 'out_' so that rows don't get interleaved.
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 630f732..1b36bfa 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -382,6 +382,7 @@ DEFINE_bool(txn_rollback, false,
 
 DECLARE_bool(show_values);
 DECLARE_int32(num_threads);
+DECLARE_int32(scan_batch_size);
 DECLARE_string(replica_selection);
 DECLARE_string(table_name);
 
@@ -926,6 +927,7 @@ Status TableScan(const RunnerContext& context) {
   FLAGS_show_values = false;
   TableScanner scanner(client, table_name);
   scanner.SetOutput(&cout);
+  scanner.SetScanBatchSize(FLAGS_scan_batch_size);
   const auto& replica_selection_str = FLAGS_replica_selection;
   if (!replica_selection_str.empty()) {
     RETURN_NOT_OK(scanner.SetReplicaSelection(replica_selection_str));
@@ -1063,6 +1065,7 @@ unique_ptr<Mode> BuildPerfMode() {
       .AddOptionalParameter("columns")
       .AddOptionalParameter("row_count_only")
       .AddOptionalParameter("report_scanner_stats")
+      .AddOptionalParameter("scan_batch_size")
       .AddOptionalParameter("fill_cache")
       .AddOptionalParameter("num_threads")
       .AddOptionalParameter("predicates")
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index 90ae529..974014a 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -114,6 +114,11 @@ DEFINE_string(lower_bound_type, "INCLUSIVE_BOUND",
 DEFINE_string(upper_bound_type, "EXCLUSIVE_BOUND",
               "The type of the upper bound, either inclusive or exclusive. "
               "Defaults to exclusive. This flag is case-insensitive.");
+DEFINE_int32(scan_batch_size, -1,
+             "The size for scan results batches, in bytes. A negative value "
+             "means the server-side default is used, where the server-side "
+             "default is controlled by the tablet server's "
+             "--scanner_default_batch_size_bytes flag.");
 
 DECLARE_bool(row_count_only);
 DECLARE_bool(show_scanner_stats);
@@ -490,6 +495,7 @@ Status ScanTable(const RunnerContext &context) {
   }
   TableScanner scanner(client, table_name);
   scanner.SetOutput(&cout);
+  scanner.SetScanBatchSize(FLAGS_scan_batch_size);
   const auto& replica_selection_str = FLAGS_replica_selection;
   if (!replica_selection_str.empty()) {
     RETURN_NOT_OK(scanner.SetReplicaSelection(replica_selection_str));
@@ -1323,6 +1329,7 @@ unique_ptr<Mode> BuildTableMode() {
       .AddOptionalParameter("columns")
       .AddOptionalParameter("row_count_only")
       .AddOptionalParameter("report_scanner_stats")
+      .AddOptionalParameter("scan_batch_size")
       .AddOptionalParameter("fill_cache")
       .AddOptionalParameter("num_threads")
       .AddOptionalParameter("predicates")

Reply via email to