This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 53ddc88 [tool] update type of loadgen's --num_rows_per_thread flag
53ddc88 is described below
commit 53ddc88463c9785c1f7acc517c0a01ef70187cf6
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri May 17 20:28:28 2019 -0700
[tool] update type of loadgen's --num_rows_per_thread flag
This patch updates the type of the --num_rows_per_thread command-line
flag, changing it from unsigned to signed (uint64 to int64). With that,
the value of 0 is not a special value for this flag anymore. Instead,
any negative value for this flag now means 'as many rows per thread
as possible'.
With this change, it's now possible to use the 'kudu perf loadgen'
CLI tool to create empty tables, which is useful in various test
scenarios.
In addition, this patch contains a small clean-up of how loadgen's
--keep_auto_table flag is used in some test scenarios.
Change-Id: I2712ac7678c9cfd9359629f11df3a86dd727997d
Reviewed-on: http://gerrit.cloudera.org:8080/13373
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
---
src/kudu/tools/kudu-tool-test.cc | 48 +++++++++--
src/kudu/tools/tool_action_common.cc | 155 +++++++++++++++++------------------
src/kudu/tools/tool_action_perf.cc | 44 ++++++----
3 files changed, 145 insertions(+), 102 deletions(-)
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index d282349..ed0bc68 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -608,7 +608,9 @@ class ToolTest : public KuduTest {
protected:
void RunLoadgen(int num_tservers = 1,
const vector<string>& tool_args = {},
- const string& table_name = "");
+ const string& table_name = "",
+ string* tool_stdout = nullptr,
+ string* tool_stderr = nullptr);
void StartExternalMiniCluster(ExternalMiniClusterOptions opts = {});
void StartMiniCluster(InternalMiniClusterOptions opts = {});
unique_ptr<ExternalMiniCluster> cluster_;
@@ -2025,7 +2027,9 @@ TEST_F(ToolTest, TestLocalReplicaOps) {
// and then run 'kudu perf loadgen ...' utility against it.
void ToolTest::RunLoadgen(int num_tservers,
const vector<string>& tool_args,
- const string& table_name) {
+ const string& table_name,
+ string* tool_stdout,
+ string* tool_stderr) {
ExternalMiniClusterOptions opts;
opts.num_tablet_servers = num_tservers;
NO_FATALS(StartExternalMiniCluster(std::move(opts)));
@@ -2075,7 +2079,7 @@ void ToolTest::RunLoadgen(int num_tservers,
args.push_back(Substitute("-table_name=$0", table_name));
}
copy(tool_args.begin(), tool_args.end(), back_inserter(args));
- ASSERT_OK(RunKuduTool(args));
+ ASSERT_OK(RunKuduTool(args, tool_stdout, tool_stderr));
}
// Run the loadgen benchmark with all optional parameters set to defaults.
@@ -2083,6 +2087,40 @@ TEST_F(ToolTest, TestLoadgenDefaultParameters) {
NO_FATALS(RunLoadgen());
}
+// Verify it's possible to run loadgen to create a table, no records inserted.
+// Also verify that --num_rows_per_thread=0 in case of existing table
+// results in no rows inserted.
+TEST_F(ToolTest, TestLoadgenZeroRowsPerThread) {
+ // Run the tool with zero rows per thread against an existing table.
+ // The existing table should get no rows inserted.
+ {
+ string out;
+ NO_FATALS(RunLoadgen(1, { "--num_rows_per_thread=0", "--run_scan" },
+ "an_empty_test_table", &out));
+ ASSERT_STR_MATCHES(out, "expected rows: 0");
+ ASSERT_STR_MATCHES(out, "actual rows : 0");
+ }
+
+ // Request to run with zero rows per thread and with various numbers
+ // of generator threads. The latter parameter in such a configuration is
+ // irrelevant, and the result table should be empty anyways.
+ for (auto num_threads : { 1, 2, 10, 100 }) {
+ SCOPED_TRACE(Substitute("num_threads=$0", num_threads));
+ const vector<string> args = {
+ "perf",
+ "loadgen",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ Substitute("--num_threads=$0", num_threads),
+ "--num_rows_per_thread=0",
+ "--run_scan",
+ };
+ string out;
+ ASSERT_OK(RunKuduTool(args, &out));
+ ASSERT_STR_MATCHES(out, "expected rows: 0");
+ ASSERT_STR_MATCHES(out, "actual rows : 0");
+ }
+}
+
// Run the loadgen benchmark in AUTO_FLUSH_BACKGROUND mode, sequential values.
TEST_F(ToolTest, TestLoadgenAutoFlushBackgroundSequential) {
NO_FATALS(RunLoadgen(3,
@@ -2277,14 +2315,14 @@ TEST_F(ToolTest, TestNonRandomWorkloadLoadgen) {
TEST_F(ToolTest, TestPerfTableScan) {
const string& kTableName = "perf.table_scan";
- NO_FATALS(RunLoadgen(1, { "--keep_auto_table=true", "--run_scan" },
kTableName));
+ NO_FATALS(RunLoadgen(1, { "--run_scan" }, kTableName));
NO_FATALS(RunScanTableCheck(kTableName, "", 1, 2000, {}, "perf table_scan"));
}
TEST_F(ToolTest, TestPerfTabletScan) {
// Create a table.
const string& kTableName = "perf.tablet_scan";
- NO_FATALS(RunLoadgen(1, { "--keep_auto_table=true" }, kTableName));
+ NO_FATALS(RunLoadgen(1, {}, kTableName));
// Get the list of tablets.
vector<string> tablet_ids;
diff --git a/src/kudu/tools/tool_action_common.cc
b/src/kudu/tools/tool_action_common.cc
index ae464d6..7fb62a6 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -119,61 +119,50 @@ DEFINE_string(tables, "", "Tables to include
(comma-separated list of table name
DEFINE_string(memtracker_output, "table",
"One of 'json', 'json_compact' or 'table'. Table output flattens
"
"the memtracker hierarchy.");
+
DEFINE_int32(num_threads, 2,
"Number of threads to run. Each thread runs its own "
"KuduSession.");
-
-namespace boost {
-template <typename Signature>
-class function;
-} // namespace boost
-
-namespace kudu {
-
-namespace master {
-class ListMastersRequestPB;
-class ListMastersResponsePB;
-class ListTabletServersRequestPB;
-class ListTabletServersResponsePB;
-class ReplaceTabletRequestPB;
-class ReplaceTabletResponsePB;
-} // namespace master
-
-namespace tools {
-
-using client::internal::AsyncLeaderMasterRpc;
-using client::KuduClient;
-using client::KuduClientBuilder;
-using consensus::ConsensusServiceProxy; // NOLINT
-using consensus::ReplicateMsg;
-using log::LogEntryPB;
-using log::LogEntryReader;
-using log::ReadableLogSegment;
-using master::ListMastersRequestPB;
-using master::ListMastersResponsePB;
-using master::ListTabletServersRequestPB;
-using master::ListTabletServersResponsePB;
-using master::MasterServiceProxy;
-using master::ReplaceTabletRequestPB;
-using master::ReplaceTabletResponsePB;
-using pb_util::SecureDebugString;
-using pb_util::SecureShortDebugString;
-using rpc::BackoffType;
-using rpc::Messenger;
-using rpc::MessengerBuilder;
-using rpc::RequestIdPB;
-using rpc::ResponseCallback;
-using rpc::RpcController;
-using server::GenericServiceProxy;
-using server::GetFlagsRequestPB;
-using server::GetFlagsResponsePB;
-using server::GetStatusRequestPB;
-using server::GetStatusResponsePB;
-using server::ServerClockRequestPB;
-using server::ServerClockResponsePB;
-using server::ServerStatusPB;
-using server::SetFlagRequestPB;
-using server::SetFlagResponsePB;
+static bool ValidateNumThreads(const char* flag_name, int32_t flag_value) {
+ if (flag_value <= 0) {
+ LOG(ERROR) << strings::Substitute("'$0' flag should have a positive value",
+ flag_name);
+ return false;
+ }
+ return true;
+}
+DEFINE_validator(num_threads, &ValidateNumThreads);
+
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::client::internal::AsyncLeaderMasterRpc;
+using kudu::consensus::ConsensusServiceProxy; // NOLINT
+using kudu::consensus::ReplicateMsg;
+using kudu::log::LogEntryPB;
+using kudu::log::LogEntryReader;
+using kudu::log::ReadableLogSegment;
+using kudu::master::MasterServiceProxy;
+using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::BackoffType;
+using kudu::rpc::Messenger;
+using kudu::rpc::MessengerBuilder;
+using kudu::rpc::RequestIdPB;
+using kudu::rpc::ResponseCallback;
+using kudu::rpc::RpcController;
+using kudu::server::GenericServiceProxy;
+using kudu::server::GetFlagsRequestPB;
+using kudu::server::GetFlagsResponsePB;
+using kudu::server::GetStatusRequestPB;
+using kudu::server::GetStatusResponsePB;
+using kudu::server::ServerClockRequestPB;
+using kudu::server::ServerClockResponsePB;
+using kudu::server::ServerStatusPB;
+using kudu::server::SetFlagRequestPB;
+using kudu::server::SetFlagResponsePB;
+using kudu::tserver::TabletServerAdminServiceProxy; // NOLINT
+using kudu::tserver::TabletServerServiceProxy; // NOLINT
+using kudu::tserver::WriteRequestPB;
using std::cout;
using std::endl;
using std::ostream;
@@ -185,9 +174,14 @@ using std::unique_ptr;
using std::vector;
using strings::Split;
using strings::Substitute;
-using tserver::TabletServerAdminServiceProxy; // NOLINT
-using tserver::TabletServerServiceProxy; // NOLINT
-using tserver::WriteRequestPB;
+
+namespace boost {
+template <typename Signature>
+class function;
+} // namespace boost
+
+namespace kudu {
+namespace tools {
const char* const kMasterAddressesArg = "master_addresses";
const char* const kMasterAddressesArgDesc = "Comma-separated list of Kudu "
@@ -738,32 +732,35 @@ Status LeaderMasterProxy::SyncRpc(const Req& req,
// Explicit specializations for callers outside this compilation unit.
template
-Status LeaderMasterProxy::SyncRpc(const ListTabletServersRequestPB& req,
- ListTabletServersResponsePB* resp,
- string func_name,
- const
boost::function<void(MasterServiceProxy*,
- const
ListTabletServersRequestPB&,
-
ListTabletServersResponsePB*,
- RpcController*,
- const
ResponseCallback&)>& func);
+Status LeaderMasterProxy::SyncRpc(
+ const master::ListTabletServersRequestPB& req,
+ master::ListTabletServersResponsePB* resp,
+ string func_name,
+ const boost::function<void(MasterServiceProxy*,
+ const master::ListTabletServersRequestPB&,
+ master::ListTabletServersResponsePB*,
+ RpcController*,
+ const ResponseCallback&)>& func);
template
-Status LeaderMasterProxy::SyncRpc(const ListMastersRequestPB& req,
- ListMastersResponsePB* resp,
- string func_name,
- const
boost::function<void(MasterServiceProxy*,
- const
ListMastersRequestPB&,
-
ListMastersResponsePB*,
- RpcController*,
- const
ResponseCallback&)>& func);
+Status LeaderMasterProxy::SyncRpc(
+ const master::ListMastersRequestPB& req,
+ master::ListMastersResponsePB* resp,
+ string func_name,
+ const boost::function<void(MasterServiceProxy*,
+ const master::ListMastersRequestPB&,
+ master::ListMastersResponsePB*,
+ RpcController*,
+ const ResponseCallback&)>& func);
template
-Status LeaderMasterProxy::SyncRpc(const ReplaceTabletRequestPB& req,
- ReplaceTabletResponsePB* resp,
- string func_name,
- const
boost::function<void(MasterServiceProxy*,
- const
ReplaceTabletRequestPB&,
-
ReplaceTabletResponsePB*,
- RpcController*,
- const
ResponseCallback&)>& func);
+Status LeaderMasterProxy::SyncRpc(
+ const master::ReplaceTabletRequestPB& req,
+ master::ReplaceTabletResponsePB* resp,
+ string func_name,
+ const boost::function<void(MasterServiceProxy*,
+ const master::ReplaceTabletRequestPB&,
+ master::ReplaceTabletResponsePB*,
+ RpcController*,
+ const ResponseCallback&)>& func);
const int ControlShellProtocol::kMaxMessageBytes = 1024 * 1024;
diff --git a/src/kudu/tools/tool_action_perf.cc
b/src/kudu/tools/tool_action_perf.cc
index f980d86..5fe4730 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -290,10 +290,10 @@ DEFINE_bool(keep_auto_table, false,
"nor its data is ever dropped/deleted.");
DEFINE_int32(num_iters, 1,
"Number of times to run the scan.");
-DEFINE_uint64(num_rows_per_thread, 1000,
- "Number of rows each thread generates and inserts; "
- "0 means unlimited. All rows generated by a thread are inserted "
- "in the context of the same session.");
+DEFINE_int64(num_rows_per_thread, 1000,
+ "Number of rows each thread generates and inserts; "
+ "-1 means unlimited. All rows generated by a thread are inserted "
+ "in the context of the same session.");
DECLARE_int32(num_threads);
DEFINE_bool(ordered_scan, false,
"Whether to run an ordered or unordered scan.");
@@ -433,8 +433,13 @@ string Generator::Next() {
// should insert across if inserting in non-random mode. In random mode, this
// is used to generate different RNG seeds per thread.
int64_t SpanPerThread(int num_columns) {
- return (FLAGS_num_rows_per_thread == 0) ?
- numeric_limits<int64_t>::max() / FLAGS_num_threads
+ CHECK_LT(0, num_columns);
+ CHECK_LT(0, FLAGS_num_threads);
+ const auto per_thread_limit = numeric_limits<int64_t>::max() /
+ (num_columns * FLAGS_num_threads);
+ return (FLAGS_num_rows_per_thread < 0 ||
+ FLAGS_num_rows_per_thread > per_thread_limit)
+ ? numeric_limits<int64_t>::max() / FLAGS_num_threads
: FLAGS_num_rows_per_thread * num_columns;
}
@@ -512,9 +517,13 @@ void GeneratorThread(
const size_t flush_per_n_rows = FLAGS_flush_per_n_rows;
const uint64_t gen_seq_start = FLAGS_seq_start;
client::sp::shared_ptr<KuduSession> session(client->NewSession());
- uint64_t idx = 0;
+ int64_t idx = 0;
auto generator = [&]() -> Status {
+ const int64_t num_rows_per_gen = FLAGS_num_rows_per_thread;
+ if (num_rows_per_gen == 0) {
+ return Status::OK();
+ }
RETURN_NOT_OK(session->SetMutationBufferFlushWatermark(
FLAGS_buffer_flush_watermark_pct));
RETURN_NOT_OK(session->SetMutationBufferSpace(
@@ -524,21 +533,19 @@ void GeneratorThread(
RETURN_NOT_OK(session->SetFlushMode(
flush_per_n_rows == 0 ? KuduSession::AUTO_FLUSH_BACKGROUND
: KuduSession::MANUAL_FLUSH));
- const size_t num_rows_per_gen = FLAGS_num_rows_per_thread;
client::sp::shared_ptr<KuduTable> table;
RETURN_NOT_OK(client->OpenTable(table_name, &table));
- const size_t num_columns = table->schema().num_columns();
// Planning for non-intersecting ranges for different generator threads
// in sequential generation mode.
- const int64_t gen_span = SpanPerThread(num_columns);
+ const int64_t gen_span = SpanPerThread(table->schema().num_columns());
const int64_t gen_seed = gen_idx * gen_span + gen_seq_start;
Generator gen(gen_mode, gen_seed, FLAGS_string_len);
- for (; num_rows_per_gen == 0 || idx < num_rows_per_gen; ++idx) {
+ for (; num_rows_per_gen < 0 || idx < num_rows_per_gen; ++idx) {
unique_ptr<KuduInsert> insert_op(table->NewInsert());
RETURN_NOT_OK(GenerateRowData(&gen, insert_op->mutable_row(),
- FLAGS_string_fixed));
+ FLAGS_string_fixed));
RETURN_NOT_OK(session->Apply(insert_op.release()));
if (flush_per_n_rows != 0 && idx != 0 && idx % flush_per_n_rows == 0) {
session->FlushAsync(nullptr);
@@ -642,8 +649,7 @@ Status TestLoadGenerator(const RunnerContext& context) {
RETURN_NOT_OK(b.Build(&schema));
unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
- table_creator->table_name(table_name)
- .schema(&schema);
+ table_creator->table_name(table_name).schema(&schema);
if (FLAGS_table_num_replicas > 0) {
table_creator->num_replicas(FLAGS_table_num_replicas);
}
@@ -651,8 +657,10 @@ Status TestLoadGenerator(const RunnerContext& context) {
// Split the generated span for a sequential workload evenly across all
// tablets. In case we're inserting in random mode, use unbounded range
// partitioning, so the table has key coverage of the entire keyspace.
- const int64_t total_inserted_span = SpanPerThread(schema.num_columns())
* FLAGS_num_threads;
- const int64_t span_per_range = total_inserted_span /
FLAGS_table_num_range_partitions;
+ const int64_t total_inserted_span =
+ SpanPerThread(schema.num_columns()) * FLAGS_num_threads;
+ const int64_t span_per_range =
+ total_inserted_span / FLAGS_table_num_range_partitions;
table_creator->set_range_partition_columns({ kKeyColumnName });
for (int i = 1; i < FLAGS_table_num_range_partitions; i++) {
unique_ptr<KuduPartialRow> split(schema.NewRow());
@@ -662,8 +670,8 @@ Status TestLoadGenerator(const RunnerContext& context) {
}
}
if (FLAGS_table_num_hash_partitions > 1) {
- table_creator->add_hash_partitions(
- vector<string>({ kKeyColumnName }), FLAGS_table_num_hash_partitions);
+ table_creator->add_hash_partitions({ kKeyColumnName },
+ FLAGS_table_num_hash_partitions);
}
RETURN_NOT_OK(table_creator->Create());
}