This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new ef0d3bd [tools] Add 'run_cleanup' option for 'kudu perf loadgen'
ef0d3bd is described below
commit ef0d3bdd7ff823a7c84f072b3e4476a864d1c17f
Author: Yingchun Lai <[email protected]>
AuthorDate: Wed Jan 1 10:32:25 2020 +0800
[tools] Add 'run_cleanup' option for 'kudu perf loadgen'
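Add a 'run_cleanup' option that, after the insert phase (and the optional
scan), deletes the rows written by the load generator by regenerating the
same primary keys. This makes it possible to clean up test data written
to a table, which is especially useful when running against an existing
user table.

This also allows auto-generated tables with a single partition
(--table_num_hash_partitions=1 --table_num_range_partitions=1), which were
previously rejected.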
Change-Id: I1e75adde434bac5e88151361655526b91f327b4c
Reviewed-on: http://gerrit.cloudera.org:8080/14958
Tested-by: Adar Dembo <[email protected]>
Reviewed-by: Adar Dembo <[email protected]>
---
src/kudu/tools/kudu-tool-test.cc | 64 ++++++++++++--
src/kudu/tools/tool_action_perf.cc | 168 +++++++++++++++++++++++--------------
2 files changed, 160 insertions(+), 72 deletions(-)
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index e478413..88684f8 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -22,8 +22,8 @@
#include <cstdint>
#include <cstdio>
#include <fstream>
-#include <iterator>
#include <initializer_list>
+#include <iterator>
#include <map>
#include <memory>
#include <set>
@@ -430,7 +430,7 @@ class ToolTest : public KuduTest {
vector<pair<string, string>> expected_columns;
if (columns.empty()) {
// If we ran with an empty projection, we'll actually get all the
columns.
- expected_columns = {{ "int32", "key" },
+ expected_columns = {{ "int.*", "key" },
{ "int32", "int_val" },
{ "string", "string_val" }};
} else {
@@ -2258,6 +2258,55 @@ TEST_F(ToolTest, TestLoadgenDatabaseName) {
ASSERT_STR_CONTAINS(out, "foo.loadgen_auto_");
}
+TEST_F(ToolTest, TestLoadgenKeepAutoTableAndData) {
+ // Run 'perf loadgen' and keep data.
+ // Create a single tablet by setting table_num_hash_partitions and
table_num_range_partitions
+ // to 1, then we can easily check sequential rows in the tablet by
RunScanTableCheck.
+ NO_FATALS(RunLoadgen(1, { "--keep_auto_table=true",
+ "--table_num_hash_partitions=1",
+ "--table_num_range_partitions=1" }));
+ string auto_table_name;
+ NO_FATALS(RunActionStdoutString(Substitute("table list $0",
+ HostPort::ToCommaSeparatedString(cluster_->master_rpc_addrs())),
&auto_table_name));
+
+ // Data is kept.
+ NO_FATALS(RunScanTableCheck(auto_table_name, "", 0, 1999, {}));
+
+ // Run 'perf loadgen' again with sequential mode and delete new generated
data.
+ {
+ const vector<string> args = {
+ "perf",
+ "loadgen",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ Substitute("--table_name=$0", auto_table_name),
+ "--seq_start=2000",
+ "--use_random=false",
+ "--run_cleanup=true",
+ };
+ ASSERT_OK(RunKuduTool(args));
+ }
+
+ // Old data is kept and new data is deleted.
+ NO_FATALS(RunScanTableCheck(auto_table_name, "", 0, 1999, {}));
+
+ // Run 'perf loadgen' again with random mode and delete new generated data.
+ {
+ const vector<string> args = {
+ "perf",
+ "loadgen",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ Substitute("--table_name=$0", auto_table_name),
+ "--seq_start=2000",
+ "--use_random=true",
+ "--run_cleanup=true",
+ };
+ ASSERT_OK(RunKuduTool(args));
+ }
+
+ // Old data is kept and new data is deleted.
+ NO_FATALS(RunScanTableCheck(auto_table_name, "", 0, 1999, {}));
+}
+
TEST_F(ToolTest, TestLoadgenHmsEnabled) {
ExternalMiniClusterOptions opts;
opts.hms_mode = HmsMode::ENABLE_HIVE_METASTORE;
@@ -2299,21 +2348,20 @@ TEST_F(ToolTest, TestLoadgenAutoGenTablePartitioning) {
const MonoDelta kTimeout = MonoDelta::FromMilliseconds(10);
TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()];
- // Test with misconfigured partitioning. This should fail because we disallow
- // creating tables with "no" partitioning.
+ // Now let's try running with a single partition.
vector<string> args(base_args);
args.emplace_back("--table_num_range_partitions=1");
args.emplace_back("--table_num_hash_partitions=1");
- Status s = RunKuduTool(args);
- ASSERT_FALSE(s.ok());
+ int expected_tablets = 1;
+ ASSERT_OK(RunKuduTool(args));
+ ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout));
// Now let's try running with a couple range partitions.
args = base_args;
args.emplace_back("--table_num_range_partitions=2");
args.emplace_back("--table_num_hash_partitions=1");
- int expected_tablets = 2;
+ expected_tablets += 2;
ASSERT_OK(RunKuduTool(args));
- vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout));
// Now let's try running with only hash partitions.
diff --git a/src/kudu/tools/tool_action_perf.cc
b/src/kudu/tools/tool_action_perf.cc
index 845f79f..c31f19d 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -234,6 +234,7 @@ using kudu::client::KuduClient;
using kudu::client::KuduColumnSchema;
using kudu::client::KuduError;
using kudu::client::KuduInsert;
+using kudu::client::KuduDelete;
using kudu::client::KuduScanner;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
@@ -250,6 +251,7 @@ using kudu::log::LogAnchorRegistry;
using kudu::tablet::RowIteratorOptions;
using kudu::tablet::Tablet;
using kudu::tablet::TabletMetadata;
+using kudu::client::KuduWriteOperation;
using std::accumulate;
using std::cerr;
using std::cout;
@@ -302,6 +304,9 @@ DEFINE_bool(run_scan, false,
"the inserted rows matches the expected number. If enabled, "
"the scan is run only if no errors were encountered "
"while inserting the generated rows.");
+DEFINE_bool(run_cleanup, false,
+ "Whether to run post-insertion deletion to reset the existing "
+ "table as before.");
DEFINE_uint64(seq_start, 0,
"Initial value for the generator in sequential mode. "
"This is useful when running multiple times against already "
@@ -358,7 +363,7 @@ namespace {
bool ValidatePartitionFlags() {
int num_tablets = FLAGS_table_num_hash_partitions *
FLAGS_table_num_range_partitions;
- if (num_tablets <= 1) {
+ if (num_tablets < 1) {
LOG(ERROR) << Substitute("Invalid partitioning:
--table_num_hash_partitions=$0 "
"--table_num_range_partitions=$1, must specify more than one
partition "
"for auto-generated tables", FLAGS_table_num_hash_partitions,
@@ -432,21 +437,31 @@ string Generator::Next() {
// Utility function that determines the range of generated values each thread
// should insert across if inserting in non-random mode. In random mode, this
// is used to generate different RNG seeds per thread.
-int64_t SpanPerThread(int num_columns) {
- CHECK_LT(0, num_columns);
+int64_t SpanPerThread(int num_key_columns) {
+ CHECK_LT(0, num_key_columns);
CHECK_LT(0, FLAGS_num_threads);
const auto per_thread_limit = numeric_limits<int64_t>::max() /
- (num_columns * FLAGS_num_threads);
+ (num_key_columns * FLAGS_num_threads);
return (FLAGS_num_rows_per_thread < 0 ||
FLAGS_num_rows_per_thread > per_thread_limit)
? numeric_limits<int64_t>::max() / FLAGS_num_threads
- : FLAGS_num_rows_per_thread * num_columns;
+ : FLAGS_num_rows_per_thread * num_key_columns;
}
-Status GenerateRowData(Generator* gen, KuduPartialRow* row,
- const string& fixed_string) {
+Status GenerateRowData(Generator* key_gen, Generator* value_gen,
KuduPartialRow* row,
+ const string& fixed_string, KuduWriteOperation::Type
op_type) {
const vector<ColumnSchema>& columns(row->schema()->columns());
- for (size_t idx = 0; idx < columns.size(); ++idx) {
+ DCHECK(op_type == KuduWriteOperation::Type::INSERT ||
+ op_type == KuduWriteOperation::Type::DELETE);
+ size_t gen_column_count = op_type == KuduWriteOperation::Type::INSERT ?
+ columns.size() : row->schema()->num_key_columns();
+ // Seperate key Generator and value Generator, so we can generate the same
primary keys
+ // when perform DELETE operations.
+ Generator* gen = key_gen;
+ for (size_t idx = 0; idx < gen_column_count; ++idx) {
+ if (idx == row->schema()->num_key_columns()) {
+ gen = value_gen;
+ }
const TypeInfo* tinfo = columns[idx].type_info();
switch (tinfo->type()) {
case BOOL:
@@ -478,7 +493,7 @@ Status GenerateRowData(Generator* gen, KuduPartialRow* row,
break;
case DECIMAL32:
RETURN_NOT_OK(row->SetUnscaledDecimal(idx,
std::min(gen->Next<int32_t>(),
-
kMaxUnscaledDecimal32)));
+
kMaxUnscaledDecimal32)));
break;
case DECIMAL64:
RETURN_NOT_OK(row->SetUnscaledDecimal(idx,
std::min(gen->Next<int64_t>(),
@@ -520,7 +535,8 @@ mutex cerr_lock;
void GeneratorThread(
const client::sp::shared_ptr<KuduClient>& client, const string& table_name,
- size_t gen_idx, Status* status, uint64_t* row_count, uint64_t* err_count) {
+ size_t gen_idx, KuduWriteOperation::Type op_type,
+ Status* status, uint64_t* row_count, uint64_t* err_count) {
const Generator::Mode gen_mode = FLAGS_use_random ? Generator::MODE_RAND
: Generator::MODE_SEQ;
@@ -549,14 +565,29 @@ void GeneratorThread(
// Planning for non-intersecting ranges for different generator threads
// in sequential generation mode.
- const int64_t gen_span = SpanPerThread(table->schema().num_columns());
+ const int64_t gen_span =
SpanPerThread(KuduSchema::ToSchema(table->schema()).num_key_columns());
const int64_t gen_seed = gen_idx * gen_span + gen_seq_start;
- Generator gen(gen_mode, gen_seed, FLAGS_string_len);
+ Generator key_gen(gen_mode, gen_seed, FLAGS_string_len);
+ Generator value_gen(gen_mode, gen_seed, FLAGS_string_len);
for (; num_rows_per_gen < 0 || idx < num_rows_per_gen; ++idx) {
- unique_ptr<KuduInsert> insert_op(table->NewInsert());
- RETURN_NOT_OK(GenerateRowData(&gen, insert_op->mutable_row(),
- FLAGS_string_fixed));
- RETURN_NOT_OK(session->Apply(insert_op.release()));
+ switch (op_type) {
+ case KuduWriteOperation::Type::INSERT: {
+ unique_ptr<KuduInsert> insert_op(table->NewInsert());
+ RETURN_NOT_OK(GenerateRowData(&key_gen, &value_gen,
insert_op->mutable_row(),
+ FLAGS_string_fixed, op_type));
+ RETURN_NOT_OK(session->Apply(insert_op.release()));
+ break;
+ }
+ case KuduWriteOperation::Type::DELETE: {
+ unique_ptr<KuduDelete> delete_op(table->NewDelete());
+ RETURN_NOT_OK(GenerateRowData(&key_gen, nullptr,
delete_op->mutable_row(),
+ FLAGS_string_fixed, op_type));
+ RETURN_NOT_OK(session->Apply(delete_op.release()));
+ break;
+ }
+ default:
+ LOG(FATAL) << "Unknown op_type=" << op_type;
+ }
if (flush_per_n_rows != 0 && idx != 0 && idx % flush_per_n_rows == 0) {
session->FlushAsync(nullptr);
}
@@ -588,35 +619,67 @@ void GeneratorThread(
}
}
-Status GenerateInsertRows(const client::sp::shared_ptr<KuduClient>& client,
- const string& table_name,
- uint64_t* total_row_count,
- uint64_t* total_err_count) {
+Status GenerateWriteRows(const client::sp::shared_ptr<KuduClient>& client,
+ const string& table_name,
+ KuduWriteOperation::Type op_type,
+ uint64_t* num_rows_generated = nullptr) {
+ DCHECK(op_type == KuduWriteOperation::Type::INSERT ||
+ op_type == KuduWriteOperation::Type::DELETE);
const size_t gen_num = FLAGS_num_threads;
- vector<Status> status(gen_num);
+ vector<Status> statuses(gen_num);
vector<uint64_t> row_count(gen_num, 0);
vector<uint64_t> err_count(gen_num, 0);
vector<thread> threads;
+ Stopwatch sw(Stopwatch::ALL_THREADS);
+ sw.start();
for (size_t i = 0; i < gen_num; ++i) {
- threads.emplace_back(&GeneratorThread, client, table_name, i,
- &status[i], &row_count[i], &err_count[i]);
+ threads.emplace_back(&GeneratorThread, client, table_name, i, op_type,
+ &statuses[i], &row_count[i], &err_count[i]);
}
for (auto& t : threads) {
t.join();
}
- if (total_row_count != nullptr) {
- *total_row_count = accumulate(row_count.begin(), row_count.end(), 0UL);
- }
- if (total_err_count != nullptr) {
- *total_err_count = accumulate(err_count.begin(), err_count.end(), 0UL);
+ sw.stop();
+ const double time_total_ms = sw.elapsed().wall_millis();
+ uint64_t total_row_count = accumulate(row_count.begin(), row_count.end(),
0UL);
+ uint64_t total_err_count = accumulate(err_count.begin(), err_count.end(),
0UL);
+ cout << endl
+ << (op_type == KuduWriteOperation::Type::INSERT ? "INSERT" : "DELETE")
<< " report" << endl
+ << " rows total: " << total_row_count << endl
+ << " time total: " << time_total_ms << " ms" << endl;
+ if (total_row_count != 0 && total_err_count == 0) {
+ // Report per-row timings only if there were no write errors, otherwise the
+ // readings do not make much sense.
+ cout << " time per row: " << time_total_ms / total_row_count << " ms" <<
endl;
}
- // Return first non-OK error status, if any, as a result.
- const auto it = find_if(status.begin(), status.end(),
+
+ // Make first non-OK error status, if any, as a result.
+ Status status;
+ const auto it = find_if(statuses.begin(), statuses.end(),
[&](const Status& s) { return !s.ok(); });
- if (it != status.end()) {
- return *it;
+ if (it != statuses.end()) {
+ status = *it;
+ }
+ if (!status.ok() || total_err_count != 0) {
+ string err_str;
+ if (!status.ok()) {
+ SubstituteAndAppend(&err_str, status.ToString());
+ }
+ if (total_err_count != 0) {
+ if (!status.ok()) {
+ SubstituteAndAppend(&err_str, "; ");
+ }
+ SubstituteAndAppend(&err_str, "Encountered $0 write operation errors",
+ total_err_count);
+ }
+ return Status::RuntimeError(err_str);
}
+
+ if (num_rows_generated) {
+ *num_rows_generated = total_row_count;
+ }
+
return Status::OK();
}
@@ -663,12 +726,12 @@ Status TestLoadGenerator(const RunnerContext& context) {
if (FLAGS_table_num_replicas > 0) {
table_creator->num_replicas(FLAGS_table_num_replicas);
}
- if (FLAGS_table_num_range_partitions > 1) {
+ if (FLAGS_table_num_range_partitions >= 1) {
// Split the generated span for a sequential workload evenly across all
// tablets. In case we're inserting in random mode, use unbounded range
// partitioning, so the table has key coverage of the entire keyspace.
const int64_t total_inserted_span =
- SpanPerThread(schema.num_columns()) * FLAGS_num_threads;
+ SpanPerThread(KuduSchema::ToSchema(schema).num_key_columns()) *
FLAGS_num_threads;
const int64_t span_per_range =
total_inserted_span / FLAGS_table_num_range_partitions;
table_creator->set_range_partition_columns({ kKeyColumnName });
@@ -689,36 +752,8 @@ Status TestLoadGenerator(const RunnerContext& context) {
<< "table '" << table_name << "'" << endl;
uint64_t num_rows_generated = 0;
- uint64_t num_rows_write_error = 0;
- Stopwatch sw(Stopwatch::ALL_THREADS);
- sw.start();
- Status status = GenerateInsertRows(client, table_name,
- &num_rows_generated,
- &num_rows_write_error);
- sw.stop();
- const double time_total_ms = sw.elapsed().wall_millis();
- cout << endl << "Generator report" << endl
- << " time total : " << time_total_ms << " ms" << endl;
- if (num_rows_generated != 0 && num_rows_write_error == 0) {
- // Report per-row timings only if there were no write errors, otherwise the
- // readings do not make much sense.
- cout << " time per row: " << time_total_ms / num_rows_generated << " ms"
- << endl;
- }
- if (!status.ok() || num_rows_write_error != 0) {
- string err_str;
- if (!status.ok()) {
- SubstituteAndAppend(&err_str, status.ToString());
- }
- if (num_rows_write_error != 0) {
- if (!status.ok()) {
- SubstituteAndAppend(&err_str, "; ");
- }
- SubstituteAndAppend(&err_str, "Encountered $0 write operation errors",
- num_rows_write_error);
- }
- return Status::RuntimeError(err_str);
- }
+ RETURN_NOT_OK(GenerateWriteRows(client, table_name,
KuduWriteOperation::Type::INSERT,
+ &num_rows_generated));
if (FLAGS_run_scan) {
// In case if no write errors encountered, run a table scan to make sure
@@ -735,6 +770,10 @@ Status TestLoadGenerator(const RunnerContext& context) {
}
}
+ if (FLAGS_run_cleanup) {
+ RETURN_NOT_OK(GenerateWriteRows(client, table_name,
KuduWriteOperation::Type::DELETE));
+ }
+
if (is_auto_table && !FLAGS_keep_auto_table) {
cout << "Dropping auto-created table '" << table_name << "'" << endl;
// Drop the table which was automatically created to run the test.
@@ -842,6 +881,7 @@ unique_ptr<Mode> BuildPerfMode() {
.AddOptionalParameter("keep_auto_table")
.AddOptionalParameter("num_rows_per_thread")
.AddOptionalParameter("num_threads")
+ .AddOptionalParameter("run_cleanup")
.AddOptionalParameter("run_scan")
.AddOptionalParameter("seq_start")
.AddOptionalParameter("show_first_n_errors")