This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 05903d962 [Tools] Support to config hash bucket numbers when copy a
table
05903d962 is described below
commit 05903d96296f9d06423deaddcfbeb5f22461dc76
Author: xinghuayu007 <[email protected]>
AuthorDate: Thu Jun 9 11:09:16 2022 +0800
[Tools] Support to config hash bucket numbers when copy a table
When copying a table to another table, we can create the new table
with the same schema, but we cannot configure the number of hash buckets
in the new table. Why do we need to configure the number of hash buckets?
The old table might have been configured with a small number of hash
buckets, yet contain a lot of data. When copying the table to a new
cluster, we want to add more hash buckets to store it. And there isn't a
way to change the number of hash buckets in the partition schema of an
already existing table.
Change-Id: I1cec38e5ea09c66bfed20622b85033602da60d41
Reviewed-on: http://gerrit.cloudera.org:8080/18604
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/tools/kudu-tool-test.cc | 117 +++++++++++++++++++++++++++++++++---
src/kudu/tools/table_scanner.cc | 46 +++++++++++++-
src/kudu/tools/tool_action_table.cc | 1 +
3 files changed, 155 insertions(+), 9 deletions(-)
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 876a1854e..23601cedf 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -522,6 +522,7 @@ class ToolTest : public KuduTest {
string columns;
TableCopyMode mode;
int32_t create_table_replication_factor;
+ string create_table_hash_bucket_nums;
};
void RunCopyTableCheck(const RunCopyTableCheckArgs& args) {
@@ -569,9 +570,11 @@ class ToolTest : public KuduTest {
// Execute copy command.
string stdout;
- NO_FATALS(RunActionStdoutString(
+ string stderr;
+ Status s = RunActionStdoutStderrString(
Substitute("table copy $0 $1 $2 -dst_table=$3 -predicates=$4
-write_type=$5 "
- "-create_table=$6
-create_table_replication_factor=$7",
+ "-create_table=$6
-create_table_replication_factor=$7 "
+ "-create_table_hash_bucket_nums=$8",
cluster_->master()->bound_rpc_addr().ToString(),
args.src_table_name,
cluster_->master()->bound_rpc_addr().ToString(),
@@ -579,8 +582,36 @@ class ToolTest : public KuduTest {
args.predicates_json,
write_type,
create_table,
- args.create_table_replication_factor),
- &stdout));
+ args.create_table_replication_factor,
+ args.create_table_hash_bucket_nums),
+ &stdout, &stderr);
+ if (args.create_table_hash_bucket_nums == "10,aa") {
+ ASSERT_STR_CONTAINS(stderr, "cannot parse the number of hash buckets.");
+ return;
+ }
+ if (args.create_table_hash_bucket_nums == "10,20,30") {
+ ASSERT_STR_CONTAINS(stderr, "The count of hash bucket numbers must be
equal to the "
+ "number of hash schema dimensions.");
+ return;
+ }
+ if (args.create_table_hash_bucket_nums == "10") {
+ ASSERT_STR_CONTAINS(stderr, "The count of hash bucket numbers must be
equal to the "
+ "number of hash schema dimensions.");
+ return;
+ }
+ if (args.create_table_hash_bucket_nums == "10,1") {
+ ASSERT_STR_CONTAINS(stderr, "The number of hash buckets must not be less
than 2.");
+ return;
+ }
+ if (args.create_table_hash_bucket_nums == "10,1") {
+ ASSERT_STR_CONTAINS(stderr, "The number of hash buckets must not be less
than 2.");
+ return;
+ }
+ if (args.create_table_hash_bucket_nums == "10,50") {
+ ASSERT_STR_CONTAINS(stderr, "There are no hash partitions defined in
this table.");
+ return;
+ }
+ ASSERT_TRUE(s.ok());
// Check total count.
int64_t total = max<int64_t>(args.max_value - args.min_value + 1, 0);
@@ -612,6 +643,32 @@ class ToolTest : public KuduTest {
// Replication factor is different when explicitly set it to 3
(default 1).
if (args.create_table_replication_factor == 3 &&
HasPrefixString(src_schema[i], "REPLICAS ")) continue;
+ vector<string> hash_bucket_nums =
Split(args.create_table_hash_bucket_nums,
+ ",", strings::SkipEmpty());
+ if (args.create_table_hash_bucket_nums == "10,20" &&
+ HasPrefixString(src_schema[i], "HASH (key_hash0)")) {
+ ASSERT_STR_CONTAINS(dst_schema[i], Substitute("PARTITIONS $0",
+ hash_bucket_nums[0]));
+ continue;
+ }
+ if (args.create_table_hash_bucket_nums == "10,20" &&
+ HasPrefixString(src_schema[i], "HASH (key_hash1, key_hash2)")) {
+ ASSERT_STR_CONTAINS(dst_schema[i], Substitute("PARTITIONS $0",
+ hash_bucket_nums[1]));
+ continue;
+ }
+ if (args.create_table_hash_bucket_nums == "10,2" &&
+ HasPrefixString(src_schema[i], "HASH (key_hash0)")) {
+ ASSERT_STR_CONTAINS(dst_schema[i], Substitute("PARTITIONS $0",
+ hash_bucket_nums[0]));
+ continue;
+ }
+ if (args.create_table_hash_bucket_nums == "10,2" &&
+ HasPrefixString(src_schema[i], "HASH (key_hash1, key_hash2)")) {
+ ASSERT_STR_CONTAINS(dst_schema[i], Substitute("PARTITIONS $0",
+ hash_bucket_nums[1]));
+ continue;
+ }
ASSERT_EQ(src_schema[i], dst_schema[i]);
}
}
@@ -804,13 +861,57 @@ class ToolTestCopyTableParameterized :
}
return multi_args;
}
- case kTestCopyTableComplexSchema:
+ case kTestCopyTableComplexSchema: {
args.columns = kComplexSchemaColumns;
args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE;
- return { args };
- case kTestCopyUnpartitionedTable:
+ vector<RunCopyTableCheckArgs> multi_args;
+ {
+ auto args_temp = args;
+ args.create_table_hash_bucket_nums = "10,20";
+ multi_args.emplace_back(std::move(args_temp));
+ }
+ {
+ auto args_temp = args;
+ args_temp.create_table_hash_bucket_nums = "10,aa";
+ multi_args.emplace_back(std::move(args_temp));
+ }
+ {
+ auto args_temp = args;
+ args_temp.create_table_hash_bucket_nums = "10,20,30";
+ multi_args.emplace_back(std::move(args_temp));
+ }
+ {
+ auto args_temp = args;
+ args_temp.create_table_hash_bucket_nums = "10";
+ multi_args.emplace_back(std::move(args_temp));
+ }
+ {
+ auto args_temp = args;
+ args_temp.create_table_hash_bucket_nums = "10,1";
+ multi_args.emplace_back(std::move(args_temp));
+ }
+ {
+ auto args_temp = args;
+ args_temp.create_table_hash_bucket_nums = "10,2";
+ multi_args.emplace_back(std::move(args_temp));
+ }
+ {
+ auto args_temp = args;
+ args_temp.create_table_hash_bucket_nums = "";
+ multi_args.emplace_back(std::move(args_temp));
+ }
+ return multi_args;
+ }
+ case kTestCopyUnpartitionedTable: {
args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE;
- return {args};
+ vector<RunCopyTableCheckArgs> multi_args;
+ {
+ auto args_temp = args;
+ args.create_table_hash_bucket_nums = "10,50";
+ multi_args.emplace_back(std::move(args_temp));
+ }
+ return multi_args;
+ }
case kTestCopyTablePredicates: {
auto mid = total_rows_ / 2;
vector<RunCopyTableCheckArgs> multi_args;
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index c2c535db9..f60c3a386 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -48,6 +48,7 @@
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/bitmap.h"
@@ -93,6 +94,8 @@ DEFINE_bool(create_table, true,
DEFINE_int32(create_table_replication_factor, -1,
"The replication factor of the destination table if the table
will be created. "
"By default, the replication factor of source table will be
used.");
+DEFINE_string(create_table_hash_bucket_nums, "",
+ "The number of hash buckets in each hash dimension separated by
comma");
DEFINE_bool(fill_cache, true,
"Whether to fill block cache when scanning.");
DEFINE_string(predicates, "",
@@ -428,11 +431,52 @@ Status CreateDstTableIfNeeded(const
client::sp::shared_ptr<KuduTable>& src_table
.num_replicas(num_replicas);
// Add hash partition schema.
+ vector<int> hash_bucket_nums;
+ if (!partition_schema.hash_schema().empty()) {
+ vector<string> hash_bucket_nums_str =
Split(FLAGS_create_table_hash_bucket_nums,
+ ",", strings::SkipEmpty());
+ // FLAGS_create_table_hash_bucket_nums is not defined, set it to -1
defaultly.
+ if (hash_bucket_nums_str.empty()) {
+ for (int i = 0; i < partition_schema.hash_schema().size(); i++) {
+ hash_bucket_nums.push_back(-1);
+ }
+ } else {
+ // If the --create_table_hash_bucket_nums flag is set, the number
+ // of comma-separated elements must be equal to the number of hash
schema dimensions.
+ if (partition_schema.hash_schema().size() !=
hash_bucket_nums_str.size()) {
+ return Status::InvalidArgument("The count of hash bucket numbers must
be equal to the "
+ "number of hash schema dimensions.");
+ }
+ for (int i = 0; i < hash_bucket_nums_str.size(); i++) {
+ int bucket_num = 0;
+ bool is_number = safe_strto32(hash_bucket_nums_str[i], &bucket_num);
+ if (!is_number) {
+ return Status::InvalidArgument(Substitute("'$0': cannot parse the
number "
+ "of hash buckets.",
+ hash_bucket_nums_str[i]));
+ }
+ if (bucket_num < 2) {
+ return Status::InvalidArgument("The number of hash buckets must not
be less than 2.");
+ }
+ hash_bucket_nums.push_back(bucket_num);
+ }
+ }
+ }
+
+ if (partition_schema.hash_schema().empty() &&
+ !FLAGS_create_table_hash_bucket_nums.empty()) {
+ return Status::InvalidArgument("There are no hash partitions defined in
this table.");
+ }
+
+ int i = 0;
for (const auto& hash_dimension : partition_schema.hash_schema()) {
+ int num_buckets = hash_bucket_nums[i] != -1 ? hash_bucket_nums[i] :
+ hash_dimension.num_buckets;
auto hash_columns = convert_column_ids_to_names(hash_dimension.column_ids);
table_creator->add_hash_partitions(hash_columns,
- hash_dimension.num_buckets,
+ num_buckets,
hash_dimension.seed);
+ i++;
}
// Add range partition schema.
diff --git a/src/kudu/tools/tool_action_table.cc
b/src/kudu/tools/tool_action_table.cc
index 23b260bc7..e68c6f1f7 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -1567,6 +1567,7 @@ unique_ptr<Mode> BuildTableMode() {
.AddRequiredParameter({ kTableNameArg, "Name of the source table" })
.AddRequiredParameter({ kDestMasterAddressesArg,
kDestMasterAddressesArgDesc })
.AddOptionalParameter("create_table")
+ .AddOptionalParameter("create_table_hash_bucket_nums")
.AddOptionalParameter("create_table_replication_factor")
.AddOptionalParameter("dst_table")
.AddOptionalParameter("num_threads")