This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit eaf558527a9f996e68515bf190dc7cf1ab9931a3 Author: Alexey Serbin <[email protected]> AuthorDate: Sat Sep 12 20:27:41 2020 -0700 [tools] add --use-upsert option for 'perf loadgen' With the newly introduced --use_upsert option, it's possible to run the loadgen tool against the same table many times, simulating update workloads. This patch also adds a corresponding test scenario to the kudu-tool-test suite. In addition, I did a minor clean-up of the code in tool_action_perf.cc. Change-Id: I52aa9513d82aa4420bcf57dc7f535a3fc32df792 Reviewed-on: http://gerrit.cloudera.org:8080/16445 Reviewed-by: Bankim Bhavsar <[email protected]> Tested-by: Alexey Serbin <[email protected]> --- src/kudu/tools/kudu-tool-test.cc | 21 ++++++++++ src/kudu/tools/tool_action_perf.cc | 86 +++++++++++++++++++++++--------------- 2 files changed, 74 insertions(+), 33 deletions(-) diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index b0802eb..6edc8ff 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -2297,6 +2297,27 @@ TEST_F(ToolTest, TestLoadgenAutoFlushBackgroundRandomValuesIgnoreDeprecated) { "bench_auto_flush_background_random_values_ignore_deprecated")); } +// Run loadgen benchmark in AUTO_FLUSH_BACKGROUND mode, writing the generated +// data using UPSERT instead of INSERT. +TEST_F(ToolTest, TestLoadgenAutoFlushBackgroundUseUpsert) { + NO_FATALS(RunLoadgen( + 1 /* num_tservers */, + { + "--num_rows_per_thread=4096", + "--num_threads=8", + "--run_scan", + "--string_len=8", + // Use UPSERT (default is to use INSERT) operations for writing rows. + "--use_upsert", + // Use random values: even if there are many threads writing many + // rows, no errors are expected because of using UPSERT instead of + // INSERT. + "--use_random_pk", + "--use_random_non_pk", + }, + "bench_auto_flush_background_use_upsert")); +} + // Run the loadgen benchmark in MANUAL_FLUSH mode.
TEST_F(ToolTest, TestLoadgenManualFlush) { NO_FATALS(RunLoadgen(3, diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc index 9aa19af..d5b3d53 100644 --- a/src/kudu/tools/tool_action_perf.cc +++ b/src/kudu/tools/tool_action_perf.cc @@ -172,7 +172,6 @@ #include <limits> #include <memory> #include <mutex> -#include <numeric> #include <string> #include <thread> #include <unordered_map> @@ -232,14 +231,13 @@ using kudu::TypeInfo; using kudu::client::KuduClient; using kudu::client::KuduColumnSchema; using kudu::client::KuduError; -using kudu::client::KuduInsert; -using kudu::client::KuduDelete; using kudu::client::KuduScanner; using kudu::client::KuduSchema; using kudu::client::KuduSchemaBuilder; using kudu::client::KuduSession; using kudu::client::KuduTable; using kudu::client::KuduTableCreator; +using kudu::client::KuduWriteOperation; using kudu::clock::LogicalClock; using kudu::consensus::ConsensusBootstrapInfo; using kudu::consensus::ConsensusMetadata; @@ -249,8 +247,6 @@ using kudu::log::LogAnchorRegistry; using kudu::tablet::RowIteratorOptions; using kudu::tablet::Tablet; using kudu::tablet::TabletMetadata; -using kudu::client::KuduWriteOperation; -using std::accumulate; using std::cerr; using std::cout; using std::endl; @@ -367,6 +363,9 @@ DEFINE_bool(use_random_pk, false, DEFINE_bool(use_random_non_pk, false, "Whether to use random numbers instead of sequential ones for non-primary key " "columns."); +DEFINE_bool(use_upsert, false, + "Whether to use UPSERT instead of INSERT to store the generated " + "data into the table"); namespace kudu { namespace tools { @@ -388,6 +387,19 @@ bool ValidatePartitionFlags() { } GROUP_FLAG_VALIDATOR(partition_flags, &ValidatePartitionFlags); +const char* OpTypeToString(KuduWriteOperation::Type op_type) { + switch (op_type) { + case KuduWriteOperation::INSERT: + return "INSERT"; + case KuduWriteOperation::DELETE: + return "DELETE"; + case KuduWriteOperation::UPSERT: + return "UPSERT"; + default: + 
LOG(FATAL) << Substitute("unsupported op_type $0", op_type); + } +} + class Generator { public: enum Mode { @@ -465,10 +477,12 @@ int64_t SpanPerThread(int num_key_columns) { Status GenerateRowData(Generator* key_gen, Generator* value_gen, KuduPartialRow* row, const string& fixed_string, KuduWriteOperation::Type op_type) { const vector<ColumnSchema>& columns(row->schema()->columns()); - DCHECK(op_type == KuduWriteOperation::Type::INSERT || - op_type == KuduWriteOperation::Type::DELETE); - size_t gen_column_count = op_type == KuduWriteOperation::Type::INSERT ? - columns.size() : row->schema()->num_key_columns(); + DCHECK(op_type == KuduWriteOperation::INSERT || + op_type == KuduWriteOperation::DELETE || + op_type == KuduWriteOperation::UPSERT); + const size_t gen_column_count = op_type == KuduWriteOperation::DELETE + ? row->schema()->num_key_columns() + : columns.size(); // Seperate key Generator and value Generator, so we can generate the same primary keys // when perform DELETE operations. Generator* gen = key_gen; @@ -598,36 +612,38 @@ WriteResults GeneratorThread(const client::sp::shared_ptr<KuduClient>& client, // Planning for non-intersecting ranges for different generator threads // in sequential generation mode. 
- const int64_t gen_span = SpanPerThread(KuduSchema::ToSchema(table->schema()).num_key_columns()); + const int64_t gen_span = SpanPerThread(KuduSchema::ToSchema( + table->schema()).num_key_columns()); const int64_t gen_seed = gen_idx * gen_span + gen_seq_start; Generator key_gen(key_gen_mode, gen_seed, FLAGS_string_len); Generator value_gen(value_gen_mode, gen_seed, FLAGS_string_len); + unique_ptr<KuduWriteOperation> op; for (; num_rows_per_gen < 0 || idx < num_rows_per_gen; ++idx) { switch (op_type) { - case KuduWriteOperation::Type::INSERT: { - unique_ptr<KuduInsert> insert_op(table->NewInsert()); - RETURN_NOT_OK(GenerateRowData(&key_gen, &value_gen, insert_op->mutable_row(), - FLAGS_string_fixed, op_type)); - RETURN_NOT_OK(session->Apply(insert_op.release())); + case KuduWriteOperation::INSERT: + op.reset(table->NewInsert()); break; - } - case KuduWriteOperation::Type::DELETE: { - unique_ptr<KuduDelete> delete_op(table->NewDelete()); - RETURN_NOT_OK(GenerateRowData(&key_gen, nullptr, delete_op->mutable_row(), - FLAGS_string_fixed, op_type)); - RETURN_NOT_OK(session->Apply(delete_op.release())); + case KuduWriteOperation::DELETE: + op.reset(table->NewDelete()); + break; + case KuduWriteOperation::UPSERT: + op.reset(table->NewUpsert()); break; - } default: - LOG(FATAL) << "Unknown op_type=" << op_type; + LOG(FATAL) << Substitute("unknown op_type $0", op_type); } + RETURN_NOT_OK(GenerateRowData( + &key_gen, + op_type == KuduWriteOperation::DELETE ? 
nullptr : &value_gen, + op->mutable_row(), + FLAGS_string_fixed, + op_type)); + RETURN_NOT_OK(session->Apply(op.release())); if (flush_per_n_rows != 0 && idx != 0 && idx % flush_per_n_rows == 0) { session->FlushAsync(nullptr); } } - RETURN_NOT_OK(session->Flush()); - - return Status::OK(); + return session->Flush(); }; WriteResults results; @@ -658,8 +674,9 @@ WriteResults GeneratorThread(const client::sp::shared_ptr<KuduClient>& client, WriteResults GenerateWriteRows(const ClientFactory& client_factory, const string& table_name, KuduWriteOperation::Type op_type) { - DCHECK(op_type == KuduWriteOperation::Type::INSERT || - op_type == KuduWriteOperation::Type::DELETE); + DCHECK(op_type == KuduWriteOperation::INSERT || + op_type == KuduWriteOperation::DELETE || + op_type == KuduWriteOperation::UPSERT); const size_t gen_num = FLAGS_num_threads; vector<WriteResults> results(gen_num); @@ -685,7 +702,7 @@ WriteResults GenerateWriteRows(const ClientFactory& client_factory, std::max(combined.latest_observed_timestamp, r.latest_observed_timestamp); } cout << endl - << (op_type == KuduWriteOperation::Type::INSERT ? "INSERT" : "DELETE") << " report" << endl + << OpTypeToString(op_type) << " report" << endl << " rows total: " << combined.row_count << endl << " time total: " << time_total_ms << " ms" << endl; if (combined.row_count != 0 && combined.err_count == 0) { @@ -788,8 +805,10 @@ Status TestLoadGenerator(const RunnerContext& context) { CHECK_OK(CreateKuduClient(context, &client)); return client; }; - WriteResults write_results = - GenerateWriteRows(client_factory, table_name, KuduWriteOperation::Type::INSERT); + WriteResults write_results = GenerateWriteRows( + client_factory, + table_name, + FLAGS_use_upsert ? 
KuduWriteOperation::UPSERT : KuduWriteOperation::INSERT); RETURN_NOT_OK(write_results.status); client->SetLatestObservedTimestamp(write_results.latest_observed_timestamp); if (FLAGS_run_scan) { @@ -808,8 +827,8 @@ Status TestLoadGenerator(const RunnerContext& context) { } if (FLAGS_run_cleanup) { - RETURN_NOT_OK( - GenerateWriteRows(client_factory, table_name, KuduWriteOperation::Type::DELETE).status); + RETURN_NOT_OK(GenerateWriteRows( + client_factory, table_name, KuduWriteOperation::DELETE).status); } if (is_auto_table && !FLAGS_keep_auto_table) { @@ -948,6 +967,7 @@ unique_ptr<Mode> BuildPerfMode() { .AddOptionalParameter("use_random") .AddOptionalParameter("use_random_pk") .AddOptionalParameter("use_random_non_pk") + .AddOptionalParameter("use_upsert") .Build(); unique_ptr<Action> table_scan =
