[tools] added insert-generated-rows into kudu tools

The insert-generated-rows tool has been merged into the 'kudu' umbrella toolset.
This addresses KUDU-1628. Also added the ability to run multiple inserter threads and to specify additional parameters controlling the batching behavior of the generated write operations. Data-generating sessions can run in either MANUAL_FLUSH or AUTO_FLUSH_BACKGROUND mode. Introduced sequential and random modes for the data generator. Overall, these changes make it possible to use the tool to measure the performance of the Kudu C++ client library in a simplistic 'push-as-much-as-you-can' scenario: the client generates and sends data as fast as it can. Change-Id: I332927c0b928c9c4fb81a8e26f5c9ed7565299ad Reviewed-on: http://gerrit.cloudera.org:8080/4412 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5385af86 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5385af86 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5385af86 Branch: refs/heads/master Commit: 5385af86dc61e56dbe601f0ad6c2bc8fba30ed7c Parents: 742d85c Author: Alexey Serbin <[email protected]> Authored: Tue Sep 13 21:33:59 2016 -0700 Committer: Alexey Serbin <[email protected]> Committed: Fri Sep 30 01:49:16 2016 +0000 ---------------------------------------------------------------------- src/kudu/tools/CMakeLists.txt | 7 +- src/kudu/tools/insert-generated-rows.cc | 125 ----- src/kudu/tools/kudu-tool-test.cc | 128 ++++- src/kudu/tools/tool_action.cc | 6 - src/kudu/tools/tool_action.h | 5 +- src/kudu/tools/tool_action_cluster.cc | 4 +- src/kudu/tools/tool_action_fs.cc | 2 +- src/kudu/tools/tool_action_local_replica.cc | 16 +- src/kudu/tools/tool_action_master.cc | 10 +- src/kudu/tools/tool_action_pbc.cc | 2 +- src/kudu/tools/tool_action_remote_replica.cc | 14 +- src/kudu/tools/tool_action_table.cc | 10 +- src/kudu/tools/tool_action_tablet.cc | 10 +- src/kudu/tools/tool_action_test.cc | 582 ++++++++++++++++++++++ src/kudu/tools/tool_action_tserver.cc | 10 +- 
src/kudu/tools/tool_action_wal.cc | 2 +- src/kudu/tools/tool_main.cc | 1 + 17 files changed, 751 insertions(+), 183 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt index 47f7ae8..61951b5 100644 --- a/src/kudu/tools/CMakeLists.txt +++ b/src/kudu/tools/CMakeLists.txt @@ -40,11 +40,6 @@ add_executable(create-demo-table create-demo-table.cc) target_link_libraries(create-demo-table ${LINK_LIBS}) -add_executable(insert-generated-rows insert-generated-rows.cc) -target_link_libraries(insert-generated-rows - kudu_tools_util - ${LINK_LIBS}) - add_library(ksck ksck.cc ksck_remote.cc @@ -70,6 +65,7 @@ add_executable(kudu tool_action_remote_replica.cc tool_action_table.cc tool_action_tablet.cc + tool_action_test.cc tool_action_tserver.cc tool_action_wal.cc tool_main.cc @@ -115,4 +111,3 @@ ADD_KUDU_TEST_DEPENDENCIES(kudu-tool-test ADD_KUDU_TEST(kudu-ts-cli-test) ADD_KUDU_TEST_DEPENDENCIES(kudu-ts-cli-test kudu) - http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/insert-generated-rows.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/insert-generated-rows.cc b/src/kudu/tools/insert-generated-rows.cc deleted file mode 100644 index f0b0b1b..0000000 --- a/src/kudu/tools/insert-generated-rows.cc +++ /dev/null @@ -1,125 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// -// Simple tool to insert "random junk" rows into an arbitrary table. -// First column is in ascending order, the rest are random data. -// Helps make things like availability demos a little easier. - -#include <gflags/gflags.h> -#include <glog/logging.h> -#include <iostream> -#include <memory> -#include <vector> - -#include "kudu/client/client.h" -#include "kudu/gutil/stl_util.h" -#include "kudu/gutil/strings/split.h" -#include "kudu/gutil/strings/substitute.h" -#include "kudu/tools/data_gen_util.h" -#include "kudu/util/flags.h" -#include "kudu/util/logging.h" -#include "kudu/util/random.h" -#include "kudu/util/random_util.h" - -DEFINE_string(master_address, "localhost", - "Comma separated list of master addresses to run against."); - -namespace kudu { -namespace tools { - -using std::string; -using std::vector; - -using client::KuduClient; -using client::KuduClientBuilder; -using client::KuduColumnSchema; -using client::KuduInsert; -using client::KuduSchema; -using client::KuduSession; -using client::KuduTable; -using client::sp::shared_ptr; - -void PrintUsage(char** argv) { - std::cerr << "usage: " << argv[0] << " [--master_address localhost] <table_name>" - << std::endl; -} - -static int WriteRandomDataToTable(int argc, char** argv) { - ParseCommandLineFlags(&argc, &argv, true); - if (argc != 2) { - PrintUsage(argv); - return 1; - } - InitGoogleLoggingSafe(argv[0]); - FLAGS_logtostderr = true; - - string table_name = argv[1]; - - vector<string> addrs = strings::Split(FLAGS_master_address, ","); - CHECK(!addrs.empty()) << "At least one 
master address must be specified!"; - - // Set up client. - LOG(INFO) << "Connecting to Kudu Master..."; - shared_ptr<KuduClient> client; - CHECK_OK(KuduClientBuilder() - .master_server_addrs(addrs) - .Build(&client)); - - LOG(INFO) << "Opening table..."; - shared_ptr<KuduTable> table; - CHECK_OK(client->OpenTable(table_name, &table)); - KuduSchema schema = table->schema(); - - shared_ptr<KuduSession> session = client->NewSession(); - session->SetTimeoutMillis(5000); // Time out after 5 seconds. - CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); - - Random random(GetRandomSeed32()); - - LOG(INFO) << "Inserting random rows..."; - for (uint64_t record_id = 0; true; ++record_id) { - - gscoped_ptr<KuduInsert> insert(table->NewInsert()); - KuduPartialRow* row = insert->mutable_row(); - GenerateDataForRow(schema, record_id, &random, row); - - LOG(INFO) << "Inserting record: " << row->ToString(); - CHECK_OK(session->Apply(insert.release())); - Status s = session->Flush(); - if (PREDICT_FALSE(!s.ok())) { - std::vector<client::KuduError*> errors; - ElementDeleter d(&errors); - bool overflow; - session->GetPendingErrors(&errors, &overflow); - CHECK(!overflow); - for (const client::KuduError* e : errors) { - LOG(FATAL) << "Unexpected insert error: " << e->status().ToString(); - } - continue; - } - LOG(INFO) << "OK"; - } - - return 0; -} - -} // namespace tools -} // namespace kudu - -int main(int argc, char** argv) { - return kudu::tools::WriteRandomDataToTable(argc, argv); -} http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/kudu-tool-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index 3353aae..34e0a59 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. 
+#include <algorithm> +#include <iterator> #include <memory> #include <sstream> #include <string> @@ -26,6 +28,7 @@ #include "kudu/cfile/cfile-test-base.h" #include "kudu/cfile/cfile_util.h" #include "kudu/cfile/cfile_writer.h" +#include "kudu/client/client-test-util.h" #include "kudu/common/partial_row.h" #include "kudu/common/partition.h" #include "kudu/common/schema.h" @@ -43,6 +46,7 @@ #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/split.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/integration-tests/external_mini_cluster.h" #include "kudu/tablet/local_tablet_writer.h" #include "kudu/tablet/tablet-harness.h" #include "kudu/tablet/tablet_metadata.h" @@ -51,6 +55,7 @@ #include "kudu/util/async_util.h" #include "kudu/util/env.h" #include "kudu/util/metrics.h" +#include "kudu/util/net/sockaddr.h" #include "kudu/util/oid_generator.h" #include "kudu/util/path_util.h" #include "kudu/util/subprocess.h" @@ -58,17 +63,21 @@ #include "kudu/util/test_util.h" namespace kudu { + namespace tools { using cfile::CFileWriter; using cfile::StringDataGenerator; using cfile::WriterOptions; +using client::sp::shared_ptr; using consensus::OpId; using consensus::ReplicateRefPtr; using consensus::ReplicateMsg; using fs::WritableBlock; using log::Log; using log::LogOptions; +using std::back_inserter; +using std::copy; using std::ostringstream; using std::string; using std::unique_ptr; @@ -159,10 +168,8 @@ class ToolTest : public KuduTest { ASSERT_EQ(0, stderr[usage_idx].find("Usage: ")); // Strip away everything up to the usage string to test for regexes. 
- vector<string> remaining_lines; - for (int i = usage_idx + 1; i < stderr.size(); i++) { - remaining_lines.push_back(stderr[i]); - } + const vector<string> remaining_lines(stderr.begin() + usage_idx + 1, + stderr.end()); for (const auto& r : regexes) { ASSERT_STRINGS_ANY_MATCH(remaining_lines, r); } @@ -182,6 +189,7 @@ TEST_F(ToolTest, TestTopLevelHelp) { "remote_replica.*replicas on a Kudu Tablet Server", "table.*Kudu tables", "tablet.*Kudu tablets", + "test.*tests", "tserver.*Kudu Tablet Server", "wal.*write-ahead log" }; @@ -287,6 +295,12 @@ TEST_F(ToolTest, TestModeHelp) { NO_FATALS(RunTestHelp("tablet change_config", kChangeConfigModeRegexes)); } { + const vector<string> kTestRegexes = { + "loadgen.*Run load generation test with optional scan afterwards", + }; + NO_FATALS(RunTestHelp("test", kTestRegexes)); + } + { const vector<string> kTServerModeRegexes = { "set_flag.*Change a gflag value", "status.*Get the status", @@ -745,5 +759,111 @@ TEST_F(ToolTest, TestLocalReplicaOps) { } } +// Create and start Kudu mini cluster, optionally creating a table in the DB, +// and then run 'kudu test loadgen ...' utility against it. 
+void RunLoadgen(size_t num_tservers = 1, + const vector<string>& tool_args = {}, + const string& table_name = "") { + kudu::ExternalMiniClusterOptions opts; + opts.num_tablet_servers = num_tservers; + // fsync causes flakiness on EC2 + opts.extra_tserver_flags.push_back("--never_fsync"); + + unique_ptr<ExternalMiniCluster> cluster(new ExternalMiniCluster(opts)); + ASSERT_OK(cluster->Start()); + if (!table_name.empty()) { + static const string kKeyColumnName = "key"; + static const Schema kSchema = Schema( + { + ColumnSchema(kKeyColumnName, INT64), + ColumnSchema("bool_val", BOOL), + ColumnSchema("int8_val", INT8), + ColumnSchema("int16_val", INT16), + ColumnSchema("int32_val", INT32), + ColumnSchema("int64_val", INT64), + ColumnSchema("float_val", FLOAT), + ColumnSchema("double_val", DOUBLE), + ColumnSchema("unixtime_micros_val", UNIXTIME_MICROS), + ColumnSchema("string_val", STRING), + ColumnSchema("binary_val", BINARY), + }, 1); + + shared_ptr<client::KuduClient> client; + ASSERT_OK(cluster->CreateClient(nullptr, &client)); + client::KuduSchema client_schema(client::KuduSchemaFromSchema(kSchema)); + unique_ptr<client::KuduTableCreator> table_creator( + client->NewTableCreator()); + ASSERT_OK(table_creator->table_name(table_name) + .schema(&client_schema) + .add_hash_partitions({kKeyColumnName}, 2) + .num_replicas(cluster->num_tablet_servers()) + .Create()); + } + vector<string> args = { + GetKuduCtlAbsolutePath(), + "test", + "loadgen", + cluster->master()->bound_rpc_addr().ToString(), + }; + if (!table_name.empty()) { + args.push_back(Substitute("-table_name=$0", table_name)); + } + copy(tool_args.begin(), tool_args.end(), back_inserter(args)); + ASSERT_OK(Subprocess::Call(args)); +} + +// Run the loadgen benchmark with all optional parameters set to defaults. +TEST_F(ToolTest, TestLoadgenDefaultParameters) { + NO_FATALS(RunLoadgen()); +} + +// Run the loadgen benchmark in AUTO_FLUSH_BACKGROUND mode, sequential values. 
+TEST_F(ToolTest, TestLoadgenAutoFlushBackgroundSequential) { + NO_FATALS(RunLoadgen(3, + { + "--buffer_flush_watermark_pct=0.125", + "--buffer_size_bytes=65536", + "--buffers_num=8", + "--num_rows_per_thread=2048", + "--num_threads=4", + "--run_scan", + "--string_fixed=0123456789", + }, + "bench_auto_flush_background_sequential")); +} + +// Run loadgen benchmark in AUTO_FLUSH_BACKGROUND mode, randomized values. +TEST_F(ToolTest, TestLoadgenAutoFlushBackgroundRandom) { + NO_FATALS(RunLoadgen(5, + { + "--buffer_flush_watermark_pct=0.125", + "--buffer_size_bytes=65536", + "--buffers_num=8", + // small number of rows to avoid collisions: it's random generation mode + "--num_rows_per_thread=16", + "--num_threads=1", + "--run_scan", + "--string_len=8", + "--use_random", + }, + "bench_auto_flush_background_random")); +} + +// Run the loadgen benchmark in MANUAL_FLUSH mode. +TEST_F(ToolTest, TestLoadgenManualFlush) { + NO_FATALS(RunLoadgen(3, + { + "--buffer_size_bytes=524288", + "--buffers_num=2", + "--flush_per_n_rows=1024", + "--num_rows_per_thread=4096", + "--num_threads=3", + "--run_scan", + "--show_first_n_errors=3", + "--string_len=16", + }, + "bench_manual_flush")); +} + } // namespace tools } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_action.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action.cc b/src/kudu/tools/tool_action.cc index aa5cdb6..1a920fc 100644 --- a/src/kudu/tools/tool_action.cc +++ b/src/kudu/tools/tool_action.cc @@ -170,9 +170,6 @@ string Mode::BuildHelp(const vector<Mode*>& chain) const { return msg; } -Mode::Mode() { -} - ActionBuilder::ActionBuilder(const string& name, const ActionRunner& runner) : name_(name), runner_(runner) { @@ -224,9 +221,6 @@ unique_ptr<Action> ActionBuilder::Build() { return action; } -Action::Action() { -} - Status Action::Run(const vector<Mode*>& chain, const unordered_map<string, string>& required_args, 
const vector<string>& variadic_args) const { http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_action.h ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action.h b/src/kudu/tools/tool_action.h index 95aa162..082f0d0 100644 --- a/src/kudu/tools/tool_action.h +++ b/src/kudu/tools/tool_action.h @@ -118,7 +118,7 @@ class Mode { private: friend class ModeBuilder; - Mode(); + Mode() = default; std::string name_; @@ -261,7 +261,7 @@ class Action { private: friend class ActionBuilder; - Action(); + Action() = default; std::string name_; @@ -283,6 +283,7 @@ std::unique_ptr<Mode> BuildPbcMode(); std::unique_ptr<Mode> BuildRemoteReplicaMode(); std::unique_ptr<Mode> BuildTableMode(); std::unique_ptr<Mode> BuildTabletMode(); +std::unique_ptr<Mode> BuildTestMode(); std::unique_ptr<Mode> BuildTServerMode(); std::unique_ptr<Mode> BuildWalMode(); http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_action_cluster.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc index e32c9dc..513d75b 100644 --- a/src/kudu/tools/tool_action_cluster.cc +++ b/src/kudu/tools/tool_action_cluster.cc @@ -65,8 +65,8 @@ namespace { const char* const kMasterAddressesArg = "master_addresses"; Status RunKsck(const RunnerContext& context) { - string master_addresses_str = FindOrDie(context.required_args, - kMasterAddressesArg); + const string& master_addresses_str = FindOrDie(context.required_args, + kMasterAddressesArg); vector<string> master_addresses = strings::Split(master_addresses_str, ","); shared_ptr<KsckMaster> master; RETURN_NOT_OK_PREPEND(RemoteKsckMaster::Build(master_addresses, &master), http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_action_fs.cc ---------------------------------------------------------------------- diff --git 
a/src/kudu/tools/tool_action_fs.cc b/src/kudu/tools/tool_action_fs.cc index f64ecdb..cb5e483 100644 --- a/src/kudu/tools/tool_action_fs.cc +++ b/src/kudu/tools/tool_action_fs.cc @@ -72,7 +72,7 @@ Status DumpUuid(const RunnerContext& context) { } Status DumpCFile(const RunnerContext& context) { - string block_id_str = FindOrDie(context.required_args, "block_id"); + const string& block_id_str = FindOrDie(context.required_args, "block_id"); uint64_t numeric_id; if (!safe_strtou64(block_id_str, &numeric_id)) { return Status::InvalidArgument(Substitute( http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_action_local_replica.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc index 63e22d4..b0455a7 100644 --- a/src/kudu/tools/tool_action_local_replica.cc +++ b/src/kudu/tools/tool_action_local_replica.cc @@ -172,7 +172,7 @@ Status ParsePeerString(const string& peer_str, Status PrintReplicaUuids(const RunnerContext& context) { unique_ptr<FsManager> fs_manager; RETURN_NOT_OK(FsInit(&fs_manager)); - string tablet_id = FindOrDie(context.required_args, "tablet_id"); + const string& tablet_id = FindOrDie(context.required_args, "tablet_id"); // Load the cmeta file and print all peer uuids. unique_ptr<ConsensusMetadata> cmeta; @@ -186,7 +186,7 @@ Status PrintReplicaUuids(const RunnerContext& context) { Status RewriteRaftConfig(const RunnerContext& context) { // Parse tablet ID argument. - string tablet_id = FindOrDie(context.required_args, "tablet_id"); + const string& tablet_id = FindOrDie(context.required_args, "tablet_id"); if (tablet_id != master::SysCatalogTable::kSysCatalogTabletId) { LOG(WARNING) << "Master will not notice rewritten Raft config of regular " << "tablets. 
A regular Raft config change must occur."; @@ -238,8 +238,8 @@ Status RewriteRaftConfig(const RunnerContext& context) { Status CopyFromRemote(const RunnerContext& context) { // Parse the tablet ID and source arguments. - string tablet_id = FindOrDie(context.required_args, "tablet_id"); - string rpc_address = FindOrDie(context.required_args, "source"); + const string& tablet_id = FindOrDie(context.required_args, "tablet_id"); + const string& rpc_address = FindOrDie(context.required_args, "source"); HostPort hp; RETURN_NOT_OK(ParseHostPortString(rpc_address, &hp)); @@ -259,7 +259,7 @@ Status CopyFromRemote(const RunnerContext& context) { Status DumpWals(const RunnerContext& context) { unique_ptr<FsManager> fs_manager; RETURN_NOT_OK(FsInit(&fs_manager)); - string tablet_id = FindOrDie(context.required_args, "tablet_id"); + const string& tablet_id = FindOrDie(context.required_args, "tablet_id"); shared_ptr<LogReader> reader; RETURN_NOT_OK(LogReader::Open(fs_manager.get(), @@ -309,7 +309,7 @@ Status ListBlocksInRowSet(const Schema& schema, Status DumpBlockIdsForLocalReplica(const RunnerContext& context) { unique_ptr<FsManager> fs_manager; RETURN_NOT_OK(FsInit(&fs_manager)); - string tablet_id = FindOrDie(context.required_args, "tablet_id"); + const string& tablet_id = FindOrDie(context.required_args, "tablet_id"); scoped_refptr<TabletMetadata> meta; RETURN_NOT_OK(TabletMetadata::Load(fs_manager.get(), tablet_id, &meta)); @@ -567,7 +567,7 @@ Status DumpRowSetInternal(FsManager* fs_manager, Status DumpRowSet(const RunnerContext& context) { unique_ptr<FsManager> fs_manager; RETURN_NOT_OK(FsInit(&fs_manager)); - string tablet_id = FindOrDie(context.required_args, "tablet_id"); + const string& tablet_id = FindOrDie(context.required_args, "tablet_id"); scoped_refptr<TabletMetadata> meta; RETURN_NOT_OK(TabletMetadata::Load(fs_manager.get(), tablet_id, &meta)); @@ -607,7 +607,7 @@ Status DumpRowSet(const RunnerContext& context) { Status DumpMeta(const RunnerContext& context) { 
unique_ptr<FsManager> fs_manager; RETURN_NOT_OK(FsInit(&fs_manager)); - string tablet_id = FindOrDie(context.required_args, "tablet_id"); + const string& tablet_id = FindOrDie(context.required_args, "tablet_id"); RETURN_NOT_OK(DumpTabletMeta(fs_manager.get(), tablet_id, 0)); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_action_master.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_master.cc b/src/kudu/tools/tool_action_master.cc index aea53d1..b27dfdf 100644 --- a/src/kudu/tools/tool_action_master.cc +++ b/src/kudu/tools/tool_action_master.cc @@ -44,19 +44,19 @@ const char* const kFlagArg = "flag"; const char* const kValueArg = "value"; Status MasterSetFlag(const RunnerContext& context) { - string address = FindOrDie(context.required_args, kMasterAddressArg); - string flag = FindOrDie(context.required_args, kFlagArg); - string value = FindOrDie(context.required_args, kValueArg); + const string& address = FindOrDie(context.required_args, kMasterAddressArg); + const string& flag = FindOrDie(context.required_args, kFlagArg); + const string& value = FindOrDie(context.required_args, kValueArg); return SetServerFlag(address, master::Master::kDefaultPort, flag, value); } Status MasterStatus(const RunnerContext& context) { - string address = FindOrDie(context.required_args, kMasterAddressArg); + const string& address = FindOrDie(context.required_args, kMasterAddressArg); return PrintServerStatus(address, master::Master::kDefaultPort); } Status MasterTimestamp(const RunnerContext& context) { - string address = FindOrDie(context.required_args, kMasterAddressArg); + const string& address = FindOrDie(context.required_args, kMasterAddressArg); return PrintServerTimestamp(address, master::Master::kDefaultPort); } http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_action_pbc.cc 
---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_pbc.cc b/src/kudu/tools/tool_action_pbc.cc index a190ed1..c32e1b0 100644 --- a/src/kudu/tools/tool_action_pbc.cc +++ b/src/kudu/tools/tool_action_pbc.cc @@ -44,7 +44,7 @@ namespace { const char* const kPathArg = "path"; Status DumpPBContainerFile(const RunnerContext& context) { - string path = FindOrDie(context.required_args, kPathArg); + const string& path = FindOrDie(context.required_args, kPathArg); Env* env = Env::Default(); gscoped_ptr<RandomAccessFile> reader; http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_action_remote_replica.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_remote_replica.cc b/src/kudu/tools/tool_action_remote_replica.cc index 02f973e..e305d4c 100644 --- a/src/kudu/tools/tool_action_remote_replica.cc +++ b/src/kudu/tools/tool_action_remote_replica.cc @@ -161,7 +161,7 @@ Status GetReplicas(TabletServerServiceProxy* proxy, } Status CheckReplicas(const RunnerContext& context) { - string address = FindOrDie(context.required_args, kTServerAddressArg); + const string& address = FindOrDie(context.required_args, kTServerAddressArg); unique_ptr<TabletServerServiceProxy> proxy; RETURN_NOT_OK(BuildProxy(address, tserver::TabletServer::kDefaultPort, @@ -188,9 +188,9 @@ Status CheckReplicas(const RunnerContext& context) { } Status DeleteReplica(const RunnerContext& context) { - string address = FindOrDie(context.required_args, kTServerAddressArg); - string tablet_id = FindOrDie(context.required_args, kTabletArg); - string reason = FindOrDie(context.required_args, kReasonArg); + const string& address = FindOrDie(context.required_args, kTServerAddressArg); + const string& tablet_id = FindOrDie(context.required_args, kTabletArg); + const string& reason = FindOrDie(context.required_args, kReasonArg); ServerStatusPB status; 
RETURN_NOT_OK(GetServerStatus(address, tserver::TabletServer::kDefaultPort, @@ -219,8 +219,8 @@ Status DeleteReplica(const RunnerContext& context) { } Status DumpReplica(const RunnerContext& context) { - string address = FindOrDie(context.required_args, kTServerAddressArg); - string tablet_id = FindOrDie(context.required_args, kTabletArg); + const string& address = FindOrDie(context.required_args, kTServerAddressArg); + const string& tablet_id = FindOrDie(context.required_args, kTabletArg); unique_ptr<TabletServerServiceProxy> proxy; RETURN_NOT_OK(BuildProxy(address, tserver::TabletServer::kDefaultPort, @@ -243,7 +243,7 @@ Status DumpReplica(const RunnerContext& context) { } Status ListReplicas(const RunnerContext& context) { - string address = FindOrDie(context.required_args, kTServerAddressArg); + const string& address = FindOrDie(context.required_args, kTServerAddressArg); unique_ptr<TabletServerServiceProxy> proxy; RETURN_NOT_OK(BuildProxy(address, tserver::TabletServer::kDefaultPort, &proxy)); http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_action_table.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc index 1e58286..c50e3bd 100644 --- a/src/kudu/tools/tool_action_table.cc +++ b/src/kudu/tools/tool_action_table.cc @@ -53,10 +53,10 @@ const char* const kMasterAddressesArg = "master_addresses"; const char* const kTableNameArg = "table_name"; Status DeleteTable(const RunnerContext& context) { - string master_addresses_str = FindOrDie(context.required_args, - kMasterAddressesArg); + const string& master_addresses_str = FindOrDie(context.required_args, + kMasterAddressesArg); vector<string> master_addresses = strings::Split(master_addresses_str, ","); - string table_name = FindOrDie(context.required_args, kTableNameArg); + const string& table_name = FindOrDie(context.required_args, kTableNameArg); 
client::sp::shared_ptr<KuduClient> client; RETURN_NOT_OK(KuduClientBuilder() @@ -66,8 +66,8 @@ Status DeleteTable(const RunnerContext& context) { } Status ListTables(const RunnerContext& context) { - string master_addresses_str = FindOrDie(context.required_args, - kMasterAddressesArg); + const string& master_addresses_str = FindOrDie(context.required_args, + kMasterAddressesArg); vector<string> master_addresses = strings::Split(master_addresses_str, ","); client::sp::shared_ptr<KuduClient> client; http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_action_tablet.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_tablet.cc b/src/kudu/tools/tool_action_tablet.cc index 8ba5340..0e57d08 100644 --- a/src/kudu/tools/tool_action_tablet.cc +++ b/src/kudu/tools/tool_action_tablet.cc @@ -101,13 +101,13 @@ Status GetTabletLeader(const client::sp::shared_ptr<KuduClient>& client, Status ChangeConfig(const RunnerContext& context, ChangeConfigType cc_type) { // Parse and validate arguments. 
RaftPeerPB peer_pb; - string master_addresses_str = FindOrDie(context.required_args, - kMasterAddressesArg); + const string& master_addresses_str = FindOrDie(context.required_args, + kMasterAddressesArg); vector<string> master_addresses = strings::Split(master_addresses_str, ","); - string tablet_id = FindOrDie(context.required_args, kTabletIdArg); - string replica_uuid = FindOrDie(context.required_args, kReplicaUuidArg); + const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg); + const string& replica_uuid = FindOrDie(context.required_args, kReplicaUuidArg); if (cc_type == consensus::ADD_SERVER || cc_type == consensus::CHANGE_ROLE) { - string replica_type = FindOrDie(context.required_args, kReplicaTypeArg); + const string& replica_type = FindOrDie(context.required_args, kReplicaTypeArg); string uppercase_peer_type; ToUpperCase(replica_type, &uppercase_peer_type); RaftPeerPB::MemberType member_type_val; http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_action_test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_test.cc b/src/kudu/tools/tool_action_test.cc new file mode 100644 index 0000000..70bd941 --- /dev/null +++ b/src/kudu/tools/tool_action_test.cc @@ -0,0 +1,582 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +// This is a small load generation tool which pushes data to tablet servers +// as fast as possible. It populates either an existing table (--table_name) +// or a table it auto-creates with a pre-defined schema. As an option, it's +// possible to run a post-insertion scan over the inserted rows to get the total +// table row count as reported by the scan operation. +// +// See below for examples of usage. +// +// Run in MANUAL_FLUSH mode, 1 thread inserting 8M rows into an auto-created table, +// flushing every 2000 rows, unlimited number of buffers with 32MB +// limit on their total size, with auto-generated strings +// of length 64 for binary and string fields +// with the Kudu master server listening on the default port at localhost: +// +// kudu test loadgen \ +// --num_threads=1 \ +// --num_rows_per_thread=8000000 \ +// --string_len=64 \ +// --buffer_size_bytes=33554432 \ +// --buffers_num=0 \ +// --flush_per_n_rows=2000 \ +// 127.0.0.1 +// +// +// Run in AUTO_FLUSH_BACKGROUND mode, 2 threads, each inserting 4M rows +// into an auto-created table, with a limit of 8 buffers and 1MB total size, +// a 12.5% buffer flush watermark, +// using the specified pre-set string for binary and string fields +// with the Kudu master server listening on the default port at localhost: +// +// kudu test loadgen \ +// --num_threads=2 \ +// --num_rows_per_thread=4000000 \ +// --string_fixed=012345678901234567890123456789012 \ +// --buffer_size_bytes=1048576 \ +// --buffer_flush_watermark_pct=0.125 \ +// --buffers_num=8 \ +// 127.0.0.1 +// +// +// Run in AUTO_FLUSH_BACKGROUND
mode, 4 threads inserting 2M rows each inserting +// into auto-created table, with limit of 4 buffers max 64KB in size total, +// having 25% for buffer flush watermark, +// using the specified pre-set string for binary and string fields +// with Kudu master server listening at 127.0.0.1:8765 +// +// kudu test loadgen \ +// --num_threads=4 \ +// --num_rows_per_thread=2000000 \ +// --string_fixed=0123456789 \ +// --buffer_size_bytes=65536 \ +// --buffers_num=4 \ +// --buffer_flush_watermark_pct=0.25 \ +// --table_name=bench_02 \ +// 127.0.0.1:8765 +// +// +// Run with default parameter values for data generation and batching, +// inserting data into auto-created table, +// with Kudu master server listening on the default port at localhost, +// plus run post-insertion row scan to verify +// that the count of the inserted rows matches the expected number: +// +// kudu test loadgen \ +// --run_scan=true \ +// 127.0.0.1 +// + +#include "kudu/tools/tool_action.h" + +#include <cstdint> +#include <cstdlib> +#include <ctime> + +#include <algorithm> +#include <iostream> +#include <limits> +#include <memory> +#include <sstream> +#include <thread> +#include <vector> + +#include <gflags/gflags.h> + +#include "kudu/client/client.h" +#include "kudu/common/schema.h" +#include "kudu/common/types.h" +#include "kudu/gutil/strings/split.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/oid_generator.h" +#include "kudu/util/random.h" +#include "kudu/util/stopwatch.h" + +using std::accumulate; +using std::cerr; +using std::cout; +using std::endl; +using std::numeric_limits; +using std::string; +using std::ostringstream; +using std::thread; +using std::vector; +using std::unique_ptr; + +using kudu::ColumnSchema; +using kudu::KuduPartialRow; +using kudu::Stopwatch; +using kudu::TypeInfo; +using kudu::client::KuduClient; +using kudu::client::KuduClientBuilder; +using kudu::client::KuduColumnSchema; +using kudu::client::KuduError; +using kudu::client::KuduInsert; +using 
kudu::client::KuduRowResult; +using kudu::client::KuduSchema; +using kudu::client::KuduSchemaBuilder; +using kudu::client::KuduScanBatch; +using kudu::client::KuduScanner; +using kudu::client::KuduSession; +using kudu::client::KuduTable; +using kudu::client::KuduTableCreator; +using kudu::client::sp::shared_ptr; + +DEFINE_double(buffer_flush_watermark_pct, 0.5, + "Mutation buffer flush watermark, in percentage of total size."); +DEFINE_int32(buffer_size_bytes, 4 * 1024 * 1024, + "Size of the mutation buffer, per session (bytes)."); +DEFINE_int32(buffers_num, 2, + "Number of mutation buffers per session."); +DEFINE_int32(flush_per_n_rows, 0, + "Perform async flush per given number of rows added. " + "Setting to non-zero implicitly turns on manual flush mode."); +DEFINE_bool(keep_auto_table, false, + "If using the auto-generated table, enabling this option " + "retains the table populated with the data after the test " + "finishes. By default, the auto-generated table is dropped " + "after sucessfully finishing the test. NOTE: this parameter " + "has no effect if using already existing table " + "(see the '--table_name' flag): the existing tables nor their data " + "are never dropped/deleted."); +DEFINE_uint64(num_rows_per_thread, 1000, + "Number of rows each thread generates and inserts; " + "0 means unlimited. All rows generated by a thread are inserted " + "in the context of the same session."); +DEFINE_int32(num_threads, 2, + "Number of generator threads to run. Each thread runs its own " + "KuduSession."); +DEFINE_bool(run_scan, false, + "Whether to run post-insertion scan to verify that the count of " + "the inserted rows matches the expected number. 
If enabled, " + "the scan is run only if no errors were encountered " + "while inserting the generated rows."); +DEFINE_int32(show_first_n_errors, 0, + "Output detailed information on the specified number of " + "first n errors (if any)."); +DEFINE_string(string_fixed, "", + "Pre-defined string to write into binary and string columns. " + "Client generates more data per second using pre-defined string " + "compared with auto-generated strings of the same length " + "if run with the same CPU/memory configuration. If left empty, " + "then auto-generated strings of length specified by the " + "'--string_len' parameter are used instead."); +DEFINE_int32(string_len, 32, + "Length of strings to put into string and binary columns. This " + "parameter is not in effect if '--string_fixed' is specified."); +DEFINE_string(table_name, "", + "Name of an existing table to use for the test. The test will " + "determine the structure of the table schema and " + "populate it with data accordingly. If left empty, " + "the test automatically creates a table of pre-defined columnar " + "structure with unique name and uses it to insert " + "auto-generated data. The auto-created table is dropped " + "upon successful completion of the test if not overridden " + "by the '--keep_auto_table' flag. If running the test against " + "an already existing table, it's highly recommended to use a " + "dedicated table created just for testing purposes: " + "the existing table nor its data is never dropped/deleted."); +DEFINE_bool(use_random, false, + "Whether to use random numbers instead of sequential ones. " + "In case of using random numbers collisions are possible over " + "the data for columns with unique constraint (e.g. 
primary key)."); + +namespace kudu { +namespace tools { + + +namespace { + +const char* const kMasterAddressesArg = "master_addresses"; + +class Generator { + public: + enum Mode { + MODE_SEQ, + MODE_RAND, + }; + + Generator(Mode m, int64_t seed, size_t string_len) + : mode_(m), + seq_(seed), + random_(seed), + string_len_(string_len) { + } + + ~Generator() = default; + + uint64_t NextImpl() { + if (mode_ == MODE_SEQ) { + return seq_++; + } + return random_.Next64(); + } + + template <typename T> + T Next() { + return NextImpl() & numeric_limits<T>::max(); + } + + private: + const Mode mode_; + uint64_t seq_; + Random random_; + const size_t string_len_; +}; + +template <> +bool Generator::Next() { + return (NextImpl() & 0x1); +} + +template <> +double Generator::Next() { + return static_cast<double>(NextImpl()); +} + +template <> +float Generator::Next() { + return static_cast<float>(NextImpl()); +} + +template <> +string Generator::Next() { + ostringstream ss; + ss << NextImpl() << "."; + string str(ss.str()); + if (str.size() >= string_len_) { + str = str.substr(0, string_len_); + } else { + str += string(string_len_ - str.size(), 'x'); + } + return str; +} + +Status GenerateRowData(Generator* gen, KuduPartialRow* row, + const string& fixed_string) { + const vector<ColumnSchema>& columns(row->schema()->columns()); + for (size_t idx = 0; idx < columns.size(); ++idx) { + const TypeInfo* tinfo = columns[idx].type_info(); + switch (tinfo->type()) { + case BOOL: + RETURN_NOT_OK(row->SetBool(idx, gen->Next<bool>())); + break; + case INT8: + RETURN_NOT_OK(row->SetInt8(idx, gen->Next<int8_t>())); + break; + case INT16: + RETURN_NOT_OK(row->SetInt16(idx, gen->Next<int16_t>())); + break; + case INT32: + RETURN_NOT_OK(row->SetInt32(idx, gen->Next<int32_t>())); + break; + case INT64: + RETURN_NOT_OK(row->SetInt64(idx, gen->Next<int64_t>())); + break; + case UNIXTIME_MICROS: + RETURN_NOT_OK(row->SetUnixTimeMicros(idx, gen->Next<int64_t>())); + break; + case FLOAT: + 
RETURN_NOT_OK(row->SetFloat(idx, gen->Next<float>())); + break; + case DOUBLE: + RETURN_NOT_OK(row->SetDouble(idx, gen->Next<double>())); + break; + case BINARY: + if (fixed_string.empty()) { + RETURN_NOT_OK(row->SetBinary(idx, gen->Next<string>())); + } else { + RETURN_NOT_OK(row->SetBinaryNoCopy(idx, fixed_string)); + } + break; + case STRING: + if (fixed_string.empty()) { + RETURN_NOT_OK(row->SetString(idx, gen->Next<string>())); + } else { + RETURN_NOT_OK(row->SetStringNoCopy(idx, fixed_string)); + } + break; + default: + return Status::InvalidArgument("unknown data type"); + } + } + return Status::OK(); +} + +void GeneratorThread( + const shared_ptr<KuduClient>& client, const string& table_name, + Generator::Mode gen_mode, int64_t gen_seed, + uint64_t* row_count, uint64_t* err_count) { + + const size_t flush_per_n_rows = FLAGS_flush_per_n_rows; + + shared_ptr<KuduSession> session(client->NewSession()); + CHECK_OK(session->SetMutationBufferFlushWatermark( + FLAGS_buffer_flush_watermark_pct)); + CHECK_OK(session->SetMutationBufferSpace(FLAGS_buffer_size_bytes)); + CHECK_OK(session->SetMutationBufferMaxNum(FLAGS_buffers_num)); + CHECK_OK(session->SetFlushMode( + flush_per_n_rows == 0 ? 
KuduSession::AUTO_FLUSH_BACKGROUND + : KuduSession::MANUAL_FLUSH)); + + shared_ptr<KuduTable> table; + CHECK_OK(client->OpenTable(table_name, &table)); + + Generator gen(gen_mode, gen_seed, FLAGS_string_len); + uint64_t idx = 0; + const size_t num_rows_per_thread = FLAGS_num_rows_per_thread; + for (; num_rows_per_thread == 0 || idx < num_rows_per_thread; + ++idx) { + unique_ptr<KuduInsert> insert_op(table->NewInsert()); + CHECK_OK(GenerateRowData(&gen, insert_op->mutable_row(), FLAGS_string_fixed)); + CHECK_OK(session->Apply(insert_op.release())); + if (flush_per_n_rows != 0 && idx != 0 && idx % flush_per_n_rows == 0) { + session->FlushAsync(nullptr); + } + } + CHECK_OK(session->Flush()); + if (row_count != nullptr) { + *row_count = idx; + } + vector<KuduError*> errors; + ElementDeleter d(&errors); + session->GetPendingErrors(&errors, nullptr); + if (err_count != nullptr) { + *err_count = errors.size(); + } + for (size_t i = 0; i < errors.size() && i < FLAGS_show_first_n_errors; ++i) { + cerr << errors[i]->status().ToString() << endl; + } +} + +void GenerateInsertRows(const shared_ptr<KuduClient>& client, + const string& table_name, + uint64_t* total_row_count, uint64_t* total_err_count) { + + const size_t num_threads = FLAGS_num_threads; + const Generator::Mode generator_mode = FLAGS_use_random ? Generator::MODE_RAND + : Generator::MODE_SEQ; + vector<uint64_t> row_count(num_threads, 0); + vector<uint64_t> err_count(num_threads, 0); + vector<thread> threads; + // The 'seed span' allows to have non-intersecting ranges for column values + // in sequential generation mode. 
+ const int64_t seed_span = numeric_limits<int64_t>::max() / num_threads; + for (size_t i = 0; i < num_threads; ++i) { + threads.emplace_back(&GeneratorThread, client, table_name, generator_mode, + seed_span * i, &row_count[i], &err_count[i]); + } + for (auto& t : threads) { + t.join(); + } + if (total_row_count != nullptr) { + *total_row_count = accumulate(row_count.begin(), row_count.end(), 0UL); + } + if (total_err_count != nullptr) { + *total_err_count = accumulate(err_count.begin(), err_count.end(), 0UL); + } +} + +// Fetch all rows from the table with the specified name; iterate over them +// and output their total count. +Status CountTableRows(const shared_ptr<KuduClient>& client, + const string& table_name, uint64_t* count) { + // It's assumed that all writing activity has stopped at this point. + const uint64_t snapshot_timestamp = client->GetLatestObservedTimestamp(); + + shared_ptr<KuduTable> table; + RETURN_NOT_OK(client->OpenTable(table_name, &table)); + + // It's necessary to have read-what-you-write behavior here. Since + // tablet leader can change and there might be replica propagation delays, + // set the snapshot to the latest observed one to get correct row count. + // Due to KUDU-1656, there might be timeouts due to tservers catching up with + // the requested snapshot. The simple workaround: if the timeout error occurs, + // retry the row count operation. + Status row_count_status; + uint64_t row_count = 0; + for (size_t i = 0; i < 3; ++i) { + KuduScanner scanner(table.get()); + // NOTE: +1 is due to the current implementation of the scanner. + RETURN_NOT_OK(scanner.SetSnapshotRaw(snapshot_timestamp + 1)); + RETURN_NOT_OK(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT)); + row_count_status = scanner.Open(); + if (!row_count_status.ok()) { + if (row_count_status.IsTimedOut()) { + // Retry condition: start the row count over again. 
+ continue; + } + return row_count_status; + } + row_count = 0; + while (scanner.HasMoreRows()) { + KuduScanBatch batch; + row_count_status = scanner.NextBatch(&batch); + if (!row_count_status.ok()) { + if (row_count_status.IsTimedOut()) { + // Retry condition: start the row count over again. + break; + } + return row_count_status; + } + row_count += batch.NumRows(); + } + if (row_count_status.ok()) { + // If it reaches this point with success, + // stop the retry cycle since the result is ready. + break; + } + } + RETURN_NOT_OK(row_count_status); + if (count != nullptr) { + *count = row_count; + } + + return Status::OK(); +} + +Status TestLoadGenerator(const RunnerContext& context) { + const string& master_addresses_str = + FindOrDie(context.required_args, kMasterAddressesArg); + + vector<string> master_addrs(strings::Split(master_addresses_str, ",")); + if (master_addrs.empty()) { + return Status::InvalidArgument( + "At least one master address must be specified"); + } + shared_ptr<KuduClient> client; + RETURN_NOT_OK(KuduClientBuilder() + .master_server_addrs(master_addrs) + .Build(&client)); + string table_name; + bool is_auto_table = false; + if (!FLAGS_table_name.empty()) { + table_name = FLAGS_table_name; + } else { + static const string kKeyColumnName = "key"; + static const Schema kSchema = Schema( + { + ColumnSchema(kKeyColumnName, INT64), + ColumnSchema("int32_val", INT32), + ColumnSchema("string_val", STRING), + ColumnSchema("binary_val", BINARY), + }, 1); + + // The auto-created table case. 
+ is_auto_table = true; + ObjectIdGenerator oid_generator; + table_name = "loadgen_auto_" + oid_generator.Next(); + KuduSchema schema; + KuduSchemaBuilder b; + b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey(); + b.AddColumn("int_val")->Type(KuduColumnSchema::INT32); + b.AddColumn("string_val")->Type(KuduColumnSchema::STRING); + RETURN_NOT_OK(b.Build(&schema)); + + unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator()); + RETURN_NOT_OK(table_creator->table_name(table_name) + .schema(&schema) + .num_replicas(1) + .add_hash_partitions(vector<string>({ kKeyColumnName }), 8) + .wait(true) + .Create()); + } + cout << "Using " << (is_auto_table ? "auto-created " : "") + << "table '" << table_name << "'" << endl; + + uint64_t total_row_count = 0; + uint64_t total_err_count = 0; + Stopwatch sw(Stopwatch::ALL_THREADS); + sw.start(); + GenerateInsertRows(client, table_name, &total_row_count, &total_err_count); + sw.stop(); + const double total = sw.elapsed().wall_millis(); + cout << endl << "Generator report" << endl + << " time total : " << total << " ms" << endl; + if (total_row_count != 0) { + cout << " time per row: " << total / total_row_count << " ms" << endl; + } + if (total_err_count != 0) { + return Status::RuntimeError(strings::Substitute("Encountered $0 errors", + total_err_count)); + } + + if (FLAGS_run_scan) { + // Run a table scan to count inserted rows. + uint64_t count; + RETURN_NOT_OK(CountTableRows(client, table_name, &count)); + cout << endl << "Scanner report" << endl + << " expected rows: " << total_row_count << endl + << " actual rows : " << count << endl; + if (count != total_row_count) { + return Status::RuntimeError( + strings::Substitute("Row count mismatch: expected $0, actual $1", + total_row_count, count)); + } + } + + if (is_auto_table && !FLAGS_keep_auto_table) { + cout << "Dropping auto-created table '" << table_name << "'" << endl; + // Drop the table which was automatically created to run the test. 
+ RETURN_NOT_OK(client->DeleteTable(table_name)); + } + + return Status::OK(); +} + +} // anonymous namespace + +unique_ptr<Mode> BuildTestMode() { + unique_ptr<Action> insert = + ActionBuilder("loadgen", &TestLoadGenerator) + .Description("Run load generation test with optional scan afterwards") + .ExtraDescription( + "Run load generation tool which inserts auto-generated data " + "into already existing table as fast as possible. If requested, " + "also run scan over the inserted rows to check whether the reported " + "count or inserted rows matches with the expected one. " + "NOTE: it's highly recommended to create a separate table for that " + "because the tool does not clean the inserted data.") + .AddRequiredParameter({ kMasterAddressesArg, + "Comma-separated list of master addresses to run against. " + "Addresses are in 'hostname:port' form where port may be omitted " + "if a master server listens at the default port." }) + .AddOptionalParameter("buffer_flush_watermark_pct") + .AddOptionalParameter("buffer_size_bytes") + .AddOptionalParameter("buffers_num") + .AddOptionalParameter("flush_per_n_rows") + .AddOptionalParameter("keep_auto_table") + .AddOptionalParameter("num_rows_per_thread") + .AddOptionalParameter("num_threads") + .AddOptionalParameter("run_scan") + .AddOptionalParameter("string_fixed") + .AddOptionalParameter("string_len") + .AddOptionalParameter("table_name") + .AddOptionalParameter("use_random") + .Build(); + + return ModeBuilder("test") + .Description("Run various tests against a Kudu cluster") + .AddAction(std::move(insert)) + .Build(); +} + +} // namespace tools +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_action_tserver.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_tserver.cc b/src/kudu/tools/tool_action_tserver.cc index 145ec1b..54397f8 100644 --- a/src/kudu/tools/tool_action_tserver.cc +++ 
b/src/kudu/tools/tool_action_tserver.cc @@ -44,20 +44,20 @@ const char* const kFlagArg = "flag"; const char* const kValueArg = "value"; Status TServerSetFlag(const RunnerContext& context) { - string address = FindOrDie(context.required_args, kTServerAddressArg); - string flag = FindOrDie(context.required_args, kFlagArg); - string value = FindOrDie(context.required_args, kValueArg); + const string& address = FindOrDie(context.required_args, kTServerAddressArg); + const string& flag = FindOrDie(context.required_args, kFlagArg); + const string& value = FindOrDie(context.required_args, kValueArg); return SetServerFlag(address, tserver::TabletServer::kDefaultPort, flag, value); } Status TServerStatus(const RunnerContext& context) { - string address = FindOrDie(context.required_args, kTServerAddressArg); + const string& address = FindOrDie(context.required_args, kTServerAddressArg); return PrintServerStatus(address, tserver::TabletServer::kDefaultPort); } Status TServerTimestamp(const RunnerContext& context) { - string address = FindOrDie(context.required_args, kTServerAddressArg); + const string& address = FindOrDie(context.required_args, kTServerAddressArg); return PrintServerTimestamp(address, tserver::TabletServer::kDefaultPort); } http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_action_wal.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_wal.cc b/src/kudu/tools/tool_action_wal.cc index 9b8cdb8..c414320 100644 --- a/src/kudu/tools/tool_action_wal.cc +++ b/src/kudu/tools/tool_action_wal.cc @@ -39,7 +39,7 @@ namespace { const char* const kPathArg = "path"; Status Dump(const RunnerContext& context) { - string segment_path = FindOrDie(context.required_args, kPathArg); + const string& segment_path = FindOrDie(context.required_args, kPathArg); scoped_refptr<ReadableLogSegment> segment; RETURN_NOT_OK(ReadableLogSegment::Open(Env::Default(), segment_path, &segment)); 
http://git-wip-us.apache.org/repos/asf/kudu/blob/5385af86/src/kudu/tools/tool_main.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_main.cc b/src/kudu/tools/tool_main.cc index caf3bde..8ea1dac 100644 --- a/src/kudu/tools/tool_main.cc +++ b/src/kudu/tools/tool_main.cc @@ -119,6 +119,7 @@ int RunTool(int argc, char** argv, bool show_help) { .AddMode(BuildRemoteReplicaMode()) .AddMode(BuildTableMode()) .AddMode(BuildTabletMode()) + .AddMode(BuildTestMode()) .AddMode(BuildTServerMode()) .AddMode(BuildWalMode()) .Build();
