This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch branch-1.17.x
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit cdad8f58a95dbb7d6d7cae3aea9412a9e88a457c
Author: Abhishek Chennaka <[email protected]>
AuthorDate: Sun May 14 20:30:32 2023 -0700

    [tools] KUDU-1945: Kudu table copy and perf loadgen
    
    This patch adds support in the 'table copy' CLI tool for tables with
    auto-incrementing columns. The auto-incrementing column is left out
    when scanning the source table, and every scanned row is written to
    the destination table; the server then populates the auto-incrementing
    column as each row is written.
    
    It also adds support in the 'perf loadgen' CLI tool for inserting into
    tables with auto-incrementing columns. Currently only the insert and
    insert_ignore write operations are supported; upsert and upsert_ignore
    are not.
    
    Change-Id: I754a7e84c16d1f3b2d52be937e1eb50b3d00d759
    Reviewed-on: http://gerrit.cloudera.org:8080/19890
    Reviewed-by: Alexey Serbin <[email protected]>
    Tested-by: Kudu Jenkins
    Reviewed-on: http://gerrit.cloudera.org:8080/19925
    Reviewed-by: Yingchun Lai <[email protected]>
    Tested-by: Yingchun Lai <[email protected]>
---
 src/kudu/integration-tests/data_gen_util.cc |  5 +-
 src/kudu/tools/kudu-tool-test.cc            | 84 +++++++++++++++++++++++++++--
 src/kudu/tools/table_scanner.cc             | 46 ++++++++++++++--
 src/kudu/tools/tool_action_perf.cc          |  3 ++
 4 files changed, 129 insertions(+), 9 deletions(-)

diff --git a/src/kudu/integration-tests/data_gen_util.cc b/src/kudu/integration-tests/data_gen_util.cc
index 98a6d2fc4..05923e569 100644
--- a/src/kudu/integration-tests/data_gen_util.cc
+++ b/src/kudu/integration-tests/data_gen_util.cc
@@ -82,10 +82,13 @@ void GenerateDataForRow(const client::KuduSchema& schema, uint64_t record_id,
                         RNG* random, KuduPartialRow* row) {
   for (int col_idx = 0; col_idx < schema.num_columns(); col_idx++) {
     // We randomly generate the inserted data, except for the first column,
-    // which is always based on a monotonic "record id".
+    // which is always based on a monotonic "record id", and the auto-incrementing
+    // column, if present, which is left unset here for the server to populate.
     uint64_t value;
     if (col_idx == 0) {
       value = record_id;
+    } else if (col_idx == schema.GetAutoIncrementingColumnIndex()) {
+      continue;
     } else {
       value = random->Next64();
     }
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index de597ab00..f1b7ff674 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -853,7 +853,8 @@ enum RunCopyTableCheckArgsType {
   kTestCopyTableComplexSchema,
   kTestCopyUnpartitionedTable,
   kTestCopyTablePredicates,
-  kTestCopyTableWithStringBounds
+  kTestCopyTableWithStringBounds,
+  kTestCopyTableAutoIncrementingColumn
 };
 // Subclass of ToolTest that allows running individual test cases with different parameters to run
 // 'kudu table copy' CLI tool.
@@ -864,9 +865,10 @@ class ToolTestCopyTableParameterized :
   void SetUp() override {
     test_case_ = GetParam();
     ExternalMiniClusterOptions opts;
-    if (test_case_ == kTestCopyTableSchemaOnly) {
-      // In kTestCopyTableSchemaOnly case, we may create table with RF=3,
-      // means 3 tservers needed at least.
+    if (test_case_ == kTestCopyTableSchemaOnly ||
+        test_case_ == kTestCopyTableAutoIncrementingColumn) {
+      // The kTestCopyTableSchemaOnly and kTestCopyTableAutoIncrementingColumn
+      // cases may create a table with RF=3, so at least 3 tservers are needed.
       opts.num_tablet_servers = 3;
     }
     NO_FATALS(StartExternalMiniCluster(opts));
@@ -893,6 +895,10 @@ class ToolTestCopyTableParameterized :
       ww.set_schema(schema);
       ww.Setup();
       return;
+    } else if (test_case_ == kTestCopyTableAutoIncrementingColumn) {
+      KuduSchema schema;
+      ASSERT_OK(CreateAutoIncrementingTable(&schema));
+      ww.set_schema(schema);
     }
     ww.Setup();
     ww.Start();
@@ -922,6 +928,10 @@ class ToolTestCopyTableParameterized :
       case kTestCopyTableDstTableNotExist:
         args.mode = TableCopyMode::INSERT_TO_NEW_TABLE;
         return { args };
+      case kTestCopyTableAutoIncrementingColumn:
+        args.mode = TableCopyMode::INSERT_TO_NEW_TABLE;
+        args.columns = kAutoIncrementingSchemaColumns;
+        return { args };
       case kTestCopyTableInsertIgnore:
         args.mode = TableCopyMode::INSERT_IGNORE_TO_EXISTING_TABLE;
         return { args };
@@ -1186,6 +1196,23 @@ class ToolTestCopyTableParameterized :
         .Create();
   }
 
+  Status CreateAutoIncrementingTable(KuduSchema* schema) {
+    shared_ptr<KuduClient> client;
+    RETURN_NOT_OK(cluster_->CreateClient(nullptr, &client));
+    unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+    KuduSchemaBuilder b;
+    b.AddColumn("key")->Type(client::KuduColumnSchema::INT32)->NotNull()->NonUniquePrimaryKey();
+    b.AddColumn("int_val")->Type(client::KuduColumnSchema::INT32);
+    b.AddColumn("string_val")->Type(client::KuduColumnSchema::STRING)->Nullable();
+    RETURN_NOT_OK(b.Build(schema));
+
+    return table_creator->table_name(kTableName)
+        .schema(schema)
+        .set_range_partition_columns({})
+        .num_replicas(3)
+        .Create();
+  }
+
   void InsertOneRowWithNullCell() {
     shared_ptr<KuduClient> client;
     ASSERT_OK(cluster_->CreateClient(nullptr, &client));
@@ -1203,12 +1230,15 @@ class ToolTestCopyTableParameterized :
 
   static const char kTableName[];
   static const char kSimpleSchemaColumns[];
+  static const char kAutoIncrementingSchemaColumns[];
   static const char kComplexSchemaColumns[];
   int test_case_ = 0;
   int64_t total_rows_ = 0;
 };
 const char ToolTestCopyTableParameterized::kTableName[] = "ToolTestCopyTableParameterized";
 const char ToolTestCopyTableParameterized::kSimpleSchemaColumns[] = "key,int_val,string_val";
+const char ToolTestCopyTableParameterized::kAutoIncrementingSchemaColumns[]
+    = "key,int_val,string_val";
 const char ToolTestCopyTableParameterized::kComplexSchemaColumns[]
     = "key_hash0,key_hash1,key_hash2,key_range,int8_val,int16_val,int32_val,int64_val,"
       "timestamp_val,string_val,bool_val,float_val,double_val,binary_val,decimal_val";
@@ -1224,7 +1254,8 @@ INSTANTIATE_TEST_SUITE_P(CopyTableParameterized,
                                            kTestCopyTableComplexSchema,
                                            kTestCopyUnpartitionedTable,
                                            kTestCopyTablePredicates,
-                                           kTestCopyTableWithStringBounds));
+                                           kTestCopyTableWithStringBounds,
+                                           kTestCopyTableAutoIncrementingColumn));
 
 void ToolTest::StartExternalMiniCluster(ExternalMiniClusterOptions opts) {
   cluster_.reset(new ExternalMiniCluster(std::move(opts)));
@@ -3727,6 +3758,49 @@ TEST_F(ToolTest, TestLoadgenAutoGenTablePartitioning) {
   ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout));
 }
 
+TEST_F(ToolTest, TestLoadgenAutoIncrementingColumn) {
+  shared_ptr<KuduClient> client;
+  const string kTableName = "loadgen_auto_incrementing";
+  NO_FATALS(StartExternalMiniCluster());
+
+  // A non-unique primary key gives the table an auto-incrementing column;
+  // use a single tablet to keep the expected row order simple.
+  ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+  unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+  KuduSchemaBuilder b;
+  b.AddColumn("key")->Type(client::KuduColumnSchema::INT32)->NotNull()->NonUniquePrimaryKey();
+  b.AddColumn("int_val")->Type(client::KuduColumnSchema::INT32);
+  KuduSchema schema;
+  ASSERT_OK(b.Build(&schema));
+  ASSERT_OK(table_creator->table_name(kTableName)
+            .schema(&schema)
+            .set_range_partition_columns({})
+            .num_replicas(1)
+            .Create());
+
+  // Insert kNumRows rows with 'perf loadgen'; the server fills in auto_incrementing_id.
+  constexpr int kNumRows = 100;
+  NO_FATALS(RunTool(
+      Substitute("perf loadgen $0 -table_name=$1 -num_threads=1 "
+                 "-num_rows_per_thread=$2",
+                 cluster_->master()->bound_rpc_addr().ToString(),
+                 kTableName, kNumRows)));
+
+  // Scan the rows back: expect one line per row plus one extra trailing line.
+  vector<string> lines;
+  NO_FATALS(RunActionStdoutLines(
+      Substitute("table scan $0 $1 -num_threads=1",
+                 cluster_->master()->bound_rpc_addr().ToString(),
+                 kTableName),
+      &lines));
+
+  ASSERT_EQ(kNumRows + 1, lines.size());
+  for (int i = 0; i < kNumRows; i++) {
+    ASSERT_STR_CONTAINS(lines[i], Substitute("int32 key=$0, int64 auto_incrementing_id=$1,"
+                                             " int32 int_val", i, i+1));
+  }
+}
+
 // Run the loadgen with txn-related options.
 TEST_F(ToolTest, LoadgenTxnBasics) {
   SKIP_IF_SLOW_NOT_ALLOWED();
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index 8178c7379..35b87f70f 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -750,6 +750,27 @@ Status TableScanner::StartWork(WorkType work_type) {
     }
   }
 
+  if (work_type == WorkType::kCopy) {
+    // When copying, leave the auto-incrementing column out of the scan projection: the server
+    // populates it on write, and skipping it avoids reading an entire column of the table.
+    if (src_table->schema().GetAutoIncrementingColumnIndex() != Schema::kColumnNotFound) {
+      vector<string> projected_column_names;
+      for (int i = 0; i < src_table->schema().num_columns(); i++) {
+        if (src_table->schema().Column(i).name() == KuduSchema::GetAutoIncrementingColumnName()) {
+          continue;
+        }
+        projected_column_names.emplace_back(src_table->schema().Column(i).name());
+      }
+      RETURN_NOT_OK(builder.SetProjectedColumnNames(projected_column_names));
+    }
+    // Refuse to copy unless the source and destination table schemas are identical.
+    client::sp::shared_ptr<KuduTable> dst_table;
+    RETURN_NOT_OK(dst_client_->get()->OpenTable(*dst_table_name_, &dst_table));
+    if (dst_table->schema() != src_table->schema()) {
+      return Status::InvalidArgument("source and destination tables should have the same schema");
+    }
+  }
+
   // Set predicates.
   RETURN_NOT_OK(AddPredicates(src_table, &builder));
 
@@ -853,10 +874,29 @@ Status TableScanner::AddRow(KuduSession* session,
       break;  // unreachable
   }
 
+  // The auto-incrementing column is excluded from the source scan projection, so its cell
+  // is left unset in the destination row and the server populates it on write.
   auto* dst_row = write_op->mutable_row();
-  memcpy(dst_row->row_data_, src_row.row_data_,
-         ContiguousRowHelper::row_size(*src_row.schema_));
-  BitmapChangeBits(dst_row->isset_bitmap_, 0, table->schema().num_columns(), true);
+  const int auto_incrementing_col_idx = table->schema().GetAutoIncrementingColumnIndex();
+  if (auto_incrementing_col_idx == Schema::kColumnNotFound) {
+    memcpy(dst_row->row_data_, src_row.row_data_,
+           ContiguousRowHelper::row_size(*src_row.schema_));
+    BitmapChangeBits(dst_row->isset_bitmap_, 0, table->schema().num_columns(), true);
+  } else {
+    int src_iterator = 0;
+    for (int dst_iterator = 0; dst_iterator < table->schema().num_columns(); dst_iterator++) {
+      if (auto_incrementing_col_idx != dst_iterator) {
+        if (src_row.IsNull(src_iterator)) {
+          RETURN_NOT_OK(dst_row->SetNull(dst_iterator));
+        } else {
+          RETURN_NOT_OK(dst_row->Set(dst_iterator, src_row.row_data_ +
+              src_row.schema_->column_offset(src_iterator)));
+        }
+        BitmapChange(dst_row->isset_bitmap_, dst_iterator, true);
+        src_iterator++;
+      }
+    }
+  }
 
   return session->Apply(write_op.release());
 }
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 09cf70ac5..489cc9d01 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -528,6 +528,9 @@ Status GenerateRowData(Generator* key_gen, Generator* value_gen, KuduPartialRow*
   // when perform DELETE operations.
   Generator* gen = key_gen;
   for (size_t idx = 0; idx < gen_column_count; ++idx) {
+    if (columns[idx].is_auto_incrementing()) {
+      continue;  // The server populates the auto-incrementing column.
+    }
     if (idx == row->schema()->num_key_columns()) {
       gen = value_gen;
     }

Reply via email to