This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 8f0020f [tools] --replica_selection flag for 'table scan' and 'perf
table_scan'
8f0020f is described below
commit 8f0020fe22bc9f026d563155503be981111b202a
Author: Alexey Serbin <[email protected]>
AuthorDate: Sun Jun 20 23:50:38 2021 -0700
[tools] --replica_selection flag for 'table scan' and 'perf table_scan'
This patch adds a replica selection configuration knob for the
`kudu table scan` and `kudu perf table_scan` CLI tools by introducing
--replica_selection flag. Acceptable values for the newly introduced
flag are CLOSEST and LEADER, case insensitive.
I decided not to expose ReplicaSelection::FIRST_REPLICA enumeration
value via the new flag: the semantics of FIRST_REPLICA are too hazy and
I could not see how it might bring any value in practice.
This patch also contains corresponding tests.
Change-Id: Ie8ac1e91f0c03c61edd19796d69cecaf68880a5b
Reviewed-on: http://gerrit.cloudera.org:8080/17605
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <[email protected]>
---
src/kudu/tools/kudu-tool-test.cc | 63 +++++++++++++++++
src/kudu/tools/table_scanner.cc | 131 +++++++++++++++++++++++++++---------
src/kudu/tools/table_scanner.h | 62 +++++++++--------
src/kudu/tools/tool_action_perf.cc | 10 +++
src/kudu/tools/tool_action_table.cc | 6 ++
5 files changed, 211 insertions(+), 61 deletions(-)
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 6843c9b..c0e0f42 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2874,6 +2874,38 @@ TEST_F(ToolTest, PerfTableScanCountOnly) {
}
}
+TEST_F(ToolTest, PerfTableScanReplicaSelection) {
+ constexpr const char* const kTableName = "perf.table_scan.replica_selection";
+ NO_FATALS(RunLoadgen(1,
+ {
+ "--num_threads=8",
+ "--num_rows_per_thread=1",
+ },
+ kTableName));
+ {
+ string out;
+ string err;
+ vector<string> out_lines;
+ const auto s = RunTool(
+ Substitute("perf table_scan $0 $1 --replica_selection=leader",
+ cluster_->master()->bound_rpc_addr().ToString(),
kTableName),
+ &out, &err, &out_lines);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_EQ(1, out_lines.size()) << out;
+ ASSERT_STR_CONTAINS(out, "Total count 8 ");
+ }
+ {
+ string out;
+ string err;
+ const auto s = RunTool(Substitute(
+ "perf table_scan $0 $1 --replica_selection=CLOSEST",
+ cluster_->master()->bound_rpc_addr().ToString(), kTableName),
+ &out, &err);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_STR_CONTAINS(out, "Total count 8 ");
+ }
+}
+
TEST_F(ToolTest, TestPerfTabletScan) {
// Create a table.
constexpr const char* const kTableName = "perf.tablet_scan";
@@ -3948,6 +3980,37 @@ TEST_F(ToolTest, TableScanRowCountOnly) {
}
}
+TEST_F(ToolTest, TableScanReplicaSelection) {
+ constexpr const char* const kTableName = "kudu.table.scan.replica_selection";
+ NO_FATALS(RunLoadgen(1,
+ {
+ "--num_threads=2",
+ "--num_rows_per_thread=1",
+ },
+ kTableName));
+ {
+ string out;
+ string err;
+ vector<string> out_lines;
+ const auto s = RunTool(
+ Substitute("table scan $0 $1 --replica_selection=LEADER",
+ cluster_->master()->bound_rpc_addr().ToString(),
kTableName),
+ &out, &err, &out_lines);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_STR_CONTAINS(out, "Total count 2 ");
+ }
+ {
+ string out;
+ string err;
+ const auto s = RunTool(
+ Substitute("table scan $0 $1 --replica_selection=closest",
+ cluster_->master()->bound_rpc_addr().ToString(),
kTableName),
+ &out, &err);
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err;
+ ASSERT_STR_CONTAINS(out, "Total count 2 ");
+ }
+}
+
TEST_P(ToolTestCopyTableParameterized, TestCopyTable) {
for (const auto& arg : GenerateArgs()) {
NO_FATALS(RunCopyTableCheck(arg));
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index 48d2a8d..edd49e9 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -21,7 +21,6 @@
#include <cstddef>
#include <cstdint>
#include <cstring>
-#include <initializer_list>
#include <iomanip>
#include <iostream>
#include <iterator>
@@ -118,33 +117,60 @@ DEFINE_string(write_type, "insert",
"How data should be copied to the destination table. Valid
values are 'insert', "
"'upsert' or the empty string. If the empty string, data will
not be copied "
"(useful when create_table is 'true').");
+DEFINE_string(replica_selection, "CLOSEST",
+ "Replica selection for scan operations. Acceptable values are: "
+ "CLOSEST, LEADER (maps into KuduClient::CLOSEST_REPLICA and "
+ "KuduClient::LEADER_ONLY correspondingly).");
DECLARE_bool(row_count_only);
DECLARE_int32(num_threads);
DECLARE_int64(timeout_ms);
-DECLARE_string(tablets);
DECLARE_string(columns);
+DECLARE_string(tablets);
-static bool ValidateWriteType(const char* flag_name,
- const string& flag_value) {
- static const auto kWriteTypes = { "insert", "upsert", "" };
- if (std::find_if(kWriteTypes.begin(), kWriteTypes.end(),
- [&](const string& allowed_value) {
- return iequals(allowed_value, flag_value);
- }) != kWriteTypes.end()) {
+namespace {
+
+bool IsFlagValueAcceptable(const char* flag_name,
+ const string& flag_value,
+ const vector<string>& acceptable_values) {
+ if (std::find_if(acceptable_values.begin(), acceptable_values.end(),
+ [&](const string& value) {
+ return iequals(value, flag_value);
+ }) != acceptable_values.end()) {
return true;
}
- std::ostringstream ss;
+ ostringstream ss;
ss << "'" << flag_value << "': unsupported value for --" << flag_name
<< " flag; should be one of ";
- copy(kWriteTypes.begin(), kWriteTypes.end(),
+ copy(acceptable_values.begin(), acceptable_values.end(),
std::ostream_iterator<string>(ss, " "));
LOG(ERROR) << ss.str();
return false;
}
+
+bool ValidateWriteType(const char* flag_name,
+ const string& flag_value) {
+ static const vector<string> kWriteTypes = { "insert", "upsert", "" };
+ return IsFlagValueAcceptable(flag_name, flag_value, kWriteTypes);
+}
+
+constexpr const char* const kReplicaSelectionClosest = "closest";
+constexpr const char* const kReplicaSelectionLeader = "leader";
+bool ValidateReplicaSelection(const char* flag_name,
+ const string& flag_value) {
+ static const vector<string> kReplicaSelections = {
+ kReplicaSelectionClosest,
+ kReplicaSelectionLeader,
+ };
+ return IsFlagValueAcceptable(flag_name, flag_value, kReplicaSelections);
+}
+
+} // anonymous namespace
+
DEFINE_validator(write_type, &ValidateWriteType);
+DEFINE_validator(replica_selection, &ValidateReplicaSelection);
namespace kudu {
namespace tools {
@@ -451,25 +477,18 @@ void CheckPendingErrors(const
client::sp::shared_ptr<KuduSession>& session) {
}
}
-Status TableScanner::AddRow(const client::sp::shared_ptr<KuduTable>& table,
- const KuduSchema& table_schema,
- const KuduScanBatch::RowPtr& src_row,
- const client::sp::shared_ptr<KuduSession>&
session) {
- unique_ptr<KuduWriteOperation> write_op;
- if (FLAGS_write_type == "insert") {
- write_op.reset(table->NewInsert());
- } else if (FLAGS_write_type == "upsert") {
- write_op.reset(table->NewUpsert());
- } else {
- LOG(FATAL) << Substitute("invalid write_type: $0", FLAGS_write_type);
- }
-
- KuduPartialRow* dst_row = write_op->mutable_row();
- size_t row_size = ContiguousRowHelper::row_size(*src_row.schema_);
- memcpy(dst_row->row_data_, src_row.row_data_, row_size);
- BitmapChangeBits(dst_row->isset_bitmap_, 0, table_schema.num_columns(),
true);
-
- return session->Apply(write_op.release());
+TableScanner::TableScanner(
+ client::sp::shared_ptr<client::KuduClient> client,
+ std::string table_name,
+ boost::optional<client::sp::shared_ptr<client::KuduClient>> dst_client,
+ boost::optional<std::string> dst_table_name)
+ : total_count_(0),
+ client_(std::move(client)),
+ table_name_(std::move(table_name)),
+ dst_client_(std::move(dst_client)),
+ dst_table_name_(std::move(dst_table_name)),
+ out_(nullptr) {
+ CHECK_OK(SetReplicaSelection(FLAGS_replica_selection));
}
Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>&
tokens,
@@ -546,7 +565,6 @@ void TableScanner::CopyTask(const vector<KuduScanToken*>&
tokens, Status* thread
});
}
-
void TableScanner::SetOutput(ostream* out) {
out_ = out;
}
@@ -555,6 +573,13 @@ void TableScanner::SetReadMode(KuduScanner::ReadMode mode)
{
mode_ = mode;
}
+Status TableScanner::SetReplicaSelection(const string& selection_str) {
+ KuduClient::ReplicaSelection selection;
+ RETURN_NOT_OK(ParseReplicaSelection(selection_str, &selection));
+ replica_selection_ = selection;
+ return Status::OK();
+}
+
Status TableScanner::StartWork(WorkType type) {
client::sp::shared_ptr<KuduTable> src_table;
RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table));
@@ -573,6 +598,7 @@ Status TableScanner::StartWork(WorkType type) {
if (mode_) {
RETURN_NOT_OK(builder.SetReadMode(mode_.get()));
}
+ RETURN_NOT_OK(builder.SetSelection(replica_selection_));
RETURN_NOT_OK(builder.SetTimeoutMillis(FLAGS_timeout_ms));
// Set projection if needed.
@@ -640,8 +666,12 @@ Status TableScanner::StartWork(WorkType type) {
for (i = 0; i < FLAGS_num_threads; ++i) {
if (!thread_statuses[i].ok()) {
- if (out_) *out_ << "Scanning failed " << thread_statuses[i].ToString()
<< endl;
- if (end_status.ok()) end_status = thread_statuses[i];
+ if (out_) {
+ *out_ << "Scanning failed " << thread_statuses[i].ToString() << endl;
+ }
+ if (end_status.ok()) {
+ end_status = thread_statuses[i];
+ }
}
}
@@ -659,5 +689,40 @@ Status TableScanner::StartCopy() {
return StartWork(WorkType::kCopy);
}
+Status TableScanner::AddRow(const client::sp::shared_ptr<KuduTable>& table,
+ const KuduSchema& table_schema,
+ const KuduScanBatch::RowPtr& src_row,
+ const client::sp::shared_ptr<KuduSession>&
session) {
+ unique_ptr<KuduWriteOperation> write_op;
+ if (FLAGS_write_type == "insert") {
+ write_op.reset(table->NewInsert());
+ } else if (FLAGS_write_type == "upsert") {
+ write_op.reset(table->NewUpsert());
+ } else {
+ LOG(FATAL) << Substitute("invalid write_type: $0", FLAGS_write_type);
+ }
+
+ KuduPartialRow* dst_row = write_op->mutable_row();
+ size_t row_size = ContiguousRowHelper::row_size(*src_row.schema_);
+ memcpy(dst_row->row_data_, src_row.row_data_, row_size);
+ BitmapChangeBits(dst_row->isset_bitmap_, 0, table_schema.num_columns(),
true);
+
+ return session->Apply(write_op.release());
+}
+
+Status TableScanner::ParseReplicaSelection(
+ const string& selection_str,
+ KuduClient::ReplicaSelection* selection) {
+ DCHECK(selection);
+ if (iequals(kReplicaSelectionClosest, selection_str)) {
+ *selection = KuduClient::ReplicaSelection::CLOSEST_REPLICA;
+ } else if (iequals(kReplicaSelectionLeader, selection_str)) {
+ *selection = KuduClient::ReplicaSelection::LEADER_ONLY;
+ } else {
+ return Status::InvalidArgument("invalid replica selection", selection_str);
+ }
+ return Status::OK();
+}
+
} // namespace tools
} // namespace kudu
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index d4b0787..3330097 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -23,7 +23,6 @@
#include <iosfwd>
#include <memory>
#include <string>
-#include <utility>
#include <vector>
#include <boost/optional/optional.hpp>
@@ -36,35 +35,31 @@
#include "kudu/util/threadpool.h"
namespace kudu {
+
namespace client {
class KuduSchema;
-} // namespace client
-} // namespace kudu
+} // namespace client
-namespace kudu {
namespace tools {
+
// This class is not thread-safe.
class TableScanner {
public:
- TableScanner(client::sp::shared_ptr<kudu::client::KuduClient> client,
+ TableScanner(client::sp::shared_ptr<client::KuduClient> client,
std::string table_name,
-
boost::optional<client::sp::shared_ptr<kudu::client::KuduClient>> dst_client
- = boost::none,
- boost::optional<std::string> dst_table_name = boost::none):
- total_count_(0),
- client_(std::move(client)),
- table_name_(std::move(table_name)),
- dst_client_(std::move(dst_client)),
- dst_table_name_(std::move(dst_table_name)),
- out_(nullptr) {
- }
+ boost::optional<client::sp::shared_ptr<client::KuduClient>>
dst_client
+ = boost::none,
+ boost::optional<std::string> dst_table_name = boost::none);
// Set output stream of this tool, or disable output if not set.
// 'out' must remain valid for the lifetime of this class.
void SetOutput(std::ostream* out);
// Set read mode, see KuduScanner::SetReadMode().
- void SetReadMode(kudu::client::KuduScanner::ReadMode mode);
+ void SetReadMode(client::KuduScanner::ReadMode mode);
+
+ // Set replica selection for scan operations.
+ Status SetReplicaSelection(const std::string& selection);
Status StartScan();
Status StartCopy();
@@ -79,22 +74,32 @@ class TableScanner {
kCopy
};
- Status StartWork(WorkType type);
- Status ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
- const std::function<void(const kudu::client::KuduScanBatch&
batch)>& cb);
- void ScanTask(const std::vector<kudu::client::KuduScanToken*>& tokens,
Status* thread_status);
- void CopyTask(const std::vector<kudu::client::KuduScanToken*>& tokens,
Status* thread_status);
+ static Status AddRow(
+ const client::sp::shared_ptr<client::KuduTable>& table,
+ const client::KuduSchema& table_schema,
+ const client::KuduScanBatch::RowPtr& src_row,
+ const client::sp::shared_ptr<client::KuduSession>& session);
+
+ // Convert replica selection from string into the
KuduClient::ReplicaSelection
+ // enumerator.
+ static Status ParseReplicaSelection(
+ const std::string& selection_str,
+ client::KuduClient::ReplicaSelection* selection);
- Status AddRow(const client::sp::shared_ptr<kudu::client::KuduTable>& table,
- const kudu::client::KuduSchema& table_schema,
- const kudu::client::KuduScanBatch::RowPtr& src_row,
- const client::sp::shared_ptr<kudu::client::KuduSession>&
session);
+ Status StartWork(WorkType type);
+ Status ScanData(const std::vector<client::KuduScanToken*>& tokens,
+ const std::function<void(const client::KuduScanBatch&
batch)>& cb);
+ void ScanTask(const std::vector<client::KuduScanToken*>& tokens,
+ Status* thread_status);
+ void CopyTask(const std::vector<client::KuduScanToken*>& tokens,
+ Status* thread_status);
std::atomic<uint64_t> total_count_;
- boost::optional<kudu::client::KuduScanner::ReadMode> mode_;
- client::sp::shared_ptr<kudu::client::KuduClient> client_;
+ boost::optional<client::KuduScanner::ReadMode> mode_;
+ client::sp::shared_ptr<client::KuduClient> client_;
std::string table_name_;
- boost::optional<client::sp::shared_ptr<kudu::client::KuduClient>>
dst_client_;
+ client::KuduClient::ReplicaSelection replica_selection_;
+ boost::optional<client::sp::shared_ptr<client::KuduClient>> dst_client_;
boost::optional<std::string> dst_table_name_;
std::unique_ptr<ThreadPool> thread_pool_;
@@ -102,5 +107,6 @@ class TableScanner {
Mutex output_lock_;
std::ostream* out_;
};
+
} // namespace tools
} // namespace kudu
diff --git a/src/kudu/tools/tool_action_perf.cc
b/src/kudu/tools/tool_action_perf.cc
index c333ab4..630f732 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -382,6 +382,7 @@ DEFINE_bool(txn_rollback, false,
DECLARE_bool(show_values);
DECLARE_int32(num_threads);
+DECLARE_string(replica_selection);
DECLARE_string(table_name);
namespace kudu {
@@ -771,6 +772,10 @@ Status CountTableRows(const shared_ptr<KuduClient>& client,
const string& table_name, uint64_t* count) {
TableScanner scanner(client, table_name);
scanner.SetReadMode(KuduScanner::ReadMode::READ_YOUR_WRITES);
+ const auto& replica_selection_str = FLAGS_replica_selection;
+ if (!replica_selection_str.empty()) {
+ RETURN_NOT_OK(scanner.SetReplicaSelection(replica_selection_str));
+ }
RETURN_NOT_OK(scanner.StartScan());
if (count != nullptr) {
*count = scanner.TotalScannedCount();
@@ -921,6 +926,10 @@ Status TableScan(const RunnerContext& context) {
FLAGS_show_values = false;
TableScanner scanner(client, table_name);
scanner.SetOutput(&cout);
+ const auto& replica_selection_str = FLAGS_replica_selection;
+ if (!replica_selection_str.empty()) {
+ RETURN_NOT_OK(scanner.SetReplicaSelection(replica_selection_str));
+ }
return scanner.StartScan();
}
@@ -1058,6 +1067,7 @@ unique_ptr<Mode> BuildPerfMode() {
.AddOptionalParameter("num_threads")
.AddOptionalParameter("predicates")
.AddOptionalParameter("tablets")
+ .AddOptionalParameter("replica_selection")
.Build();
// TODO(aserbin): move this to tool_local_replica.cc
diff --git a/src/kudu/tools/tool_action_table.cc
b/src/kudu/tools/tool_action_table.cc
index 59fbf83..90ae529 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -118,6 +118,7 @@ DEFINE_string(upper_bound_type, "EXCLUSIVE_BOUND",
DECLARE_bool(row_count_only);
DECLARE_bool(show_scanner_stats);
DECLARE_bool(show_values);
+DECLARE_string(replica_selection);
DECLARE_string(tables);
namespace kudu {
@@ -489,6 +490,10 @@ Status ScanTable(const RunnerContext &context) {
}
TableScanner scanner(client, table_name);
scanner.SetOutput(&cout);
+ const auto& replica_selection_str = FLAGS_replica_selection;
+ if (!replica_selection_str.empty()) {
+ RETURN_NOT_OK(scanner.SetReplicaSelection(replica_selection_str));
+ }
return scanner.StartScan();
}
@@ -1322,6 +1327,7 @@ unique_ptr<Mode> BuildTableMode() {
.AddOptionalParameter("num_threads")
.AddOptionalParameter("predicates")
.AddOptionalParameter("tablets")
+ .AddOptionalParameter("replica_selection")
.Build();
unique_ptr<Action> copy_table =