This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new f6c32e991 KUDU-3606 [tools] Check column IDs when --strict_column_id
is enabled
f6c32e991 is described below
commit f6c32e991cb94a06d62ee6af167350496f9a51af
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Aug 13 20:53:46 2024 +0800
KUDU-3606 [tools] Check column IDs when --strict_column_id is enabled
This patch ensures the destination table has the
same user-facing schema and internal column ids
as the source table when using 'kudu table copy'.
If they differ, the error message now shows more
details, including column IDs and default values.
A new flag --strict_column_id is added to control
whether column IDs are also compared when checking
whether the schemas match.
Change-Id: I77af7b92b45f3866cc8b699e61b9e71b73ed6c4b
Reviewed-on: http://gerrit.cloudera.org:8080/21695
Tested-by: Yingchun Lai <[email protected]>
Reviewed-by: Mahesh Reddy <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/tools/kudu-tool-test.cc | 79 +++++++++++++++++++--
src/kudu/tools/table_scanner.cc | 134 +++++++++++++++++++++++++++++-------
src/kudu/tools/tool_action_table.cc | 2 +
3 files changed, 183 insertions(+), 32 deletions(-)
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 9dc36d6fc..186c71ebf 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -531,6 +531,7 @@ class ToolTest : public KuduTest {
COPY_SCHEMA_ONLY = 3,
INSERT_IGNORE_TO_EXISTING_TABLE = 4,
UPSERT_IGNORE_TO_EXISTING_TABLE = 5,
+ COPY_SCHEMA_THEN_COPY_DATA = 6,
};
struct RunCopyTableCheckArgs {
@@ -542,6 +543,7 @@ class ToolTest : public KuduTest {
TableCopyMode mode;
int32_t create_table_replication_factor;
string create_table_hash_bucket_nums;
+ bool strict_column_id;
};
void RunCopyTableCheck(const RunCopyTableCheckArgs& args) {
@@ -608,17 +610,32 @@ class ToolTest : public KuduTest {
write_type = "";
create_table = "true";
break;
+ case TableCopyMode::COPY_SCHEMA_THEN_COPY_DATA:
+ write_type = "insert";
+ create_table = "false";
+ break;
default:
ASSERT_TRUE(false);
}
+ // Only copy schema.
+ if (args.mode == TableCopyMode::COPY_SCHEMA_THEN_COPY_DATA) {
+ RunActionStdoutNone(
+ Substitute("table copy $0 $1 $2 -dst_table=$3 -write_type="" "
+ "-create_table=true",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ args.src_table_name,
+ cluster_->master()->bound_rpc_addr().ToString(),
+ kDstTableName));
+ }
+
// Execute copy command.
string stdout;
string stderr;
Status s = RunActionStdoutStderrString(
Substitute("table copy $0 $1 $2 -dst_table=$3 -predicates=$4
-write_type=$5 "
"-create_table=$6
-create_table_replication_factor=$7 "
- "-create_table_hash_bucket_nums=$8",
+ "-create_table_hash_bucket_nums=$8
-strict_column_id=$9",
cluster_->master()->bound_rpc_addr().ToString(),
args.src_table_name,
cluster_->master()->bound_rpc_addr().ToString(),
@@ -627,7 +644,8 @@ class ToolTest : public KuduTest {
write_type,
create_table,
args.create_table_replication_factor,
- args.create_table_hash_bucket_nums),
+ args.create_table_hash_bucket_nums,
+ args.strict_column_id),
&stdout, &stderr);
if (args.create_table_hash_bucket_nums == "10,aa") {
ASSERT_STR_CONTAINS(stderr, "cannot parse the number of hash buckets.");
@@ -655,7 +673,7 @@ class ToolTest : public KuduTest {
ASSERT_STR_CONTAINS(stderr, "There are no hash partitions defined in
this table.");
return;
}
- ASSERT_TRUE(s.ok());
+ ASSERT_TRUE(s.ok()) << s.ToString() << ": " << stderr;
// Check total count.
int64_t total = max<int64_t>(args.max_value - args.min_value + 1, 0);
@@ -865,7 +883,8 @@ enum RunCopyTableCheckArgsType {
kTestCopyUnpartitionedTable,
kTestCopyTablePredicates,
kTestCopyTableWithStringBounds,
- kTestCopyTableAutoIncrementingColumn
+ kTestCopyTableAutoIncrementingColumn,
+ kTestCopyTableWithColumnIdHoles,
};
// Subclass of ToolTest that allows running individual test cases with
different parameters to run
// 'kudu table copy' CLI tool.
@@ -910,6 +929,10 @@ class ToolTestCopyTableParameterized :
KuduSchema schema;
ASSERT_OK(CreateAutoIncrementingTable(&schema));
ww.set_schema(schema);
+ } else if (test_case_ == kTestCopyTableWithColumnIdHoles) {
+ KuduSchema schema;
+ ASSERT_OK(CreateTableWithColumnIdHoles(&schema));
+ ww.set_schema(schema);
}
ww.Setup();
ww.Start();
@@ -932,7 +955,9 @@ class ToolTestCopyTableParameterized :
total_rows_,
kSimpleSchemaColumns,
TableCopyMode::INSERT_TO_EXISTING_TABLE,
- -1 };
+ -1,
+ "",
+ false };
switch (test_case_) {
case kTestCopyTableDstTableExist:
return { args };
@@ -1091,6 +1116,11 @@ class ToolTestCopyTableParameterized :
args.mode = TableCopyMode::COPY_SCHEMA_ONLY;
args.columns = "";
return {args};
+ case kTestCopyTableWithColumnIdHoles:
+ args.mode = TableCopyMode::COPY_SCHEMA_THEN_COPY_DATA;
+ args.columns = "";
+ args.strict_column_id = true;
+ return { args };
default:
LOG(FATAL) << "Unknown type " << test_case_;
}
@@ -1224,6 +1254,42 @@ class ToolTestCopyTableParameterized :
.Create();
}
+ Status CreateTableWithColumnIdHoles(KuduSchema* schema) {
+ shared_ptr<KuduClient> client;
+ RETURN_NOT_OK(cluster_->CreateClient(nullptr, &client));
+ unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+ KuduSchemaBuilder b;
+
b.AddColumn("key")->Type(client::KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+ b.AddColumn("int_val1")->Type(client::KuduColumnSchema::INT32);
+
b.AddColumn("string_val1")->Type(client::KuduColumnSchema::STRING)->Nullable();
+ // The columns with 'delete' word will be dropped to construct column id
holes.
+
b.AddColumn("to_delete_col1")->Type(client::KuduColumnSchema::INT32)->Nullable();
+
b.AddColumn("int_val2")->Type(client::KuduColumnSchema::STRING)->Nullable();
+
b.AddColumn("to_delete_col2")->Type(client::KuduColumnSchema::INT32)->Nullable();
+
b.AddColumn("to_delete_col3")->Type(client::KuduColumnSchema::STRING)->Nullable();
+
b.AddColumn("string_val2")->Type(client::KuduColumnSchema::STRING)->Nullable();
+
b.AddColumn("to_delete_col4")->Type(client::KuduColumnSchema::INT32)->Nullable();
+ RETURN_NOT_OK(b.Build(schema));
+
+ RETURN_NOT_OK(table_creator->table_name(kTableName)
+ .schema(schema)
+ .set_range_partition_columns({})
+ .num_replicas(1)
+ .Create());
+
+ unique_ptr<client::KuduTableAlterer>
alterer(client->NewTableAlterer(kTableName));
+ RETURN_NOT_OK(alterer->DropColumn("to_delete_col1")
+ ->DropColumn("to_delete_col2")
+ ->DropColumn("to_delete_col3")
+ ->DropColumn("to_delete_col4")
+ ->Alter());
+
+ client::sp::shared_ptr<KuduTable> table;
+ RETURN_NOT_OK(client->OpenTable(kTableName, &table));
+ *schema = table->schema();
+ return Status::OK();
+ }
+
void InsertOneRowWithNullCell() {
shared_ptr<KuduClient> client;
ASSERT_OK(cluster_->CreateClient(nullptr, &client));
@@ -1266,7 +1332,8 @@ INSTANTIATE_TEST_SUITE_P(CopyTableParameterized,
kTestCopyUnpartitionedTable,
kTestCopyTablePredicates,
kTestCopyTableWithStringBounds,
-
kTestCopyTableAutoIncrementingColumn));
+
kTestCopyTableAutoIncrementingColumn,
+ kTestCopyTableWithColumnIdHoles));
void ToolTest::StartExternalMiniCluster(ExternalMiniClusterOptions opts) {
cluster_.reset(new ExternalMiniCluster(std::move(opts)));
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index 8fb4fd932..46e293c91 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -43,6 +43,7 @@
#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
#include "kudu/common/column_predicate.h"
+#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
#include "kudu/common/row.h"
@@ -58,6 +59,7 @@
#include "kudu/util/logging.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/oid_generator.h"
#include "kudu/util/slice.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/string_case.h"
@@ -141,6 +143,12 @@ DEFINE_string(replica_selection, "CLOSEST",
"Replica selection for scan operations. Acceptable values are: "
"CLOSEST, LEADER (maps into KuduClient::CLOSEST_REPLICA and "
"KuduClient::LEADER_ONLY correspondingly).");
+DEFINE_bool(strict_column_id, false,
+ "Whether to compare column IDs when comparing schemas. It should
be enabled when we "
+ "expect the destination table has the same server internal column
IDs as the source "
+ "table. For example, before using 'kudu remote_replica copy' to
copy data from "
+ "another table, we should enable this flag to make sure the table
schemas are "
+ "completely the same.");
DEFINE_int64(table_copy_throttler_bytes_per_sec, 0,
"Limit table copying speed. It limits the copying speed of all
the tablets "
"in one table for one session. The default value is 0, which
means not limiting "
@@ -411,55 +419,113 @@ Status AddPredicates(const
client::sp::shared_ptr<KuduTable>& table,
return Status::OK();
}
+Status SchemasMatch(const Schema& src_table_schema,
+ const Schema& dst_table_schema) {
+ bool same_schema = (src_table_schema == dst_table_schema);
+ if (FLAGS_strict_column_id) {
+ same_schema &= (src_table_schema.column_ids() ==
dst_table_schema.column_ids());
+ }
+ if (!same_schema) {
+ // The Schema's "==" operator uses the default COMPARE_ALL mode, so we
show all the column
+ // information to help users to understand the difference.
+ static const auto kSchemaStringifyMode =
+ Schema::ToStringMode::BASE_INFO |
+ Schema::ToStringMode::WITH_COLUMN_ATTRIBUTES |
+ Schema::ToStringMode::WITH_COLUMN_COMMENTS |
+ Schema::ToStringMode::WITH_COLUMN_IDS;
+ return Status::NotSupported(Substitute(
+ "destination table's schema differs from the source one ($0 vs $1)",
+ dst_table_schema.ToString(kSchemaStringifyMode),
+ src_table_schema.ToString(kSchemaStringifyMode)));
+ }
+ return Status::OK();
+}
+
Status CreateDstTableIfNeeded(const client::sp::shared_ptr<KuduTable>&
src_table,
const client::sp::shared_ptr<KuduClient>&
dst_client,
const string& dst_table_name) {
client::sp::shared_ptr<KuduTable> dst_table;
- Status s = dst_client->OpenTable(dst_table_name, &dst_table);
+ auto s = dst_client->OpenTable(dst_table_name, &dst_table);
if (!s.IsNotFound() && !s.ok()) {
return s;
}
// Destination table exists.
- const KuduSchema& src_table_schema = src_table->schema();
+ const Schema src_schema_internal = KuduSchema::ToSchema(src_table->schema());
if (s.ok()) {
if (src_table->id() == dst_table->id()) {
return Status::AlreadyPresent("Destination table is the same as the
source table.");
}
-
- const KuduSchema& dst_table_schema = dst_table->schema();
- if (src_table_schema != dst_table_schema) {
- return Status::NotSupported(Substitute(
- "destination table's schema differs from the source one ($0 vs $1)",
- dst_table_schema.ToString(), src_table_schema.ToString()));
- }
-
+ RETURN_NOT_OK(SchemasMatch(src_schema_internal,
+ KuduSchema::ToSchema(dst_table->schema())));
return Status::OK();
}
- // Destination table does NOT exist.
+ // The destination table does NOT exist.
if (!FLAGS_create_table) {
return Status::NotFound(Substitute("Table $0 does not exist in the
destination cluster.",
dst_table_name));
}
- Schema schema_internal = KuduSchema::ToSchema(src_table_schema);
- // Convert Schema to KuduSchema will drop internal ColumnIds.
- KuduSchema dst_table_schema = KuduSchema::FromSchema(schema_internal);
+ // Construct the destination table schema.
+ //
+ // 'to_delete_columns' is used to store the dummy columns that will be
dropped after the table
+ // has been created if there are some column id holes in the source table
schema.
+ vector<string> to_delete_columns;
+ static ObjectIdGenerator oid_generator;
+ SchemaBuilder builder;
+ int32_t expect_column_id = src_schema_internal.column_id(0);
+ for (size_t idx = 0; idx < src_schema_internal.num_columns();) {
+ const int32_t actual_column_id = src_schema_internal.column_id(idx);
+ if (expect_column_id == actual_column_id) {
+ // Construct the destination column schema according to the source
column for continuous
+ // column id.
+ builder.AddColumn(src_schema_internal.column(idx),
+ src_schema_internal.is_key_column(idx));
+ VLOG(1) << Substitute("Add a real column $0 for column id $1",
+ src_schema_internal.column(idx).ToString(),
+ actual_column_id);
+ // The expected column id is continuous.
+ ++expect_column_id;
+ ++idx;
+ } else {
+ // When there are column id holes, the expected column id must be less
than the actual
+ // column id.
+ if (PREDICT_FALSE(expect_column_id >= actual_column_id)) {
+ return Status::Corruption(
+ Substitute("The internal column IDs must be monotonically
increasing, but we got $0 "
+ "while expecting $1.",
+ actual_column_id, expect_column_id));
+ }
+ // Fill the hole with dummy columns.
+ while (expect_column_id < actual_column_id) {
+ auto dummy_column_name = "dummy_" + oid_generator.Next();
+ builder.AddColumn(dummy_column_name, DataType::INT8);
+ VLOG(1) << Substitute("Add a dummy column $0 for column id $1",
+ dummy_column_name, expect_column_id);
+ // The dummy columns will be dropped after the table is created.
+ to_delete_columns.emplace_back(dummy_column_name);
+ ++expect_column_id;
+ }
+ }
+ }
+
+ const Schema dst_schema_internal = builder.Build();
const auto& partition_schema = src_table->partition_schema();
- auto convert_column_ids_to_names = [&schema_internal] (const
vector<ColumnId>& column_ids) {
+ auto convert_column_ids_to_names = [&dst_schema_internal] (const
vector<ColumnId>& column_ids) {
vector<string> column_names;
column_names.reserve(column_ids.size());
for (const auto& column_id : column_ids) {
-
column_names.emplace_back(schema_internal.column_by_id(column_id).name());
+
column_names.emplace_back(dst_schema_internal.column_by_id(column_id).name());
}
return column_names;
};
// Table schema and replica number.
- int num_replicas = FLAGS_create_table_replication_factor == -1 ?
+ const int num_replicas = FLAGS_create_table_replication_factor == -1 ?
src_table->num_replicas() : FLAGS_create_table_replication_factor;
+ const KuduSchema dst_table_schema =
KuduSchema::FromSchema(dst_schema_internal);
unique_ptr<KuduTableCreator> table_creator(dst_client->NewTableCreator());
table_creator->table_name(dst_table_name)
.schema(&dst_table_schema)
@@ -484,7 +550,7 @@ Status CreateDstTableIfNeeded(const
client::sp::shared_ptr<KuduTable>& src_table
}
for (int i = 0; i < hash_bucket_nums_str.size(); i++) {
int bucket_num = 0;
- bool is_number = safe_strto32(hash_bucket_nums_str[i], &bucket_num);
+ const bool is_number = safe_strto32(hash_bucket_nums_str[i],
&bucket_num);
if (!is_number) {
return Status::InvalidArgument(Substitute("'$0': cannot parse the
number "
"of hash buckets.",
@@ -505,18 +571,18 @@ Status CreateDstTableIfNeeded(const
client::sp::shared_ptr<KuduTable>& src_table
int i = 0;
for (const auto& hash_dimension : partition_schema.hash_schema()) {
- int num_buckets = hash_bucket_nums[i] != -1 ? hash_bucket_nums[i] :
- hash_dimension.num_buckets;
- auto hash_columns = convert_column_ids_to_names(hash_dimension.column_ids);
+ const int num_buckets = hash_bucket_nums[i] != -1 ? hash_bucket_nums[i] :
+
hash_dimension.num_buckets;
+ const auto hash_columns =
convert_column_ids_to_names(hash_dimension.column_ids);
table_creator->add_hash_partitions(hash_columns,
num_buckets,
- hash_dimension.seed);
+
static_cast<int32_t>(hash_dimension.seed));
i++;
}
// Add range partition schema.
if (!partition_schema.range_schema().column_ids.empty()) {
- auto range_columns
+ const auto range_columns
=
convert_column_ids_to_names(partition_schema.range_schema().column_ids);
table_creator->set_range_partition_columns(range_columns);
}
@@ -534,11 +600,11 @@ Status CreateDstTableIfNeeded(const
client::sp::shared_ptr<KuduTable>& src_table
}
// Partitions are considered metadata, so don't redact them.
- ScopedDisableRedaction no_redaction;
+ const ScopedDisableRedaction no_redaction;
Arena arena(256);
- std::unique_ptr<KuduPartialRow> lower(new
KuduPartialRow(&schema_internal));
- std::unique_ptr<KuduPartialRow> upper(new
KuduPartialRow(&schema_internal));
+ std::unique_ptr<KuduPartialRow> lower(new
KuduPartialRow(&dst_schema_internal));
+ std::unique_ptr<KuduPartialRow> upper(new
KuduPartialRow(&dst_schema_internal));
Slice range_key_start(partition.begin().range_key());
Slice range_key_end(partition.end().range_key());
RETURN_NOT_OK(partition_schema.DecodeRangeKey(&range_key_start,
lower.get(), &arena));
@@ -556,6 +622,22 @@ Status CreateDstTableIfNeeded(const
client::sp::shared_ptr<KuduTable>& src_table
// Create table.
RETURN_NOT_OK(table_creator->Create());
+
+ // Drop the dummy columns.
+ if (!to_delete_columns.empty()) {
+ unique_ptr<client::KuduTableAlterer>
alterer(dst_client->NewTableAlterer(dst_table_name));
+ for (const auto &to_delete_column: to_delete_columns) {
+ VLOG(1) << Substitute("Drop dummy column $0", to_delete_column);
+ alterer->DropColumn(to_delete_column);
+ }
+ RETURN_NOT_OK(alterer->Alter());
+ }
+
+ // Check that the schemas match.
+ RETURN_NOT_OK(dst_client->OpenTable(dst_table_name, &dst_table));
+ RETURN_NOT_OK(SchemasMatch(src_schema_internal,
+ KuduSchema::ToSchema(dst_table->schema())));
+
LOG(INFO) << "Table " << dst_table_name << " created successfully";
return Status::OK();
diff --git a/src/kudu/tools/tool_action_table.cc
b/src/kudu/tools/tool_action_table.cc
index 8035f0766..4261a22ae 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -216,6 +216,7 @@ DECLARE_bool(fault_tolerant);
DECLARE_int32(create_table_replication_factor);
DECLARE_bool(row_count_only);
DECLARE_bool(show_scanner_stats);
+DECLARE_bool(strict_column_id);
DEFINE_string(encoding_type, "AUTO_ENCODING",
"Type of encoding for the column including AUTO_ENCODING,
PLAIN_ENCODING, "
@@ -2028,6 +2029,7 @@ unique_ptr<Mode> BuildTableMode() {
.AddOptionalParameter("num_threads")
.AddOptionalParameter("predicates")
.AddOptionalParameter("scan_batch_size")
+ .AddOptionalParameter("strict_column_id")
.AddOptionalParameter("tablets")
.AddOptionalParameter("write_type")
.AddOptionalParameter("table_copy_throttler_bytes_per_sec")