This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f0020f  [tools] --replica_selection flag for 'table scan' and 'perf table_scan'
8f0020f is described below

commit 8f0020fe22bc9f026d563155503be981111b202a
Author: Alexey Serbin <[email protected]>
AuthorDate: Sun Jun 20 23:50:38 2021 -0700

    [tools] --replica_selection flag for 'table scan' and 'perf table_scan'
    
    This patch adds a replica selection configuration knob for the
    `kudu table scan` and `kudu perf table_scan` CLI tools by introducing
    the --replica_selection flag.  Acceptable values for the newly introduced
    flag are CLOSEST and LEADER, case-insensitive.
    
    I decided not to expose the ReplicaSelection::FIRST_REPLICA enumeration
    value via the new flag: the semantics of FIRST_REPLICA are too hazy, and
    I could not see how it might bring any value in practice.
    
    This patch also contains corresponding tests.
    
    Change-Id: Ie8ac1e91f0c03c61edd19796d69cecaf68880a5b
    Reviewed-on: http://gerrit.cloudera.org:8080/17605
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/tools/kudu-tool-test.cc    |  63 +++++++++++++++++
 src/kudu/tools/table_scanner.cc     | 131 +++++++++++++++++++++++++++---------
 src/kudu/tools/table_scanner.h      |  62 +++++++++--------
 src/kudu/tools/tool_action_perf.cc  |  10 +++
 src/kudu/tools/tool_action_table.cc |   6 ++
 5 files changed, 211 insertions(+), 61 deletions(-)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 6843c9b..c0e0f42 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2874,6 +2874,38 @@ TEST_F(ToolTest, PerfTableScanCountOnly) {
   }
 }
 
+TEST_F(ToolTest, PerfTableScanReplicaSelection) {
+  constexpr const char* const kTableName = "perf.table_scan.replica_selection";
+  NO_FATALS(RunLoadgen(1,
+                       {
+                         "--num_threads=8",
+                         "--num_rows_per_thread=1",
+                       },
+                       kTableName));
+  {
+    string out;
+    string err;
+    vector<string> out_lines;
+    const auto s = RunTool(
+        Substitute("perf table_scan $0 $1 --replica_selection=leader",
+                   cluster_->master()->bound_rpc_addr().ToString(), 
kTableName),
+        &out, &err, &out_lines);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_EQ(1, out_lines.size()) << out;
+    ASSERT_STR_CONTAINS(out, "Total count 8 ");
+  }
+  {
+    string out;
+    string err;
+    const auto s = RunTool(Substitute(
+        "perf table_scan $0 $1 --replica_selection=CLOSEST",
+        cluster_->master()->bound_rpc_addr().ToString(), kTableName),
+        &out, &err);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_STR_CONTAINS(out, "Total count 8 ");
+  }
+}
+
 TEST_F(ToolTest, TestPerfTabletScan) {
   // Create a table.
   constexpr const char* const kTableName = "perf.tablet_scan";
@@ -3948,6 +3980,37 @@ TEST_F(ToolTest, TableScanRowCountOnly) {
   }
 }
 
+TEST_F(ToolTest, TableScanReplicaSelection) {
+  constexpr const char* const kTableName = "kudu.table.scan.replica_selection";
+  NO_FATALS(RunLoadgen(1,
+                       {
+                         "--num_threads=2",
+                         "--num_rows_per_thread=1",
+                       },
+                       kTableName));
+  {
+    string out;
+    string err;
+    vector<string> out_lines;
+    const auto s = RunTool(
+        Substitute("table scan $0 $1 --replica_selection=LEADER",
+                   cluster_->master()->bound_rpc_addr().ToString(), 
kTableName),
+        &out, &err, &out_lines);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_STR_CONTAINS(out, "Total count 2 ");
+  }
+  {
+    string out;
+    string err;
+    const auto s = RunTool(
+        Substitute("table scan $0 $1 --replica_selection=closest",
+                   cluster_->master()->bound_rpc_addr().ToString(), 
kTableName),
+                   &out, &err);
+    ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+    ASSERT_STR_CONTAINS(out, "Total count 2 ");
+  }
+}
+
 TEST_P(ToolTestCopyTableParameterized, TestCopyTable) {
   for (const auto& arg : GenerateArgs()) {
     NO_FATALS(RunCopyTableCheck(arg));
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index 48d2a8d..edd49e9 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -21,7 +21,6 @@
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
-#include <initializer_list>
 #include <iomanip>
 #include <iostream>
 #include <iterator>
@@ -118,33 +117,60 @@ DEFINE_string(write_type, "insert",
               "How data should be copied to the destination table. Valid 
values are 'insert', "
               "'upsert' or the empty string. If the empty string, data will 
not be copied "
               "(useful when create_table is 'true').");
+DEFINE_string(replica_selection, "CLOSEST",
+              "Replica selection for scan operations. Acceptable values are: "
+              "CLOSEST, LEADER (maps into KuduClient::CLOSEST_REPLICA and "
+              "KuduClient::LEADER_ONLY correspondingly).");
 
 DECLARE_bool(row_count_only);
 DECLARE_int32(num_threads);
 DECLARE_int64(timeout_ms);
-DECLARE_string(tablets);
 DECLARE_string(columns);
+DECLARE_string(tablets);
 
-static bool ValidateWriteType(const char* flag_name,
-                              const string& flag_value) {
-  static const auto kWriteTypes = { "insert", "upsert", "" };
-  if (std::find_if(kWriteTypes.begin(), kWriteTypes.end(),
-                   [&](const string& allowed_value) {
-                     return iequals(allowed_value, flag_value);
-                   }) != kWriteTypes.end()) {
+namespace {
+
+bool IsFlagValueAcceptable(const char* flag_name,
+                           const string& flag_value,
+                           const vector<string>& acceptable_values) {
+  if (std::find_if(acceptable_values.begin(), acceptable_values.end(),
+                   [&](const string& value) {
+                     return iequals(value, flag_value);
+                   }) != acceptable_values.end()) {
     return true;
   }
 
-  std::ostringstream ss;
+  ostringstream ss;
   ss << "'" << flag_value << "': unsupported value for --" << flag_name
      << " flag; should be one of ";
-  copy(kWriteTypes.begin(), kWriteTypes.end(),
+  copy(acceptable_values.begin(), acceptable_values.end(),
        std::ostream_iterator<string>(ss, " "));
   LOG(ERROR) << ss.str();
 
   return false;
 }
+
+bool ValidateWriteType(const char* flag_name,
+                       const string& flag_value) {
+  static const vector<string> kWriteTypes = { "insert", "upsert", "" };
+  return IsFlagValueAcceptable(flag_name, flag_value, kWriteTypes);
+}
+
+constexpr const char* const kReplicaSelectionClosest = "closest";
+constexpr const char* const kReplicaSelectionLeader = "leader";
+bool ValidateReplicaSelection(const char* flag_name,
+                              const string& flag_value) {
+  static const vector<string> kReplicaSelections = {
+    kReplicaSelectionClosest,
+    kReplicaSelectionLeader,
+  };
+  return IsFlagValueAcceptable(flag_name, flag_value, kReplicaSelections);
+}
+
+} // anonymous namespace
+
 DEFINE_validator(write_type, &ValidateWriteType);
+DEFINE_validator(replica_selection, &ValidateReplicaSelection);
 
 namespace kudu {
 namespace tools {
@@ -451,25 +477,18 @@ void CheckPendingErrors(const client::sp::shared_ptr<KuduSession>& session) {
   }
 }
 
-Status TableScanner::AddRow(const client::sp::shared_ptr<KuduTable>& table,
-                            const KuduSchema& table_schema,
-                            const KuduScanBatch::RowPtr& src_row,
-                            const client::sp::shared_ptr<KuduSession>& 
session) {
-  unique_ptr<KuduWriteOperation> write_op;
-  if (FLAGS_write_type == "insert") {
-    write_op.reset(table->NewInsert());
-  } else if (FLAGS_write_type == "upsert") {
-    write_op.reset(table->NewUpsert());
-  } else {
-    LOG(FATAL) << Substitute("invalid write_type: $0", FLAGS_write_type);
-  }
-
-  KuduPartialRow* dst_row = write_op->mutable_row();
-  size_t row_size = ContiguousRowHelper::row_size(*src_row.schema_);
-  memcpy(dst_row->row_data_, src_row.row_data_, row_size);
-  BitmapChangeBits(dst_row->isset_bitmap_, 0, table_schema.num_columns(), 
true);
-
-  return session->Apply(write_op.release());
+TableScanner::TableScanner(
+    client::sp::shared_ptr<client::KuduClient> client,
+    std::string table_name,
+    boost::optional<client::sp::shared_ptr<client::KuduClient>> dst_client,
+    boost::optional<std::string> dst_table_name)
+    : total_count_(0),
+      client_(std::move(client)),
+      table_name_(std::move(table_name)),
+      dst_client_(std::move(dst_client)),
+      dst_table_name_(std::move(dst_table_name)),
+      out_(nullptr) {
+  CHECK_OK(SetReplicaSelection(FLAGS_replica_selection));
 }
 
 Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& 
tokens,
@@ -546,7 +565,6 @@ void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread
   });
 }
 
-
 void TableScanner::SetOutput(ostream* out) {
   out_ = out;
 }
@@ -555,6 +573,13 @@ void TableScanner::SetReadMode(KuduScanner::ReadMode mode) {
   mode_ = mode;
 }
 
+Status TableScanner::SetReplicaSelection(const string& selection_str) {
+  KuduClient::ReplicaSelection selection;
+  RETURN_NOT_OK(ParseReplicaSelection(selection_str, &selection));
+  replica_selection_ = selection;
+  return Status::OK();
+}
+
 Status TableScanner::StartWork(WorkType type) {
   client::sp::shared_ptr<KuduTable> src_table;
   RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table));
@@ -573,6 +598,7 @@ Status TableScanner::StartWork(WorkType type) {
   if (mode_) {
     RETURN_NOT_OK(builder.SetReadMode(mode_.get()));
   }
+  RETURN_NOT_OK(builder.SetSelection(replica_selection_));
   RETURN_NOT_OK(builder.SetTimeoutMillis(FLAGS_timeout_ms));
 
   // Set projection if needed.
@@ -640,8 +666,12 @@ Status TableScanner::StartWork(WorkType type) {
 
   for (i = 0; i < FLAGS_num_threads; ++i) {
     if (!thread_statuses[i].ok()) {
-      if (out_) *out_ << "Scanning failed " << thread_statuses[i].ToString() 
<< endl;
-      if (end_status.ok()) end_status = thread_statuses[i];
+      if (out_) {
+        *out_ << "Scanning failed " << thread_statuses[i].ToString() << endl;
+      }
+      if (end_status.ok()) {
+        end_status = thread_statuses[i];
+      }
     }
   }
 
@@ -659,5 +689,40 @@ Status TableScanner::StartCopy() {
   return StartWork(WorkType::kCopy);
 }
 
+Status TableScanner::AddRow(const client::sp::shared_ptr<KuduTable>& table,
+                            const KuduSchema& table_schema,
+                            const KuduScanBatch::RowPtr& src_row,
+                            const client::sp::shared_ptr<KuduSession>& 
session) {
+  unique_ptr<KuduWriteOperation> write_op;
+  if (FLAGS_write_type == "insert") {
+    write_op.reset(table->NewInsert());
+  } else if (FLAGS_write_type == "upsert") {
+    write_op.reset(table->NewUpsert());
+  } else {
+    LOG(FATAL) << Substitute("invalid write_type: $0", FLAGS_write_type);
+  }
+
+  KuduPartialRow* dst_row = write_op->mutable_row();
+  size_t row_size = ContiguousRowHelper::row_size(*src_row.schema_);
+  memcpy(dst_row->row_data_, src_row.row_data_, row_size);
+  BitmapChangeBits(dst_row->isset_bitmap_, 0, table_schema.num_columns(), 
true);
+
+  return session->Apply(write_op.release());
+}
+
+Status TableScanner::ParseReplicaSelection(
+    const string& selection_str,
+    KuduClient::ReplicaSelection* selection) {
+  DCHECK(selection);
+  if (iequals(kReplicaSelectionClosest, selection_str)) {
+    *selection = KuduClient::ReplicaSelection::CLOSEST_REPLICA;
+  } else if (iequals(kReplicaSelectionLeader, selection_str)) {
+    *selection = KuduClient::ReplicaSelection::LEADER_ONLY;
+  } else {
+    return Status::InvalidArgument("invalid replica selection", selection_str);
+  }
+  return Status::OK();
+}
+
 } // namespace tools
 } // namespace kudu
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index d4b0787..3330097 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -23,7 +23,6 @@
 #include <iosfwd>
 #include <memory>
 #include <string>
-#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -36,35 +35,31 @@
 #include "kudu/util/threadpool.h"
 
 namespace kudu {
+
 namespace client {
 class KuduSchema;
-}  // namespace client
-}  // namespace kudu
+} // namespace client
 
-namespace kudu {
 namespace tools {
+
 // This class is not thread-safe.
 class TableScanner {
  public:
-  TableScanner(client::sp::shared_ptr<kudu::client::KuduClient> client,
+  TableScanner(client::sp::shared_ptr<client::KuduClient> client,
                std::string table_name,
-               
boost::optional<client::sp::shared_ptr<kudu::client::KuduClient>> dst_client
-                 = boost::none,
-               boost::optional<std::string> dst_table_name = boost::none):
-    total_count_(0),
-    client_(std::move(client)),
-    table_name_(std::move(table_name)),
-    dst_client_(std::move(dst_client)),
-    dst_table_name_(std::move(dst_table_name)),
-    out_(nullptr) {
-  }
+               boost::optional<client::sp::shared_ptr<client::KuduClient>> 
dst_client
+                   = boost::none,
+               boost::optional<std::string> dst_table_name = boost::none);
 
   // Set output stream of this tool, or disable output if not set.
   // 'out' must remain valid for the lifetime of this class.
   void SetOutput(std::ostream* out);
 
   // Set read mode, see KuduScanner::SetReadMode().
-  void SetReadMode(kudu::client::KuduScanner::ReadMode mode);
+  void SetReadMode(client::KuduScanner::ReadMode mode);
+
+  // Set replica selection for scan operations.
+  Status SetReplicaSelection(const std::string& selection);
 
   Status StartScan();
   Status StartCopy();
@@ -79,22 +74,32 @@ class TableScanner {
     kCopy
   };
 
-  Status StartWork(WorkType type);
-  Status ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
-                  const std::function<void(const kudu::client::KuduScanBatch& 
batch)>& cb);
-  void ScanTask(const std::vector<kudu::client::KuduScanToken*>& tokens, 
Status* thread_status);
-  void CopyTask(const std::vector<kudu::client::KuduScanToken*>& tokens, 
Status* thread_status);
+  static Status AddRow(
+      const client::sp::shared_ptr<client::KuduTable>& table,
+      const client::KuduSchema& table_schema,
+      const client::KuduScanBatch::RowPtr& src_row,
+      const client::sp::shared_ptr<client::KuduSession>& session);
+
+  // Convert replica selection from string into the 
KuduClient::ReplicaSelection
+  // enumerator.
+  static Status ParseReplicaSelection(
+      const std::string& selection_str,
+      client::KuduClient::ReplicaSelection* selection);
 
-  Status AddRow(const client::sp::shared_ptr<kudu::client::KuduTable>& table,
-                const kudu::client::KuduSchema& table_schema,
-                const kudu::client::KuduScanBatch::RowPtr& src_row,
-                const client::sp::shared_ptr<kudu::client::KuduSession>& 
session);
+  Status StartWork(WorkType type);
+  Status ScanData(const std::vector<client::KuduScanToken*>& tokens,
+                  const std::function<void(const client::KuduScanBatch& 
batch)>& cb);
+  void ScanTask(const std::vector<client::KuduScanToken*>& tokens,
+                Status* thread_status);
+  void CopyTask(const std::vector<client::KuduScanToken*>& tokens,
+                Status* thread_status);
 
   std::atomic<uint64_t> total_count_;
-  boost::optional<kudu::client::KuduScanner::ReadMode> mode_;
-  client::sp::shared_ptr<kudu::client::KuduClient> client_;
+  boost::optional<client::KuduScanner::ReadMode> mode_;
+  client::sp::shared_ptr<client::KuduClient> client_;
   std::string table_name_;
-  boost::optional<client::sp::shared_ptr<kudu::client::KuduClient>> 
dst_client_;
+  client::KuduClient::ReplicaSelection replica_selection_;
+  boost::optional<client::sp::shared_ptr<client::KuduClient>> dst_client_;
   boost::optional<std::string> dst_table_name_;
   std::unique_ptr<ThreadPool> thread_pool_;
 
@@ -102,5 +107,6 @@ class TableScanner {
   Mutex output_lock_;
   std::ostream* out_;
 };
+
 } // namespace tools
 } // namespace kudu
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index c333ab4..630f732 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -382,6 +382,7 @@ DEFINE_bool(txn_rollback, false,
 
 DECLARE_bool(show_values);
 DECLARE_int32(num_threads);
+DECLARE_string(replica_selection);
 DECLARE_string(table_name);
 
 namespace kudu {
@@ -771,6 +772,10 @@ Status CountTableRows(const shared_ptr<KuduClient>& client,
                       const string& table_name, uint64_t* count) {
   TableScanner scanner(client, table_name);
   scanner.SetReadMode(KuduScanner::ReadMode::READ_YOUR_WRITES);
+  const auto& replica_selection_str = FLAGS_replica_selection;
+  if (!replica_selection_str.empty()) {
+    RETURN_NOT_OK(scanner.SetReplicaSelection(replica_selection_str));
+  }
   RETURN_NOT_OK(scanner.StartScan());
   if (count != nullptr) {
     *count = scanner.TotalScannedCount();
@@ -921,6 +926,10 @@ Status TableScan(const RunnerContext& context) {
   FLAGS_show_values = false;
   TableScanner scanner(client, table_name);
   scanner.SetOutput(&cout);
+  const auto& replica_selection_str = FLAGS_replica_selection;
+  if (!replica_selection_str.empty()) {
+    RETURN_NOT_OK(scanner.SetReplicaSelection(replica_selection_str));
+  }
   return scanner.StartScan();
 }
 
@@ -1058,6 +1067,7 @@ unique_ptr<Mode> BuildPerfMode() {
       .AddOptionalParameter("num_threads")
       .AddOptionalParameter("predicates")
       .AddOptionalParameter("tablets")
+      .AddOptionalParameter("replica_selection")
       .Build();
 
   // TODO(aserbin): move this to tool_local_replica.cc
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index 59fbf83..90ae529 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -118,6 +118,7 @@ DEFINE_string(upper_bound_type, "EXCLUSIVE_BOUND",
 DECLARE_bool(row_count_only);
 DECLARE_bool(show_scanner_stats);
 DECLARE_bool(show_values);
+DECLARE_string(replica_selection);
 DECLARE_string(tables);
 
 namespace kudu {
@@ -489,6 +490,10 @@ Status ScanTable(const RunnerContext &context) {
   }
   TableScanner scanner(client, table_name);
   scanner.SetOutput(&cout);
+  const auto& replica_selection_str = FLAGS_replica_selection;
+  if (!replica_selection_str.empty()) {
+    RETURN_NOT_OK(scanner.SetReplicaSelection(replica_selection_str));
+  }
   return scanner.StartScan();
 }
 
@@ -1322,6 +1327,7 @@ unique_ptr<Mode> BuildTableMode() {
       .AddOptionalParameter("num_threads")
       .AddOptionalParameter("predicates")
       .AddOptionalParameter("tablets")
+      .AddOptionalParameter("replica_selection")
       .Build();
 
   unique_ptr<Action> copy_table =

Reply via email to