This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new d254964e6 KUDU-3577 fix altering tables with custom hash schemas
d254964e6 is described below
commit d254964e6037f6ae0c9459d99cffa13303596f07
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Jun 6 17:33:55 2024 -0700
KUDU-3577 fix altering tables with custom hash schemas
Partition boundaries for ranges with custom hash schemas are
represented via RowOperationsPB (see RangeWithHashSchemaPB::range_bounds
field in src/kudu/common/common.proto), so their serialized form depends
on the table's full schema: the sizes of the 'column_is_set' and the
'nullable' bitmaps might change upon adding or dropping columns.
Addressing this design defect requires re-encoding the information as
a part of PartitionSchemaPB stored in the system catalog upon particular
modifications of the table's schema. This patch does exactly that, and
also adds a corresponding test scenario which would fail without the fix.
A proper solution would be to use a primary-key-only projection of the
table's schema to encode the information on range boundaries, but it's
necessary to provide backwards compatibility with already released Kudu
clients. See KUDU-3577 for details.
Change-Id: I21a775538063768b986edd2b6bc25d03360b5216
Reviewed-on: http://gerrit.cloudera.org:8080/21486
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Mahesh Reddy <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
---
src/kudu/integration-tests/alter_table-test.cc | 137 +++++++++++++++++++++++++
src/kudu/master/catalog_manager.cc | 35 ++++++-
src/kudu/master/catalog_manager.h | 3 +-
3 files changed, 172 insertions(+), 3 deletions(-)
diff --git a/src/kudu/integration-tests/alter_table-test.cc
b/src/kudu/integration-tests/alter_table-test.cc
index d4884579e..2296dfe60 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -101,6 +101,7 @@ using kudu::client::KuduColumnSchema;
using kudu::client::KuduDelete;
using kudu::client::KuduError;
using kudu::client::KuduInsert;
+using kudu::client::KuduRangePartition;
using kudu::client::KuduRowResult;
using kudu::client::KuduScanBatch;
using kudu::client::KuduScanner;
@@ -2174,6 +2175,142 @@ TEST_F(AlterTableTest,
TestAddRangePartitionConflictExhaustive) {
expect_range_partitions_conflict(0, 1, 0, 1);
}
+// Test scenario for KUDU-3577.
+TEST_F(AlterTableTest, RangeWithCustomHashSchema) {
+ KuduSchemaBuilder b;
+ b.AddColumn("c0")->Type(KuduColumnSchema::INT32)->NotNull()->
+ Default(KuduValue::FromInt(0));
+ b.AddColumn("c1")->Type(KuduColumnSchema::INT32)->NotNull()->
+ Default(KuduValue::FromInt(0));
+ b.AddColumn("c2")->Type(KuduColumnSchema::INT32)->Nullable();
+ b.SetPrimaryKey({ "c0", "c1" });
+ KuduSchema schema;
+ ASSERT_OK(b.Build(&schema));
+
+ // Create table with table-wide hash schema: 2 hash buckets on "c1" column.
+ const string table_name = "kudu-3577";
+ unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
+ creator->table_name(table_name)
+ .schema(&schema)
+ .set_range_partition_columns({ "c0" })
+ .add_hash_partitions({ "c1" }, 2)
+ .num_replicas(1);
+
+ // Add a range partition with the table-wide hash schema.
+ {
+ unique_ptr<KuduPartialRow> lower(schema.NewRow());
+ ASSERT_OK(lower->SetInt32("c0", -100));
+ unique_ptr<KuduPartialRow> upper(schema.NewRow());
+ ASSERT_OK(upper->SetInt32("c0", 0));
+ creator->add_range_partition(lower.release(), upper.release());
+ }
+
+ // Add a range partition with custom hash schema.
+ {
+ unique_ptr<KuduPartialRow> lower(schema.NewRow());
+ ASSERT_OK(lower->SetInt32("c0", 0));
+ unique_ptr<KuduPartialRow> upper(schema.NewRow());
+ ASSERT_OK(upper->SetInt32("c0", 100));
+ unique_ptr<KuduRangePartition> p(
+ new KuduRangePartition(lower.release(), upper.release()));
+ ASSERT_OK(p->add_hash_partitions({ "c1" }, 3, 0));
+ creator->add_custom_range_partition(p.release());
+ }
+ ASSERT_OK(creator->Create());
+
+ {
+ // Make sure client successfully opens the newly created table.
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(client_->OpenTable(table_name, &table));
+ // The newly created table is empty, of course.
+ ASSERT_EQ(0, CountTableRows(table.get()));
+ // Insert 100 rows.
+ ASSERT_OK(InsertRowsSequential(table_name, -50, 100));
+ ASSERT_EQ(100, CountTableRows(table.get()));
+ }
+
+ {
+ // Drop "c2", the only nullable column in the table as of now.
+ unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(table_name));
+ alterer->DropColumn("c2");
+ ASSERT_OK(alterer->Alter());
+
+ // Make sure client successfully opens the altered table.
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(client_->OpenTable(table_name, &table));
+ ASSERT_EQ(100, CountTableRows(table.get()));
+ }
+
+ {
+ // Add at least 8 new columns to change the size of the 'column_is_set'
+ // bitmap. All the columns are non-nullable.
+ unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(table_name));
+ for (auto i = 2; i < 10; ++i) {
+ alterer->AddColumn(Substitute("c$0", i))->Type(KuduColumnSchema::INT32)->
+ NotNull()->Default(KuduValue::FromInt(0));
+ }
+ ASSERT_OK(alterer->Alter());
+
+ // Make sure client successfully opens the altered table.
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(client_->OpenTable(table_name, &table));
+ ASSERT_EQ(100, CountTableRows(table.get()));
+ }
+
+ {
+ // Drop one non-nullable column.
+ unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(table_name));
+ alterer->DropColumn("c9");
+ ASSERT_OK(alterer->Alter());
+
+ // Make sure client successfully opens the altered table.
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(client_->OpenTable(table_name, &table));
+ ASSERT_EQ(100, CountTableRows(table.get()));
+ }
+
+ {
+ // Add "c9" column back, but make it nullable.
+ unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(table_name));
+ alterer->AddColumn("c9")->Type(KuduColumnSchema::INT32)->Nullable();
+ ASSERT_OK(alterer->Alter());
+
+ // Make sure client successfully opens the altered table.
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(client_->OpenTable(table_name, &table));
+ ASSERT_EQ(100, CountTableRows(table.get()));
+ }
+
+ {
+ // Add 90 more nullable columns.
+ unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(table_name));
+ for (auto i = 10; i < 100; ++i) {
+ alterer->AddColumn(Substitute("c$0", i))->Type(KuduColumnSchema::INT32)->
+ Nullable();
+ }
+ ASSERT_OK(alterer->Alter());
+
+ // Make sure client successfully opens the altered table.
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(client_->OpenTable(table_name, &table));
+ ASSERT_EQ(100, CountTableRows(table.get()));
+ }
+
+ {
+ // Drop all but primary key columns.
+ unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(table_name));
+ for (auto i = 2; i < 100; ++i) {
+ alterer->DropColumn(Substitute("c$0", i));
+ }
+ ASSERT_OK(alterer->Alter());
+
+ // Make sure client successfully opens the altered table.
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(client_->OpenTable(table_name, &table));
+ ASSERT_EQ(100, CountTableRows(table.get()));
+ }
+}
+
TEST_F(ReplicatedAlterTableTest, TestReplicatedAlter) {
const int kNumRows = 100;
InsertRows(0, kNumRows);
diff --git a/src/kudu/master/catalog_manager.cc
b/src/kudu/master/catalog_manager.cc
index da9881c22..c62145f3f 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -131,6 +131,7 @@
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/tserver/tserver_admin.proxy.h" // IWYU pragma: keep
+#include "kudu/util/bitmap.h"
#include "kudu/util/cache_metrics.h"
#include "kudu/util/condition_variable.h"
#include "kudu/util/debug/trace_event.h"
@@ -2945,7 +2946,8 @@ Status CatalogManager::ApplyAlterSchemaSteps(
const SysTablesEntryPB& current_pb,
const vector<AlterTableRequestPB::Step>& steps,
Schema* new_schema,
- ColumnId* next_col_id) {
+ ColumnId* next_col_id,
+ bool* needs_range_bounds_refresh) {
const SchemaPB& current_schema_pb = current_pb.schema();
Schema cur_schema;
RETURN_NOT_OK(SchemaFromPB(current_schema_pb, &cur_schema));
@@ -3021,6 +3023,15 @@ Status CatalogManager::ApplyAlterSchemaSteps(
}
*new_schema = builder.Build();
*next_col_id = builder.next_column_id();
+
+ // KUDU-3577: check if the serialized representation of range partition keys
+ // has changed after updating the schema -- the sizes of the 'column_is_set'
+ // and the 'nullable' bitmaps might change upon adding or dropping columns.
+ *needs_range_bounds_refresh =
+ !current_pb.partition_schema().custom_hash_schema_ranges().empty() &&
+ (new_schema->has_nullables() != cur_schema.has_nullables() ||
+ BitmapSize(new_schema->num_columns()) !=
BitmapSize(cur_schema.num_columns()));
+
return Status::OK();
}
@@ -3614,8 +3625,17 @@ Status CatalogManager::AlterTable(const
AlterTableRequestPB& req,
// is essentially a no-op. It's still important to execute because
// ApplyAlterSchemaSteps populates 'new_schema', which is used below.
TRACE("Apply alter schema");
+
+ // KUDU-3577: 'needs_range_bounds_refresh' reflects whether it's necessary
+ // to re-encode information on partition boundaries for ranges with custom
+ // hash schemas in the system catalog
+ bool needs_range_bounds_refresh = false;
RETURN_NOT_OK(SetupError(
- ApplyAlterSchemaSteps(l.data().pb, alter_schema_steps, &new_schema,
&next_col_id),
+ ApplyAlterSchemaSteps(l.data().pb,
+ alter_schema_steps,
+ &new_schema,
+ &next_col_id,
+ &needs_range_bounds_refresh),
resp, MasterErrorPB::INVALID_SCHEMA));
DCHECK_NE(next_col_id, 0);
@@ -3757,6 +3777,17 @@ Status CatalogManager::AlterTable(const
AlterTableRequestPB& req,
}
// 10. Serialize the schema and increment the version number.
+ if (needs_range_bounds_refresh) {
+ // KUDU-3577: if necessary, re-encode the information on partition
+ // boundaries for ranges with custom hash schemas
+ Schema cur_schema; // the current table schema before being altered
+ RETURN_NOT_OK(SchemaFromPB(l.data().pb.schema(), &cur_schema));
+ PartitionSchema partition_schema;
+ RETURN_NOT_OK(PartitionSchema::FromPB(
+ l.data().pb.partition_schema(), cur_schema, &partition_schema));
+ RETURN_NOT_OK(partition_schema.ToPB(
+ new_schema, l.mutable_data()->pb.mutable_partition_schema()));
+ }
if (has_metadata_changes_for_existing_tablets &&
!l.data().pb.has_fully_applied_schema()) {
l.mutable_data()->pb.mutable_fully_applied_schema()->CopyFrom(l.data().pb.schema());
}
diff --git a/src/kudu/master/catalog_manager.h
b/src/kudu/master/catalog_manager.h
index a7bf477df..f1180eaf1 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -1033,7 +1033,8 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
const SysTablesEntryPB& current_pb,
const std::vector<AlterTableRequestPB::Step>& steps,
Schema* new_schema,
- ColumnId* next_col_id);
+ ColumnId* next_col_id,
+ bool* needs_range_bounds_refresh);
// Delete the specified table in the catalog. If 'user' is provided,
// checks that the user is authorized to delete the table. Otherwise,