This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 51b0e91  [tools] use --timeout_ms in table_scanner
51b0e91 is described below

commit 51b0e91232b25b676316a0dc724bc1b0e77f0e2e
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Jun 15 23:46:43 2021 -0700

    [tools] use --timeout_ms in table_scanner
    
    This patch removes the hard-coded 30-second timeout from the
    TableScanner::CopyTask() and TableScanner::StartWork() methods,
    switching them to use the --timeout_ms run-time flag instead.
    
    In addition, this patch includes some minor code clean-up:
      * use std::atomic instead of AtomicInt in table_scanner
      * use 'constexpr const char* const' in kudu-tool-test
      * group DECLARE_xxx separately from DEFINE_xxx
    
    Change-Id: Id9ec546cb5cb9c94d13705596517258ee15330f1
    Reviewed-on: http://gerrit.cloudera.org:8080/17597
    Reviewed-by: Andrew Wong <[email protected]>
    Tested-by: Kudu Jenkins
---
 src/kudu/tools/kudu-tool-test.cc   | 66 +++++++++++++++++++-------------------
 src/kudu/tools/table_scanner.cc    | 29 +++++++++--------
 src/kudu/tools/table_scanner.h     |  6 ++--
 src/kudu/tools/tool_action_perf.cc |  7 ++--
 4 files changed, 56 insertions(+), 52 deletions(-)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 528347f..ff4f8d9 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -1837,9 +1837,9 @@ TEST_F(ToolTest, TestWalDump) {
 }
 
 TEST_F(ToolTest, TestLocalReplicaDumpDataDirs) {
-  const string kTestTablet = "ffffffffffffffffffffffffffffffff";
-  const string kTestTableId = "test-table";
-  const string kTestTableName = "test-fs-data-dirs-dump-table";
+  constexpr const char* const kTestTablet = "ffffffffffffffffffffffffffffffff";
+  constexpr const char* const kTestTableId = "test-table";
+  constexpr const char* const kTestTableName = "test-fs-data-dirs-dump-table";
   const Schema kSchema(GetSimpleTestSchema());
   const Schema kSchemaWithIds(SchemaBuilder(kSchema).Build());
 
@@ -1882,10 +1882,10 @@ TEST_F(ToolTest, TestLocalReplicaDumpDataDirs) {
 }
 
 TEST_F(ToolTest, TestLocalReplicaDumpMeta) {
+  constexpr const char* const kTestTablet = "ffffffffffffffffffffffffffffffff";
+  constexpr const char* const kTestTableId = "test-table";
+  constexpr const char* const kTestTableName = "test-fs-meta-dump-table";
   const string kTestDir = GetTestPath("test");
-  const string kTestTablet = "ffffffffffffffffffffffffffffffff";
-  const string kTestTableId = "test-table";
-  const string kTestTableName = "test-fs-meta-dump-table";
   const Schema kSchema(GetSimpleTestSchema());
   const Schema kSchemaWithIds(SchemaBuilder(kSchema).Build());
 
@@ -2261,7 +2261,7 @@ void ToolTest::RunLoadgen(int num_tservers,
   opts.num_tablet_servers = num_tservers;
   NO_FATALS(StartExternalMiniCluster(std::move(opts)));
   if (!table_name.empty()) {
-    static const string kKeyColumnName = "key";
+    constexpr const char* const kKeyColumnName = "key";
     static const Schema kSchema = Schema(
       {
         ColumnSchema(kKeyColumnName, INT64),
@@ -2822,14 +2822,14 @@ TEST_F(ToolTest, TestNonRandomWorkloadLoadgen) {
 }
 
 TEST_F(ToolTest, TestPerfTableScan) {
-  const string& kTableName = "perf.table_scan";
+  constexpr const char* const kTableName = "perf.table_scan";
   NO_FATALS(RunLoadgen(1, { "--run_scan" }, kTableName));
   NO_FATALS(RunScanTableCheck(kTableName, "", 1, 2000, {}, "perf table_scan"));
 }
 
 TEST_F(ToolTest, TestPerfTabletScan) {
   // Create a table.
-  const string& kTableName = "perf.tablet_scan";
+  constexpr const char* const kTableName = "perf.tablet_scan";
   NO_FATALS(RunLoadgen(1, {}, kTableName));
 
   // Get the list of tablets.
@@ -3541,7 +3541,7 @@ TEST_F(ToolTest, TestDeleteTable) {
   ASSERT_OK(cluster_->CreateClient(nullptr, &client));
   string master_addr = cluster_->master()->bound_rpc_addr().ToString();
 
-  const string& kTableName = "kudu.table";
+  constexpr const char* const kTableName = "kudu.table";
 
   // Create a table.
   TestWorkload workload(cluster_.get());
@@ -3569,8 +3569,8 @@ TEST_F(ToolTest, TestRenameTable) {
   ASSERT_OK(cluster_->CreateClient(nullptr, &client));
   string master_addr = cluster_->master()->bound_rpc_addr().ToString();
 
-  const string& kTableName = "kudu.table";
-  const string& kNewTableName = "kudu_table";
+  constexpr const char* const kTableName = "kudu.table";
+  constexpr const char* const kNewTableName = "kudu_table";
 
   // Create the table.
   TestWorkload workload(cluster_.get());
@@ -3593,9 +3593,9 @@ TEST_F(ToolTest, TestRenameTable) {
 
 TEST_F(ToolTest, TestRenameColumn) {
   NO_FATALS(StartExternalMiniCluster());
-  const string& kTableName = "table";
-  const string& kColumnName = "col.0";
-  const string& kNewColumnName = "col_0";
+  constexpr const char* const kTableName = "table";
+  constexpr const char* const kColumnName = "col.0";
+  constexpr const char* const kNewColumnName = "col_0";
 
   KuduSchemaBuilder schema_builder;
   schema_builder.AddColumn("key")
@@ -3813,11 +3813,11 @@ TEST_F(ToolTest, TestScanTableProjection) {
 }
 
 TEST_F(ToolTest, TestScanTableMultiPredicates) {
+  constexpr const char* const kTableName = "kudu.table.scan.multipredicates";
+
   NO_FATALS(StartExternalMiniCluster());
   string master_addr = cluster_->master()->bound_rpc_addr().ToString();
 
-  const string kTableName = "kudu.table.scan.multipredicates";
-
   // Create the src table and write some data to it.
   TestWorkload ww(cluster_.get());
   ww.set_table_name(kTableName);
@@ -3866,8 +3866,8 @@ TEST_P(ToolTestCopyTableParameterized, TestCopyTable) {
 
 TEST_F(ToolTest, TestAlterColumn) {
   NO_FATALS(StartExternalMiniCluster());
-  const string& kTableName = "kudu.table.alter.column";
-  const string& kColumnName = "col.0";
+  constexpr const char* const kTableName = "kudu.table.alter.column";
+  constexpr const char* const kColumnName = "col.0";
 
   KuduSchemaBuilder schema_builder;
   schema_builder.AddColumn("key")
@@ -3978,15 +3978,15 @@ TEST_F(ToolTest, TestAlterColumn) {
 
 TEST_F(ToolTest, TestColumnSetDefault) {
   NO_FATALS(StartExternalMiniCluster());
-  const string& kTableName = "kudu.table.set.default";
-  const string& kIntColumn = "col.int";
-  const string& kStringColumn = "col.string";
-  const string& kBoolColumn = "col.bool";
-  const string& kFloatColumn = "col.float";
-  const string& kDoubleColumn = "col.double";
-  const string& kBinaryColumn = "col.binary";
-  const string& kUnixtimeMicrosColumn = "col.unixtime_micros";
-  const string& kDecimalColumn = "col.decimal";
+  constexpr const char* const kTableName = "kudu.table.set.default";
+  constexpr const char* const kIntColumn = "col.int";
+  constexpr const char* const kStringColumn = "col.string";
+  constexpr const char* const kBoolColumn = "col.bool";
+  constexpr const char* const kFloatColumn = "col.float";
+  constexpr const char* const kDoubleColumn = "col.double";
+  constexpr const char* const kBinaryColumn = "col.binary";
+  constexpr const char* const kUnixtimeMicrosColumn = "col.unixtime_micros";
+  constexpr const char* const kDecimalColumn = "col.decimal";
 
   KuduSchemaBuilder schema_builder;
   schema_builder.AddColumn("key")
@@ -4093,8 +4093,8 @@ TEST_F(ToolTest, TestColumnSetDefault) {
 
 TEST_F(ToolTest, TestDeleteColumn) {
   NO_FATALS(StartExternalMiniCluster());
-  const string& kTableName = "kudu.table.delete.column";
-  const string& kColumnName = "col.0";
+  constexpr const char* const kTableName = "kudu.table.delete.column";
+  constexpr const char* const kColumnName = "col.0";
 
   KuduSchemaBuilder schema_builder;
   schema_builder.AddColumn("key")
@@ -4129,7 +4129,7 @@ TEST_F(ToolTest, TestDeleteColumn) {
 }
 
 TEST_F(ToolTest, TestChangeTableLimitNotSupported) {
-  const string kTableName = "kudu.table.failtochangelimit";
+  constexpr const char* const kTableName = "kudu.table.failtochangelimit";
   // Disable table write limit by default, then set limit will not take effect.
   NO_FATALS(StartExternalMiniCluster());
   // Create the table.
@@ -4179,7 +4179,7 @@ TEST_F(ToolTest, TestChangeTableLimitNotSupported) {
 }
 
 TEST_F(ToolTest, TestChangeTableLimitSupported) {
-  const string kTableName = "kudu.table.changelimit";
+  constexpr const char* const kTableName = "kudu.table.changelimit";
   const int64_t kDiskSizeLimit = 999999;
   const int64_t kRowCountLimit = 100000;
 
@@ -5837,7 +5837,7 @@ TEST_F(ToolTest, TestFsSwappingDirectoriesFailsGracefully) {
 TEST_F(ToolTest, TestStartEndMaintenanceMode) {
   NO_FATALS(StartMiniCluster());
   // Perform the steps on a tserver that exists and one that doesn't.
-  const string& kDummyUuid = "foobaruuid";
+  constexpr const char* const kDummyUuid = "foobaruuid";
   MiniMaster* mini_master = mini_cluster_->mini_master();
   const string& ts_uuid = mini_cluster_->mini_tablet_server(0)->uuid();
   TSManager* ts_manager = mini_master->master()->ts_manager();
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index d88b3c1..f37315e 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -21,6 +21,7 @@
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
+#include <initializer_list>
 #include <iostream>
 #include <iterator>
 #include <map>
@@ -84,10 +85,8 @@ using strings::Substitute;
 
 DEFINE_bool(create_table, true,
             "Whether to create the destination table if it doesn't exist.");
-DECLARE_string(columns);
 DEFINE_bool(fill_cache, true,
             "Whether to fill block cache when scanning.");
-DECLARE_int32(num_threads);
 DEFINE_string(predicates, "",
               "Query predicates on columns. Unlike traditional SQL syntax, "
               "the scan tool's simple query predicates are represented in a "
@@ -109,26 +108,31 @@ DEFINE_string(predicates, "",
               "The only supported predicate operator is `AND`.");
 DEFINE_bool(show_values, false,
             "Whether to show values of scanned rows.");
-DECLARE_string(tablets);
 DEFINE_string(write_type, "insert",
               "How data should be copied to the destination table. Valid 
values are 'insert', "
               "'upsert' or the empty string. If the empty string, data will 
not be copied "
               "(useful when create_table is 'true').");
 
+DECLARE_bool(row_count_only);
+DECLARE_int32(num_threads);
+DECLARE_int64(timeout_ms);
+DECLARE_string(tablets);
+DECLARE_string(columns);
+
 static bool ValidateWriteType(const char* flag_name,
                               const string& flag_value) {
-  const vector<string> allowed_values = { "insert", "upsert", "" };
-  if (std::find_if(allowed_values.begin(), allowed_values.end(),
+  static const auto kWriteTypes = { "insert", "upsert", "" };
+  if (std::find_if(kWriteTypes.begin(), kWriteTypes.end(),
                    [&](const string& allowed_value) {
                      return iequals(allowed_value, flag_value);
-                   }) != allowed_values.end()) {
+                   }) != kWriteTypes.end()) {
     return true;
   }
 
   std::ostringstream ss;
   ss << "'" << flag_value << "': unsupported value for --" << flag_name
      << " flag; should be one of ";
-  copy(allowed_values.begin(), allowed_values.end(),
+  copy(kWriteTypes.begin(), kWriteTypes.end(),
        std::ostream_iterator<string>(ss, " "));
   LOG(ERROR) << ss.str();
 
@@ -464,7 +468,6 @@ Status TableScanner::AddRow(const client::sp::shared_ptr<KuduTable>& table,
 
 Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& 
tokens,
                               const std::function<void(const KuduScanBatch& 
batch)>& cb) {
-
   for (auto token : tokens) {
     Stopwatch sw(Stopwatch::THIS_THREAD);
     sw.start();
@@ -480,7 +483,7 @@ Status TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& t
       KuduScanBatch batch;
       RETURN_NOT_OK(scanner->NextBatch(&batch));
       count += batch.NumRows();
-      total_count_.IncrementBy(batch.NumRows());
+      total_count_ += batch.NumRows();
       cb(batch);
     }
 
@@ -517,7 +520,7 @@ void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens, Status* thread
   client::sp::shared_ptr<KuduSession> session(dst_client_.get()->NewSession());
   CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
   CHECK_OK(session->SetErrorBufferSpace(1024));
-  session->SetTimeoutMillis(30000);
+  session->SetTimeoutMillis(FLAGS_timeout_ms);
 
   *thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) {
     for (const auto& row : batch) {
@@ -557,7 +560,7 @@ Status TableScanner::StartWork(WorkType type) {
   if (mode_) {
     RETURN_NOT_OK(builder.SetReadMode(mode_.get()));
   }
-  RETURN_NOT_OK(builder.SetTimeoutMillis(30000));
+  RETURN_NOT_OK(builder.SetTimeoutMillis(FLAGS_timeout_ms));
 
   // Set projection if needed.
   if (type == WorkType::kScan) {
@@ -610,13 +613,13 @@ Status TableScanner::StartWork(WorkType type) {
     }
   }
   while (!thread_pool_->WaitFor(MonoDelta::FromSeconds(5))) {
-    LOG(INFO) << "Scanned count: " << total_count_.Load();
+    LOG(INFO) << "Scanned count: " << total_count_;
   }
   thread_pool_->Shutdown();
 
   sw.stop();
   if (out_) {
-    *out_ << "Total count " << total_count_.Load()
+    *out_ << "Total count " << total_count_
         << " cost " << sw.elapsed().wall_seconds() << " seconds" << endl;
   }
 
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index eb11d79..d4b0787 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <atomic>
 #include <cstdint>
 #include <functional>
 #include <iosfwd>
@@ -30,7 +31,6 @@
 #include "kudu/client/client.h"
 #include "kudu/client/scan_batch.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
-#include "kudu/util/atomic.h"
 #include "kudu/util/mutex.h"
 #include "kudu/util/status.h"
 #include "kudu/util/threadpool.h"
@@ -70,7 +70,7 @@ class TableScanner {
   Status StartCopy();
 
   uint64_t TotalScannedCount() const {
-    return total_count_.Load();
+    return total_count_;
   }
 
  private:
@@ -90,7 +90,7 @@ class TableScanner {
                 const kudu::client::KuduScanBatch::RowPtr& src_row,
                 const client::sp::shared_ptr<kudu::client::KuduSession>& 
session);
 
-  AtomicInt<uint64_t> total_count_;
+  std::atomic<uint64_t> total_count_;
   boost::optional<kudu::client::KuduScanner::ReadMode> mode_;
   client::sp::shared_ptr<kudu::client::KuduClient> client_;
   std::string table_name_;
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 0efce37..151f11b 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -291,7 +291,6 @@ DEFINE_int64(num_rows_per_thread, 1000,
              "Number of rows each thread generates and inserts; "
              "-1 means unlimited. All rows generated by a thread are inserted "
              "in the context of the same session.");
-DECLARE_int32(num_threads);
 DEFINE_bool(use_client_per_thread,
             true,
             "Use a separate KuduClient instance for each load-generating 
thread. "
@@ -319,7 +318,6 @@ DEFINE_int32(show_first_n_errors, 0,
              "effective number of errors in the output. If so, consider "
              "increasing the size of the error buffer using the "
              "'--error_buffer_size_bytes' flag.");
-DECLARE_bool(show_values);
 DEFINE_string(string_fixed, "",
               "Pre-defined string to write into binary and string columns. "
               "Client generates more data per second using pre-defined string "
@@ -336,7 +334,6 @@ DEFINE_string(auto_database, "default",
               "not created. This flag is useful primarily when the Hive 
Metastore "
               "integration is enabled in the cluster. If empty, no database is 
"
               "used.");
-DECLARE_string(table_name);
 DEFINE_int32(table_num_hash_partitions, 8,
              "The number of hash partitions to create when this tool creates "
              "a new table. Note: The total number of partitions must be "
@@ -383,6 +380,10 @@ DEFINE_bool(txn_rollback, false,
             "the inserted rows. Setting --txn_rollback=true implies setting "
             "--txn_start=true as well.");
 
+DECLARE_bool(show_values);
+DECLARE_int32(num_threads);
+DECLARE_string(table_name);
+
 namespace kudu {
 namespace tools {
 

Reply via email to