This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 1609ecc72487a3237ccc464e3fa6f82b3d173e17 Author: Alexey Serbin <[email protected]> AuthorDate: Sat Jul 2 21:40:38 2022 -0700 KUDU-2671 fix handling ranges with table-wide hash schemas This patch fixes a bug/typo introduced recently: adding a range with the table-wide hash schema would be a no-op when --enable_per_range_hash_schemas was set 'true'. I added new test scenarios to cover the regression and verified that, as expected, they would fail without the fix. This is a follow-up to f55dd22bfea4beee99d72891efbbc67307b19d1e. Change-Id: I908e9654ed856920d483ce63a546946cbac0a641 Reviewed-on: http://gerrit.cloudera.org:8080/18696 Tested-by: Alexey Serbin <[email protected]> Reviewed-by: Mahesh Reddy <[email protected]> Reviewed-by: Abhishek Chennaka <[email protected]> Reviewed-by: Attila Bukor <[email protected]> --- src/kudu/client/flex_partitioning_client-test.cc | 84 +++++++++ src/kudu/master/catalog_manager.cc | 27 ++- src/kudu/master/master-test.cc | 219 ++++++++++++++++++++--- 3 files changed, 299 insertions(+), 31 deletions(-) diff --git a/src/kudu/client/flex_partitioning_client-test.cc b/src/kudu/client/flex_partitioning_client-test.cc index 04013900c..35a10a54f 100644 --- a/src/kudu/client/flex_partitioning_client-test.cc +++ b/src/kudu/client/flex_partitioning_client-test.cc @@ -1675,5 +1675,89 @@ TEST_F(FlexPartitioningAlterTableTest, UnsupportedRangeSpecificHashSchema) { ASSERT_OK(client_->DeleteTable(kTableName)); } +// Make sure adding and dropping ranges with the table-wide hash schema works +// as expected when --enable_per_range_hash_schemas=true. 
+TEST_F(FlexPartitioningAlterTableTest, AddDropTableWideHashSchemaPartitions) { + FLAGS_enable_per_range_hash_schemas = true; + + constexpr const char* const kTableName = + "AddDropTableWideHashSchemaPartitions"; + + unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); + table_creator->table_name(kTableName) + .schema(&schema_) + .num_replicas(1) + .add_hash_partitions({ kKeyColumn }, 2) + .set_range_partition_columns({ kKeyColumn }); + + // Add a range partition with the table-wide hash partitioning rules. + { + unique_ptr<KuduPartialRow> lower(schema_.NewRow()); + ASSERT_OK(lower->SetInt32(kKeyColumn, 0)); + unique_ptr<KuduPartialRow> upper(schema_.NewRow()); + ASSERT_OK(upper->SetInt32(kKeyColumn, 111)); + table_creator->add_range_partition(lower.release(), upper.release()); + } + + ASSERT_OK(table_creator->Create()); + + ASSERT_OK(InsertTestRows(kTableName, 0, 111)); + NO_FATALS(CheckTableRowsNum(kTableName, 111)); + + // To have mix of ranges, add a single range with custom hash schema. + { + unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName)); + auto p = CreateRangePartition(-111, 0); + ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 5, 1)); + alterer->AddRangePartition(p.release()); + ASSERT_OK(alterer->Alter()); + } + + ASSERT_OK(InsertTestRows(kTableName, -111, 0)); + NO_FATALS(CheckTableRowsNum(kTableName, 222)); + + // Add one more range partition with the table-wide hash schema. 
+ { + unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName)); + + unique_ptr<KuduPartialRow> lower(schema_.NewRow()); + ASSERT_OK(lower->SetInt32(kKeyColumn, 111)); + unique_ptr<KuduPartialRow> upper(schema_.NewRow()); + ASSERT_OK(upper->SetInt32(kKeyColumn, 222)); + alterer->AddRangePartition(lower.release(), upper.release()); + + ASSERT_OK(alterer->Alter()); + } + + ASSERT_OK(InsertTestRows(kTableName, 111, 222)); + NO_FATALS(CheckTableRowsNum(kTableName, 333)); + + // Drop the ranges with the table-wide hash partitions. + { + unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName)); + + { + unique_ptr<KuduPartialRow> lower(schema_.NewRow()); + ASSERT_OK(lower->SetInt32(kKeyColumn, 111)); + unique_ptr<KuduPartialRow> upper(schema_.NewRow()); + ASSERT_OK(upper->SetInt32(kKeyColumn, 222)); + alterer->DropRangePartition(lower.release(), upper.release()); + } + { + unique_ptr<KuduPartialRow> lower(schema_.NewRow()); + ASSERT_OK(lower->SetInt32(kKeyColumn, 0)); + unique_ptr<KuduPartialRow> upper(schema_.NewRow()); + ASSERT_OK(upper->SetInt32(kKeyColumn, 111)); + alterer->DropRangePartition(lower.release(), upper.release()); + } + + ASSERT_OK(alterer->Alter()); + } + + NO_FATALS(CheckTableRowsNum(kTableName, 111)); + + ASSERT_OK(client_->DeleteTable(kTableName)); +} + } // namespace client } // namespace kudu diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index 78fc4ec75..17411b1c3 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -2677,16 +2677,17 @@ Status CatalogManager::ApplyAlterPartitioningSteps( vector<Partition> partitions; const pair<KuduPartialRow, KuduPartialRow> range_bound = { *ops[0].split_row, *ops[1].split_row }; - if (!FLAGS_enable_per_range_hash_schemas) { - RETURN_NOT_OK(partition_schema.CreatePartitions( - {}, { range_bound }, schema, &partitions)); - } else { - const Schema schema = client_schema.CopyWithColumnIds(); - if 
(step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION && - step.add_range_partition().custom_hash_schema_size() > 0) { + if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION) { + const auto& custom_hash_schema_pb = + step.add_range_partition().custom_hash_schema(); + if (!FLAGS_enable_per_range_hash_schemas || custom_hash_schema_pb.empty()) { + RETURN_NOT_OK(partition_schema.CreatePartitions( + {}, { range_bound }, schema, &partitions)); + } else { + const Schema schema = client_schema.CopyWithColumnIds(); PartitionSchema::HashSchema hash_schema; RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB( - schema, step.add_range_partition().custom_hash_schema(), &hash_schema)); + schema, custom_hash_schema_pb, &hash_schema)); if (partition_schema.hash_schema().size() != hash_schema.size()) { return Status::NotSupported( "varying number of hash dimensions per range is not yet supported"); @@ -2710,7 +2711,15 @@ Status CatalogManager::ApplyAlterPartitioningSteps( } } ++partition_schema_updates; - } else if (step.type() == AlterTableRequestPB::DROP_RANGE_PARTITION) { + } + } else { + DCHECK_EQ(AlterTableRequestPB::DROP_RANGE_PARTITION, step.type()); + if (!FLAGS_enable_per_range_hash_schemas || + !partition_schema.HasCustomHashSchemas()) { + RETURN_NOT_OK(partition_schema.CreatePartitions( + {}, { range_bound }, schema, &partitions)); + } else { + const Schema schema = client_schema.CopyWithColumnIds(); PartitionSchema::HashSchema range_hash_schema; RETURN_NOT_OK(partition_schema.GetHashSchemaForRange( range_bound.first, schema, &range_hash_schema)); diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc index 03a9f7085..f66ddfc9b 100644 --- a/src/kudu/master/master-test.cc +++ b/src/kudu/master/master-test.cc @@ -205,7 +205,6 @@ class MasterTest : public KuduTest { const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds = {}, const vector<RangeWithHashSchema>& ranges_with_hash_schemas = {}); - Status CreateTable(const string& name, 
const Schema& schema, const optional<TableTypePB>& type, @@ -217,6 +216,9 @@ class MasterTest : public KuduTest { const HashSchema& table_wide_hash_schema, CreateTableResponsePB* resp); + Status GetTablePartitionSchema(const string& table_name, + PartitionSchemaPB* ps_pb); + shared_ptr<Messenger> client_messenger_; unique_ptr<MiniMaster> mini_master_; Master* master_; @@ -319,6 +321,25 @@ Status MasterTest::CreateTable( return Status::OK(); } +Status MasterTest::GetTablePartitionSchema(const string& table_name, + PartitionSchemaPB* ps_pb) { + DCHECK(ps_pb); + GetTableSchemaRequestPB req; + req.mutable_table()->set_table_name(table_name); + + RpcController ctl; + GetTableSchemaResponsePB resp; + RETURN_NOT_OK(proxy_->GetTableSchema(req, &resp, &ctl)); + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + if (!resp.has_partition_schema()) { + return Status::IllegalState("partition schema information is missing"); + } + *ps_pb = resp.partition_schema(); + return Status::OK(); +} + void MasterTest::DoListTables(const ListTablesRequestPB& req, ListTablesResponsePB* resp) { RpcController controller; ASSERT_OK(proxy_->ListTables(req, resp, &controller)); @@ -1010,23 +1031,6 @@ TEST_F(MasterTest, AlterTableAddAndDropRangeWithSpecificHashSchema) { FLAGS_enable_per_range_hash_schemas = true; FLAGS_default_num_replicas = 1; - const auto partition_schema_retriever = [this](PartitionSchemaPB* ps_pb) { - GetTableSchemaRequestPB req; - req.mutable_table()->set_table_name(kTableName); - - RpcController ctl; - GetTableSchemaResponsePB resp; - RETURN_NOT_OK(proxy_->GetTableSchema(req, &resp, &ctl)); - if (resp.has_error()) { - return StatusFromPB(resp.error().status()); - } - if (!resp.has_partition_schema()) { - return Status::IllegalState("partition schema information is missing"); - } - *ps_pb = resp.partition_schema(); - return Status::OK(); - }; - // Create a table with one range partition based on the table-wide hash schema. 
CreateTableResponsePB create_table_resp; { @@ -1094,7 +1098,7 @@ TEST_F(MasterTest, AlterTableAddAndDropRangeWithSpecificHashSchema) { // Check the partition schema stored in the system catalog. PartitionSchemaPB ps_pb; - ASSERT_OK(partition_schema_retriever(&ps_pb)); + ASSERT_OK(GetTablePartitionSchema(kTableName, &ps_pb)); ASSERT_EQ(0, ps_pb.custom_hash_schema_ranges_size()); } @@ -1114,7 +1118,7 @@ TEST_F(MasterTest, AlterTableAddAndDropRangeWithSpecificHashSchema) { // Check the partition schema stored in the system catalog. PartitionSchemaPB ps_pb; - ASSERT_OK(partition_schema_retriever(&ps_pb)); + ASSERT_OK(GetTablePartitionSchema(kTableName, &ps_pb)); ASSERT_EQ(1, ps_pb.custom_hash_schema_ranges_size()); // Check the hash schema parameters (i.e. columns and number of hash @@ -1236,7 +1240,7 @@ TEST_F(MasterTest, AlterTableAddAndDropRangeWithSpecificHashSchema) { // Check the partition schema stored in the system catalog. PartitionSchemaPB ps_pb; - ASSERT_OK(partition_schema_retriever(&ps_pb)); + ASSERT_OK(GetTablePartitionSchema(kTableName, &ps_pb)); ASSERT_EQ(1, ps_pb.custom_hash_schema_ranges_size()); } @@ -1257,12 +1261,183 @@ TEST_F(MasterTest, AlterTableAddAndDropRangeWithSpecificHashSchema) { // Check the partition schema stored in the system catalog. PartitionSchemaPB ps_pb; - ASSERT_OK(partition_schema_retriever(&ps_pb)); + ASSERT_OK(GetTablePartitionSchema(kTableName, &ps_pb)); ASSERT_EQ(0, ps_pb.custom_hash_schema_ranges_size()); } } } +// This scenario verifies that when the support for range-specific hash schemas +// is enabled, adding and dropping range partitions with table-wide hash schemas +// works as expected. 
+TEST_F(MasterTest, AlterTableAddDropRangeWithTableWideHashSchema) { + constexpr const char* const kTableName = "alter_with_table_wide_hash_schema"; + constexpr const char* const kCol0 = "c_int32"; + constexpr const char* const kCol1 = "c_int64"; + constexpr const char* const kCol2 = "c_string"; + const Schema kTableSchema({ColumnSchema(kCol0, INT32), + ColumnSchema(kCol1, INT64), + ColumnSchema(kCol2, STRING)}, 2); + FLAGS_enable_per_range_hash_schemas = true; + FLAGS_default_num_replicas = 1; + + // Create a table with one range partition based on the table-wide hash schema. + CreateTableResponsePB create_table_resp; + { + KuduPartialRow lower(&kTableSchema); + ASSERT_OK(lower.SetInt32(kCol0, 0)); + KuduPartialRow upper(&kTableSchema); + ASSERT_OK(upper.SetInt32(kCol0, 100)); + ASSERT_OK(CreateTable( + kTableName, kTableSchema, nullopt, nullopt, nullopt, {}, + {{lower, upper}}, {}, {{{kCol0, kCol1}, 3, 5}}, &create_table_resp)); + } + const auto& table_id = create_table_resp.table_id(); + + // Check the number of tablets in the newly created table. + { + CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager()); + std::vector<scoped_refptr<TableInfo>> tables; + master_->catalog_manager()->GetAllTables(&tables); + ASSERT_EQ(1, tables.size()); + // There should be 3 tablets (because of 3 hash buckets) for + // already existing range added upon the creation of a table. + ASSERT_EQ(3, tables.front()->num_tablets()); + + PartitionSchemaPB ps_pb; + ASSERT_OK(GetTablePartitionSchema(kTableName, &ps_pb)); + ASSERT_EQ(0, ps_pb.custom_hash_schema_ranges_size()); + ASSERT_EQ(1, ps_pb.hash_schema_size()); + } + + // Alter the table, adding a new range with table-wide hash schema. + { + AlterTableRequestPB req; + req.mutable_table()->set_table_name(kTableName); + req.mutable_table()->set_table_id(table_id); + + // Add the required information on the table's schema: + // key and non-null columns must be present in the request. 
+ { + ColumnSchemaPB* col0 = req.mutable_schema()->add_columns(); + col0->set_name(kCol0); + col0->set_type(INT32); + col0->set_is_key(true); + + ColumnSchemaPB* col1 = req.mutable_schema()->add_columns(); + col1->set_name(kCol1); + col1->set_type(INT64); + col1->set_is_key(true); + + ColumnSchemaPB* col2 = req.mutable_schema()->add_columns(); + col2->set_name(kCol2); + col2->set_type(STRING); + } + + AlterTableRequestPB::Step* step = req.add_alter_schema_steps(); + step->set_type(AlterTableRequestPB::ADD_RANGE_PARTITION); + KuduPartialRow lower(&kTableSchema); + ASSERT_OK(lower.SetInt32(kCol0, 100)); + KuduPartialRow upper(&kTableSchema); + ASSERT_OK(upper.SetInt32(kCol0, 200)); + RowOperationsPBEncoder enc( + step->mutable_add_range_partition()->mutable_range_bounds()); + enc.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower); + enc.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper); + + RpcController ctl; + AlterTableResponsePB resp; + ASSERT_OK(proxy_->AlterTable(req, &resp, &ctl)); + ASSERT_FALSE(resp.has_error()) + << StatusFromPB(resp.error().status()).ToString(); + } + + // Check the number of tablets in the table after adding a new range. + { + CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager()); + std::vector<scoped_refptr<TableInfo>> tables; + master_->catalog_manager()->GetAllTables(&tables); + ASSERT_EQ(1, tables.size()); + // Extra 3 tablets (because of 3 hash buckets) for the newly added range. + ASSERT_EQ(6, tables.front()->num_tablets()); + + // Check the partition schema stored in the system catalog. + PartitionSchemaPB ps_pb; + ASSERT_OK(GetTablePartitionSchema(kTableName, &ps_pb)); + ASSERT_EQ(0, ps_pb.custom_hash_schema_ranges_size()); + + // Check the parameters of the table-wide hash schema. 
+ ASSERT_EQ(1, ps_pb.hash_schema_size()); + const auto& hash_schema = ps_pb.hash_schema(0); + + ASSERT_EQ(3, hash_schema.num_buckets()); + //ASSERT_EQ(5, hash_schema.seed()); + + ASSERT_EQ(2, hash_schema.columns_size()); + const auto schema = kTableSchema.CopyWithColumnIds(); + + const auto ref_col_0_id = int32_t(schema.column_id(0)); + const auto& col_0 = hash_schema.columns(0); + ASSERT_TRUE(col_0.has_id()); + ASSERT_EQ(ref_col_0_id, col_0.id()); + + const auto ref_col_1_id = int32_t(schema.column_id(1)); + const auto& col_1 = hash_schema.columns(1); + ASSERT_TRUE(col_1.has_id()); + ASSERT_EQ(ref_col_1_id, col_1.id()); + } + + { + AlterTableRequestPB req; + req.mutable_table()->set_table_name(kTableName); + req.mutable_table()->set_table_id(table_id); + + // Add the required information on the table's schema: + // key and non-null columns must be present in the request. + { + ColumnSchemaPB* col0 = req.mutable_schema()->add_columns(); + col0->set_name(kCol0); + col0->set_type(INT32); + col0->set_is_key(true); + + ColumnSchemaPB* col1 = req.mutable_schema()->add_columns(); + col1->set_name(kCol1); + col1->set_type(INT64); + col1->set_is_key(true); + + ColumnSchemaPB* col2 = req.mutable_schema()->add_columns(); + col2->set_name(kCol2); + col2->set_type(STRING); + } + + AlterTableRequestPB::Step* step = req.add_alter_schema_steps(); + step->set_type(AlterTableRequestPB::DROP_RANGE_PARTITION); + KuduPartialRow lower(&kTableSchema); + ASSERT_OK(lower.SetInt32(kCol0, 100)); + KuduPartialRow upper(&kTableSchema); + ASSERT_OK(upper.SetInt32(kCol0, 200)); + RowOperationsPBEncoder enc( + step->mutable_drop_range_partition()->mutable_range_bounds()); + enc.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower); + enc.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper); + + RpcController ctl; + AlterTableResponsePB resp; + ASSERT_OK(proxy_->AlterTable(req, &resp, &ctl)); + ASSERT_FALSE(resp.has_error()) + << StatusFromPB(resp.error().status()).ToString(); + } + + { + 
CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager()); + std::vector<scoped_refptr<TableInfo>> tables; + master_->catalog_manager()->GetAllTables(&tables); + ASSERT_EQ(1, tables.size()); + // There should be just 3 tablets left since the range has been dropped. + ASSERT_EQ(3, tables.front()->num_tablets()); + } +} + TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) { constexpr const char* const kTableName = "testtb"; const Schema kTableSchema({ ColumnSchema("key", INT32), ColumnSchema("val", INT32) }, 1);
