This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 51b0e91 [tools] use --timeout_ms in tablet_scanner
51b0e91 is described below
commit 51b0e91232b25b676316a0dc724bc1b0e77f0e2e
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Jun 15 23:46:43 2021 -0700
[tools] use --timeout_ms in tablet_scanner
This patch removes the hard-coded timeout in TableScanner::CopyTask()
and TableScanner::StartWork() methods, switching them to using the
--timeout_ms run-time flag.
In addition, I sneaked in a code clean-up:
* use std::atomic instead of Atomic in table_scanner
* use 'constexpr const char* const' in kudu-tool-test
* group DECLARE_xxx separately from DEFINE_xxx
Change-Id: Id9ec546cb5cb9c94d13705596517258ee15330f1
Reviewed-on: http://gerrit.cloudera.org:8080/17597
Reviewed-by: Andrew Wong <[email protected]>
Tested-by: Kudu Jenkins
---
src/kudu/tools/kudu-tool-test.cc | 66 +++++++++++++++++++-------------------
src/kudu/tools/table_scanner.cc | 29 +++++++++--------
src/kudu/tools/table_scanner.h | 6 ++--
src/kudu/tools/tool_action_perf.cc | 7 ++--
4 files changed, 56 insertions(+), 52 deletions(-)
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 528347f..ff4f8d9 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -1837,9 +1837,9 @@ TEST_F(ToolTest, TestWalDump) {
}
TEST_F(ToolTest, TestLocalReplicaDumpDataDirs) {
- const string kTestTablet = "ffffffffffffffffffffffffffffffff";
- const string kTestTableId = "test-table";
- const string kTestTableName = "test-fs-data-dirs-dump-table";
+ constexpr const char* const kTestTablet = "ffffffffffffffffffffffffffffffff";
+ constexpr const char* const kTestTableId = "test-table";
+ constexpr const char* const kTestTableName = "test-fs-data-dirs-dump-table";
const Schema kSchema(GetSimpleTestSchema());
const Schema kSchemaWithIds(SchemaBuilder(kSchema).Build());
@@ -1882,10 +1882,10 @@ TEST_F(ToolTest, TestLocalReplicaDumpDataDirs) {
}
TEST_F(ToolTest, TestLocalReplicaDumpMeta) {
+ constexpr const char* const kTestTablet = "ffffffffffffffffffffffffffffffff";
+ constexpr const char* const kTestTableId = "test-table";
+ constexpr const char* const kTestTableName = "test-fs-meta-dump-table";
const string kTestDir = GetTestPath("test");
- const string kTestTablet = "ffffffffffffffffffffffffffffffff";
- const string kTestTableId = "test-table";
- const string kTestTableName = "test-fs-meta-dump-table";
const Schema kSchema(GetSimpleTestSchema());
const Schema kSchemaWithIds(SchemaBuilder(kSchema).Build());
@@ -2261,7 +2261,7 @@ void ToolTest::RunLoadgen(int num_tservers,
opts.num_tablet_servers = num_tservers;
NO_FATALS(StartExternalMiniCluster(std::move(opts)));
if (!table_name.empty()) {
- static const string kKeyColumnName = "key";
+ constexpr const char* const kKeyColumnName = "key";
static const Schema kSchema = Schema(
{
ColumnSchema(kKeyColumnName, INT64),
@@ -2822,14 +2822,14 @@ TEST_F(ToolTest, TestNonRandomWorkloadLoadgen) {
}
TEST_F(ToolTest, TestPerfTableScan) {
- const string& kTableName = "perf.table_scan";
+ constexpr const char* const kTableName = "perf.table_scan";
NO_FATALS(RunLoadgen(1, { "--run_scan" }, kTableName));
NO_FATALS(RunScanTableCheck(kTableName, "", 1, 2000, {}, "perf table_scan"));
}
TEST_F(ToolTest, TestPerfTabletScan) {
// Create a table.
- const string& kTableName = "perf.tablet_scan";
+ constexpr const char* const kTableName = "perf.tablet_scan";
NO_FATALS(RunLoadgen(1, {}, kTableName));
// Get the list of tablets.
@@ -3541,7 +3541,7 @@ TEST_F(ToolTest, TestDeleteTable) {
ASSERT_OK(cluster_->CreateClient(nullptr, &client));
string master_addr = cluster_->master()->bound_rpc_addr().ToString();
- const string& kTableName = "kudu.table";
+ constexpr const char* const kTableName = "kudu.table";
// Create a table.
TestWorkload workload(cluster_.get());
@@ -3569,8 +3569,8 @@ TEST_F(ToolTest, TestRenameTable) {
ASSERT_OK(cluster_->CreateClient(nullptr, &client));
string master_addr = cluster_->master()->bound_rpc_addr().ToString();
- const string& kTableName = "kudu.table";
- const string& kNewTableName = "kudu_table";
+ constexpr const char* const kTableName = "kudu.table";
+ constexpr const char* const kNewTableName = "kudu_table";
// Create the table.
TestWorkload workload(cluster_.get());
@@ -3593,9 +3593,9 @@ TEST_F(ToolTest, TestRenameTable) {
TEST_F(ToolTest, TestRenameColumn) {
NO_FATALS(StartExternalMiniCluster());
- const string& kTableName = "table";
- const string& kColumnName = "col.0";
- const string& kNewColumnName = "col_0";
+ constexpr const char* const kTableName = "table";
+ constexpr const char* const kColumnName = "col.0";
+ constexpr const char* const kNewColumnName = "col_0";
KuduSchemaBuilder schema_builder;
schema_builder.AddColumn("key")
@@ -3813,11 +3813,11 @@ TEST_F(ToolTest, TestScanTableProjection) {
}
TEST_F(ToolTest, TestScanTableMultiPredicates) {
+ constexpr const char* const kTableName = "kudu.table.scan.multipredicates";
+
NO_FATALS(StartExternalMiniCluster());
string master_addr = cluster_->master()->bound_rpc_addr().ToString();
- const string kTableName = "kudu.table.scan.multipredicates";
-
// Create the src table and write some data to it.
TestWorkload ww(cluster_.get());
ww.set_table_name(kTableName);
@@ -3866,8 +3866,8 @@ TEST_P(ToolTestCopyTableParameterized, TestCopyTable) {
TEST_F(ToolTest, TestAlterColumn) {
NO_FATALS(StartExternalMiniCluster());
- const string& kTableName = "kudu.table.alter.column";
- const string& kColumnName = "col.0";
+ constexpr const char* const kTableName = "kudu.table.alter.column";
+ constexpr const char* const kColumnName = "col.0";
KuduSchemaBuilder schema_builder;
schema_builder.AddColumn("key")
@@ -3978,15 +3978,15 @@ TEST_F(ToolTest, TestAlterColumn) {
TEST_F(ToolTest, TestColumnSetDefault) {
NO_FATALS(StartExternalMiniCluster());
- const string& kTableName = "kudu.table.set.default";
- const string& kIntColumn = "col.int";
- const string& kStringColumn = "col.string";
- const string& kBoolColumn = "col.bool";
- const string& kFloatColumn = "col.float";
- const string& kDoubleColumn = "col.double";
- const string& kBinaryColumn = "col.binary";
- const string& kUnixtimeMicrosColumn = "col.unixtime_micros";
- const string& kDecimalColumn = "col.decimal";
+ constexpr const char* const kTableName = "kudu.table.set.default";
+ constexpr const char* const kIntColumn = "col.int";
+ constexpr const char* const kStringColumn = "col.string";
+ constexpr const char* const kBoolColumn = "col.bool";
+ constexpr const char* const kFloatColumn = "col.float";
+ constexpr const char* const kDoubleColumn = "col.double";
+ constexpr const char* const kBinaryColumn = "col.binary";
+ constexpr const char* const kUnixtimeMicrosColumn = "col.unixtime_micros";
+ constexpr const char* const kDecimalColumn = "col.decimal";
KuduSchemaBuilder schema_builder;
schema_builder.AddColumn("key")
@@ -4093,8 +4093,8 @@ TEST_F(ToolTest, TestColumnSetDefault) {
TEST_F(ToolTest, TestDeleteColumn) {
NO_FATALS(StartExternalMiniCluster());
- const string& kTableName = "kudu.table.delete.column";
- const string& kColumnName = "col.0";
+ constexpr const char* const kTableName = "kudu.table.delete.column";
+ constexpr const char* const kColumnName = "col.0";
KuduSchemaBuilder schema_builder;
schema_builder.AddColumn("key")
@@ -4129,7 +4129,7 @@ TEST_F(ToolTest, TestDeleteColumn) {
}
TEST_F(ToolTest, TestChangeTableLimitNotSupported) {
- const string kTableName = "kudu.table.failtochangelimit";
+ constexpr const char* const kTableName = "kudu.table.failtochangelimit";
// Disable table write limit by default, then set limit will not take effect.
NO_FATALS(StartExternalMiniCluster());
// Create the table.
@@ -4179,7 +4179,7 @@ TEST_F(ToolTest, TestChangeTableLimitNotSupported) {
}
TEST_F(ToolTest, TestChangeTableLimitSupported) {
- const string kTableName = "kudu.table.changelimit";
+ constexpr const char* const kTableName = "kudu.table.changelimit";
const int64_t kDiskSizeLimit = 999999;
const int64_t kRowCountLimit = 100000;
@@ -5837,7 +5837,7 @@ TEST_F(ToolTest, TestFsSwappingDirectoriesFailsGracefully) {
TEST_F(ToolTest, TestStartEndMaintenanceMode) {
NO_FATALS(StartMiniCluster());
// Perform the steps on a tserver that exists and one that doesn't.
- const string& kDummyUuid = "foobaruuid";
+ constexpr const char* const kDummyUuid = "foobaruuid";
MiniMaster* mini_master = mini_cluster_->mini_master();
const string& ts_uuid = mini_cluster_->mini_tablet_server(0)->uuid();
TSManager* ts_manager = mini_master->master()->ts_manager();
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index d88b3c1..f37315e 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -21,6 +21,7 @@
#include <cstddef>
#include <cstdint>
#include <cstring>
+#include <initializer_list>
#include <iostream>
#include <iterator>
#include <map>
@@ -84,10 +85,8 @@ using strings::Substitute;
DEFINE_bool(create_table, true,
"Whether to create the destination table if it doesn't exist.");
-DECLARE_string(columns);
DEFINE_bool(fill_cache, true,
"Whether to fill block cache when scanning.");
-DECLARE_int32(num_threads);
DEFINE_string(predicates, "",
"Query predicates on columns. Unlike traditional SQL syntax, "
"the scan tool's simple query predicates are represented in a "
@@ -109,26 +108,31 @@ DEFINE_string(predicates, "",
"The only supported predicate operator is `AND`.");
DEFINE_bool(show_values, false,
"Whether to show values of scanned rows.");
-DECLARE_string(tablets);
DEFINE_string(write_type, "insert",
"How data should be copied to the destination table. Valid values are 'insert', "
"'upsert' or the empty string. If the empty string, data will not be copied "
"(useful when create_table is 'true').");
+DECLARE_bool(row_count_only);
+DECLARE_int32(num_threads);
+DECLARE_int64(timeout_ms);
+DECLARE_string(tablets);
+DECLARE_string(columns);
+
static bool ValidateWriteType(const char* flag_name,
const string& flag_value) {
- const vector<string> allowed_values = { "insert", "upsert", "" };
- if (std::find_if(allowed_values.begin(), allowed_values.end(),
+ static const auto kWriteTypes = { "insert", "upsert", "" };
+ if (std::find_if(kWriteTypes.begin(), kWriteTypes.end(),
[&](const string& allowed_value) {
return iequals(allowed_value, flag_value);
- }) != allowed_values.end()) {
+ }) != kWriteTypes.end()) {
return true;
}
std::ostringstream ss;
ss << "'" << flag_value << "': unsupported value for --" << flag_name
<< " flag; should be one of ";
- copy(allowed_values.begin(), allowed_values.end(),
+ copy(kWriteTypes.begin(), kWriteTypes.end(),
std::ostream_iterator<string>(ss, " "));
LOG(ERROR) << ss.str();
@@ -464,7 +468,6 @@ Status TableScanner::AddRow(const client::sp::shared_ptr<KuduTable>& table,
Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
const std::function<void(const KuduScanBatch& batch)>& cb) {
-
for (auto token : tokens) {
Stopwatch sw(Stopwatch::THIS_THREAD);
sw.start();
@@ -480,7 +483,7 @@ Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& t
KuduScanBatch batch;
RETURN_NOT_OK(scanner->NextBatch(&batch));
count += batch.NumRows();
- total_count_.IncrementBy(batch.NumRows());
+ total_count_ += batch.NumRows();
cb(batch);
}
@@ -517,7 +520,7 @@ void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread
client::sp::shared_ptr<KuduSession> session(dst_client_.get()->NewSession());
CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
CHECK_OK(session->SetErrorBufferSpace(1024));
- session->SetTimeoutMillis(30000);
+ session->SetTimeoutMillis(FLAGS_timeout_ms);
*thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) {
for (const auto& row : batch) {
@@ -557,7 +560,7 @@ Status TableScanner::StartWork(WorkType type) {
if (mode_) {
RETURN_NOT_OK(builder.SetReadMode(mode_.get()));
}
- RETURN_NOT_OK(builder.SetTimeoutMillis(30000));
+ RETURN_NOT_OK(builder.SetTimeoutMillis(FLAGS_timeout_ms));
// Set projection if needed.
if (type == WorkType::kScan) {
@@ -610,13 +613,13 @@ Status TableScanner::StartWork(WorkType type) {
}
}
while (!thread_pool_->WaitFor(MonoDelta::FromSeconds(5))) {
- LOG(INFO) << "Scanned count: " << total_count_.Load();
+ LOG(INFO) << "Scanned count: " << total_count_;
}
thread_pool_->Shutdown();
sw.stop();
if (out_) {
- *out_ << "Total count " << total_count_.Load()
+ *out_ << "Total count " << total_count_
<< " cost " << sw.elapsed().wall_seconds() << " seconds" << endl;
}
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index eb11d79..d4b0787 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -17,6 +17,7 @@
#pragma once
+#include <atomic>
#include <cstdint>
#include <functional>
#include <iosfwd>
@@ -30,7 +31,6 @@
#include "kudu/client/client.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
-#include "kudu/util/atomic.h"
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"
#include "kudu/util/threadpool.h"
@@ -70,7 +70,7 @@ class TableScanner {
Status StartCopy();
uint64_t TotalScannedCount() const {
- return total_count_.Load();
+ return total_count_;
}
private:
@@ -90,7 +90,7 @@ class TableScanner {
const kudu::client::KuduScanBatch::RowPtr& src_row,
const client::sp::shared_ptr<kudu::client::KuduSession>& session);
- AtomicInt<uint64_t> total_count_;
+ std::atomic<uint64_t> total_count_;
boost::optional<kudu::client::KuduScanner::ReadMode> mode_;
client::sp::shared_ptr<kudu::client::KuduClient> client_;
std::string table_name_;
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 0efce37..151f11b 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -291,7 +291,6 @@ DEFINE_int64(num_rows_per_thread, 1000,
"Number of rows each thread generates and inserts; "
"-1 means unlimited. All rows generated by a thread are inserted "
"in the context of the same session.");
-DECLARE_int32(num_threads);
DEFINE_bool(use_client_per_thread,
true,
"Use a separate KuduClient instance for each load-generating thread. "
@@ -319,7 +318,6 @@ DEFINE_int32(show_first_n_errors, 0,
"effective number of errors in the output. If so, consider "
"increasing the size of the error buffer using the "
"'--error_buffer_size_bytes' flag.");
-DECLARE_bool(show_values);
DEFINE_string(string_fixed, "",
"Pre-defined string to write into binary and string columns. "
"Client generates more data per second using pre-defined string "
@@ -336,7 +334,6 @@ DEFINE_string(auto_database, "default",
"not created. This flag is useful primarily when the Hive Metastore "
"integration is enabled in the cluster. If empty, no database is "
"used.");
-DECLARE_string(table_name);
DEFINE_int32(table_num_hash_partitions, 8,
"The number of hash partitions to create when this tool creates "
"a new table. Note: The total number of partitions must be "
@@ -383,6 +380,10 @@ DEFINE_bool(txn_rollback, false,
"the inserted rows. Setting --txn_rollback=true implies setting "
"--txn_start=true as well.");
+DECLARE_bool(show_values);
+DECLARE_int32(num_threads);
+DECLARE_string(table_name);
+
namespace kudu {
namespace tools {