This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 1609ecc72487a3237ccc464e3fa6f82b3d173e17
Author: Alexey Serbin <[email protected]>
AuthorDate: Sat Jul 2 21:40:38 2022 -0700

    KUDU-2671 fix handling ranges with table-wide hash schemas
    
    This patch fixes a bug/typo introduced recently: adding a range with
    the table-wide hash schema would be a no-op when
    --enable_per_range_hash_schemas was set to 'true'.
    
    I added new test scenarios to cover the regression and verified
    that, as expected, they would fail without the fix.
    
    This is a follow-up to f55dd22bfea4beee99d72891efbbc67307b19d1e.
    
    Change-Id: I908e9654ed856920d483ce63a546946cbac0a641
    Reviewed-on: http://gerrit.cloudera.org:8080/18696
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Mahesh Reddy <[email protected]>
    Reviewed-by: Abhishek Chennaka <[email protected]>
    Reviewed-by: Attila Bukor <[email protected]>
---
 src/kudu/client/flex_partitioning_client-test.cc |  84 +++++++++
 src/kudu/master/catalog_manager.cc               |  27 ++-
 src/kudu/master/master-test.cc                   | 219 ++++++++++++++++++++---
 3 files changed, 299 insertions(+), 31 deletions(-)

diff --git a/src/kudu/client/flex_partitioning_client-test.cc 
b/src/kudu/client/flex_partitioning_client-test.cc
index 04013900c..35a10a54f 100644
--- a/src/kudu/client/flex_partitioning_client-test.cc
+++ b/src/kudu/client/flex_partitioning_client-test.cc
@@ -1675,5 +1675,89 @@ TEST_F(FlexPartitioningAlterTableTest, 
UnsupportedRangeSpecificHashSchema) {
   ASSERT_OK(client_->DeleteTable(kTableName));
 }
 
+// Make sure adding and dropping ranges with the table-wide hash schema works
+// as expected when --enable_per_range_hash_schemas=true.
+TEST_F(FlexPartitioningAlterTableTest, AddDropTableWideHashSchemaPartitions) {
+  FLAGS_enable_per_range_hash_schemas = true;
+
+  constexpr const char* const kTableName =
+      "AddDropTableWideHashSchemaPartitions";
+
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  table_creator->table_name(kTableName)
+      .schema(&schema_)
+      .num_replicas(1)
+      .add_hash_partitions({ kKeyColumn }, 2)
+      .set_range_partition_columns({ kKeyColumn });
+
+  // Add a range partition with the table-wide hash partitioning rules.
+  {
+    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+    ASSERT_OK(lower->SetInt32(kKeyColumn, 0));
+    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+    ASSERT_OK(upper->SetInt32(kKeyColumn, 111));
+    table_creator->add_range_partition(lower.release(), upper.release());
+  }
+
+  ASSERT_OK(table_creator->Create());
+
+  ASSERT_OK(InsertTestRows(kTableName, 0, 111));
+  NO_FATALS(CheckTableRowsNum(kTableName, 111));
+
+  // To have mix of ranges, add a single range with custom hash schema.
+  {
+    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
+    auto p = CreateRangePartition(-111, 0);
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 5, 1));
+    alterer->AddRangePartition(p.release());
+    ASSERT_OK(alterer->Alter());
+  }
+
+  ASSERT_OK(InsertTestRows(kTableName, -111, 0));
+  NO_FATALS(CheckTableRowsNum(kTableName, 222));
+
+  // Add one more range partition with the table-wide hash schema.
+  {
+    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
+
+    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+    ASSERT_OK(lower->SetInt32(kKeyColumn, 111));
+    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+    ASSERT_OK(upper->SetInt32(kKeyColumn, 222));
+    alterer->AddRangePartition(lower.release(), upper.release());
+
+    ASSERT_OK(alterer->Alter());
+  }
+
+  ASSERT_OK(InsertTestRows(kTableName, 111, 222));
+  NO_FATALS(CheckTableRowsNum(kTableName, 333));
+
+  // Drop the ranges with the table-wide hash partitions.
+  {
+    unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
+
+    {
+      unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+      ASSERT_OK(lower->SetInt32(kKeyColumn, 111));
+      unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+      ASSERT_OK(upper->SetInt32(kKeyColumn, 222));
+      alterer->DropRangePartition(lower.release(), upper.release());
+    }
+    {
+      unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+      ASSERT_OK(lower->SetInt32(kKeyColumn, 0));
+      unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+      ASSERT_OK(upper->SetInt32(kKeyColumn, 111));
+      alterer->DropRangePartition(lower.release(), upper.release());
+    }
+
+    ASSERT_OK(alterer->Alter());
+  }
+
+  NO_FATALS(CheckTableRowsNum(kTableName, 111));
+
+  ASSERT_OK(client_->DeleteTable(kTableName));
+}
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/master/catalog_manager.cc 
b/src/kudu/master/catalog_manager.cc
index 78fc4ec75..17411b1c3 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -2677,16 +2677,17 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
     vector<Partition> partitions;
     const pair<KuduPartialRow, KuduPartialRow> range_bound =
         { *ops[0].split_row, *ops[1].split_row };
-    if (!FLAGS_enable_per_range_hash_schemas) {
-      RETURN_NOT_OK(partition_schema.CreatePartitions(
-          {}, { range_bound }, schema, &partitions));
-    } else {
-      const Schema schema = client_schema.CopyWithColumnIds();
-      if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION &&
-          step.add_range_partition().custom_hash_schema_size() > 0) {
+    if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION) {
+      const auto& custom_hash_schema_pb =
+          step.add_range_partition().custom_hash_schema();
+      if (!FLAGS_enable_per_range_hash_schemas || 
custom_hash_schema_pb.empty()) {
+        RETURN_NOT_OK(partition_schema.CreatePartitions(
+            {}, { range_bound }, schema, &partitions));
+      } else {
+        const Schema schema = client_schema.CopyWithColumnIds();
         PartitionSchema::HashSchema hash_schema;
         RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB(
-            schema, step.add_range_partition().custom_hash_schema(), 
&hash_schema));
+            schema, custom_hash_schema_pb, &hash_schema));
         if (partition_schema.hash_schema().size() != hash_schema.size()) {
           return Status::NotSupported(
               "varying number of hash dimensions per range is not yet 
supported");
@@ -2710,7 +2711,15 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
           }
         }
         ++partition_schema_updates;
-      } else if (step.type() == AlterTableRequestPB::DROP_RANGE_PARTITION) {
+      }
+    } else {
+      DCHECK_EQ(AlterTableRequestPB::DROP_RANGE_PARTITION, step.type());
+      if (!FLAGS_enable_per_range_hash_schemas ||
+          !partition_schema.HasCustomHashSchemas()) {
+        RETURN_NOT_OK(partition_schema.CreatePartitions(
+            {}, { range_bound }, schema, &partitions));
+      } else {
+        const Schema schema = client_schema.CopyWithColumnIds();
         PartitionSchema::HashSchema range_hash_schema;
         RETURN_NOT_OK(partition_schema.GetHashSchemaForRange(
             range_bound.first, schema, &range_hash_schema));
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 03a9f7085..f66ddfc9b 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -205,7 +205,6 @@ class MasterTest : public KuduTest {
                      const vector<pair<KuduPartialRow, KuduPartialRow>>& 
bounds = {},
                      const vector<RangeWithHashSchema>& 
ranges_with_hash_schemas = {});
 
-
   Status CreateTable(const string& name,
                      const Schema& schema,
                      const optional<TableTypePB>& type,
@@ -217,6 +216,9 @@ class MasterTest : public KuduTest {
                      const HashSchema& table_wide_hash_schema,
                      CreateTableResponsePB* resp);
 
+  Status GetTablePartitionSchema(const string& table_name,
+                                 PartitionSchemaPB* ps_pb);
+
   shared_ptr<Messenger> client_messenger_;
   unique_ptr<MiniMaster> mini_master_;
   Master* master_;
@@ -319,6 +321,25 @@ Status MasterTest::CreateTable(
   return Status::OK();
 }
 
+Status MasterTest::GetTablePartitionSchema(const string& table_name,
+                                           PartitionSchemaPB* ps_pb) {
+  DCHECK(ps_pb);
+  GetTableSchemaRequestPB req;
+  req.mutable_table()->set_table_name(table_name);
+
+  RpcController ctl;
+  GetTableSchemaResponsePB resp;
+  RETURN_NOT_OK(proxy_->GetTableSchema(req, &resp, &ctl));
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+  if (!resp.has_partition_schema()) {
+    return Status::IllegalState("partition schema information is missing");
+  }
+  *ps_pb = resp.partition_schema();
+  return Status::OK();
+}
+
 void MasterTest::DoListTables(const ListTablesRequestPB& req, 
ListTablesResponsePB* resp) {
   RpcController controller;
   ASSERT_OK(proxy_->ListTables(req, resp, &controller));
@@ -1010,23 +1031,6 @@ TEST_F(MasterTest, 
AlterTableAddAndDropRangeWithSpecificHashSchema) {
   FLAGS_enable_per_range_hash_schemas = true;
   FLAGS_default_num_replicas = 1;
 
-  const auto partition_schema_retriever = [this](PartitionSchemaPB* ps_pb) {
-    GetTableSchemaRequestPB req;
-    req.mutable_table()->set_table_name(kTableName);
-
-    RpcController ctl;
-    GetTableSchemaResponsePB resp;
-    RETURN_NOT_OK(proxy_->GetTableSchema(req, &resp, &ctl));
-    if (resp.has_error()) {
-      return StatusFromPB(resp.error().status());
-    }
-    if (!resp.has_partition_schema()) {
-      return Status::IllegalState("partition schema information is missing");
-    }
-    *ps_pb = resp.partition_schema();
-    return Status::OK();
-  };
-
   // Create a table with one range partition based on the table-wide hash 
schema.
   CreateTableResponsePB create_table_resp;
   {
@@ -1094,7 +1098,7 @@ TEST_F(MasterTest, 
AlterTableAddAndDropRangeWithSpecificHashSchema) {
 
       // Check the partition schema stored in the system catalog.
       PartitionSchemaPB ps_pb;
-      ASSERT_OK(partition_schema_retriever(&ps_pb));
+      ASSERT_OK(GetTablePartitionSchema(kTableName, &ps_pb));
       ASSERT_EQ(0, ps_pb.custom_hash_schema_ranges_size());
     }
 
@@ -1114,7 +1118,7 @@ TEST_F(MasterTest, 
AlterTableAddAndDropRangeWithSpecificHashSchema) {
 
       // Check the partition schema stored in the system catalog.
       PartitionSchemaPB ps_pb;
-      ASSERT_OK(partition_schema_retriever(&ps_pb));
+      ASSERT_OK(GetTablePartitionSchema(kTableName, &ps_pb));
       ASSERT_EQ(1, ps_pb.custom_hash_schema_ranges_size());
 
       // Check the hash schema parameters (i.e. columns and number of hash
@@ -1236,7 +1240,7 @@ TEST_F(MasterTest, 
AlterTableAddAndDropRangeWithSpecificHashSchema) {
 
       // Check the partition schema stored in the system catalog.
       PartitionSchemaPB ps_pb;
-      ASSERT_OK(partition_schema_retriever(&ps_pb));
+      ASSERT_OK(GetTablePartitionSchema(kTableName, &ps_pb));
       ASSERT_EQ(1, ps_pb.custom_hash_schema_ranges_size());
     }
 
@@ -1257,12 +1261,183 @@ TEST_F(MasterTest, 
AlterTableAddAndDropRangeWithSpecificHashSchema) {
 
       // Check the partition schema stored in the system catalog.
       PartitionSchemaPB ps_pb;
-      ASSERT_OK(partition_schema_retriever(&ps_pb));
+      ASSERT_OK(GetTablePartitionSchema(kTableName, &ps_pb));
       ASSERT_EQ(0, ps_pb.custom_hash_schema_ranges_size());
     }
   }
 }
 
+// This scenario verifies that when the support for range-specific hash schemas
+// is enabled, adding and dropping range partitions with table-wide hash 
schemas
+// works as expected.
+TEST_F(MasterTest, AlterTableAddDropRangeWithTableWideHashSchema) {
+  constexpr const char* const kTableName = "alter_with_table_wide_hash_schema";
+  constexpr const char* const kCol0 = "c_int32";
+  constexpr const char* const kCol1 = "c_int64";
+  constexpr const char* const kCol2 = "c_string";
+  const Schema kTableSchema({ColumnSchema(kCol0, INT32),
+                             ColumnSchema(kCol1, INT64),
+                             ColumnSchema(kCol2, STRING)}, 2);
+  FLAGS_enable_per_range_hash_schemas = true;
+  FLAGS_default_num_replicas = 1;
+
+  // Create a table with one range partition based on the table-wide hash 
schema.
+  CreateTableResponsePB create_table_resp;
+  {
+    KuduPartialRow lower(&kTableSchema);
+    ASSERT_OK(lower.SetInt32(kCol0, 0));
+    KuduPartialRow upper(&kTableSchema);
+    ASSERT_OK(upper.SetInt32(kCol0, 100));
+    ASSERT_OK(CreateTable(
+        kTableName, kTableSchema, nullopt, nullopt, nullopt, {},
+        {{lower, upper}}, {}, {{{kCol0, kCol1}, 3, 5}}, &create_table_resp));
+  }
+  const auto& table_id = create_table_resp.table_id();
+
+  // Check the number of tablets in the newly created table.
+  {
+    CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+    std::vector<scoped_refptr<TableInfo>> tables;
+    master_->catalog_manager()->GetAllTables(&tables);
+    ASSERT_EQ(1, tables.size());
+    // There should be 3 tablets (because of 3 hash buckets) for
+    // already existing range added upon the creation of a table.
+    ASSERT_EQ(3, tables.front()->num_tablets());
+
+    PartitionSchemaPB ps_pb;
+    ASSERT_OK(GetTablePartitionSchema(kTableName, &ps_pb));
+    ASSERT_EQ(0, ps_pb.custom_hash_schema_ranges_size());
+    ASSERT_EQ(1, ps_pb.hash_schema_size());
+  }
+
+  // Alter the table, adding a new range with table-wide hash schema.
+  {
+    AlterTableRequestPB req;
+    req.mutable_table()->set_table_name(kTableName);
+    req.mutable_table()->set_table_id(table_id);
+
+    // Add the required information on the table's schema:
+    // key and non-null columns must be present in the request.
+    {
+      ColumnSchemaPB* col0 = req.mutable_schema()->add_columns();
+      col0->set_name(kCol0);
+      col0->set_type(INT32);
+      col0->set_is_key(true);
+
+      ColumnSchemaPB* col1 = req.mutable_schema()->add_columns();
+      col1->set_name(kCol1);
+      col1->set_type(INT64);
+      col0->set_is_key(true);
+
+      ColumnSchemaPB* col2 = req.mutable_schema()->add_columns();
+      col2->set_name(kCol2);
+      col2->set_type(STRING);
+    }
+
+    AlterTableRequestPB::Step* step = req.add_alter_schema_steps();
+    step->set_type(AlterTableRequestPB::ADD_RANGE_PARTITION);
+    KuduPartialRow lower(&kTableSchema);
+    ASSERT_OK(lower.SetInt32(kCol0, 100));
+    KuduPartialRow upper(&kTableSchema);
+    ASSERT_OK(upper.SetInt32(kCol0, 200));
+    RowOperationsPBEncoder enc(
+        step->mutable_add_range_partition()->mutable_range_bounds());
+    enc.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    enc.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+    RpcController ctl;
+    AlterTableResponsePB resp;
+    ASSERT_OK(proxy_->AlterTable(req, &resp, &ctl));
+    ASSERT_FALSE(resp.has_error())
+        << StatusFromPB(resp.error().status()).ToString();
+  }
+
+  // Check the number of tablets in the table after adding a new range.
+  {
+    CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+    std::vector<scoped_refptr<TableInfo>> tables;
+    master_->catalog_manager()->GetAllTables(&tables);
+    ASSERT_EQ(1, tables.size());
+    // Extra 3 tablets (because of 3 hash buckets) for the newly added range.
+    ASSERT_EQ(6, tables.front()->num_tablets());
+
+    // Check the partition schema stored in the system catalog.
+    PartitionSchemaPB ps_pb;
+    ASSERT_OK(GetTablePartitionSchema(kTableName, &ps_pb));
+    ASSERT_EQ(0, ps_pb.custom_hash_schema_ranges_size());
+
+    // Check the parameters of the table-wide hash schema.
+    ASSERT_EQ(1, ps_pb.hash_schema_size());
+    const auto& hash_schema = ps_pb.hash_schema(0);
+
+    ASSERT_EQ(3, hash_schema.num_buckets());
+    //ASSERT_EQ(5, hash_schema.seed());
+
+    ASSERT_EQ(2, hash_schema.columns_size());
+    const auto schema = kTableSchema.CopyWithColumnIds();
+
+    const auto ref_col_0_id = int32_t(schema.column_id(0));
+    const auto& col_0 = hash_schema.columns(0);
+    ASSERT_TRUE(col_0.has_id());
+    ASSERT_EQ(ref_col_0_id, col_0.id());
+
+    const auto ref_col_1_id = int32_t(schema.column_id(1));
+    const auto& col_1 = hash_schema.columns(1);
+    ASSERT_TRUE(col_1.has_id());
+    ASSERT_EQ(ref_col_1_id, col_1.id());
+  }
+
+  {
+    AlterTableRequestPB req;
+    req.mutable_table()->set_table_name(kTableName);
+    req.mutable_table()->set_table_id(table_id);
+
+    // Add the required information on the table's schema:
+    // key and non-null columns must be present in the request.
+    {
+      ColumnSchemaPB* col0 = req.mutable_schema()->add_columns();
+      col0->set_name(kCol0);
+      col0->set_type(INT32);
+      col0->set_is_key(true);
+
+      ColumnSchemaPB* col1 = req.mutable_schema()->add_columns();
+      col1->set_name(kCol1);
+      col1->set_type(INT64);
+      col0->set_is_key(true);
+
+      ColumnSchemaPB* col2 = req.mutable_schema()->add_columns();
+      col2->set_name(kCol2);
+      col2->set_type(STRING);
+    }
+
+    AlterTableRequestPB::Step* step = req.add_alter_schema_steps();
+    step->set_type(AlterTableRequestPB::DROP_RANGE_PARTITION);
+    KuduPartialRow lower(&kTableSchema);
+    ASSERT_OK(lower.SetInt32(kCol0, 100));
+    KuduPartialRow upper(&kTableSchema);
+    ASSERT_OK(upper.SetInt32(kCol0, 200));
+    RowOperationsPBEncoder enc(
+        step->mutable_drop_range_partition()->mutable_range_bounds());
+    enc.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    enc.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+
+    RpcController ctl;
+    AlterTableResponsePB resp;
+    ASSERT_OK(proxy_->AlterTable(req, &resp, &ctl));
+    ASSERT_FALSE(resp.has_error())
+        << StatusFromPB(resp.error().status()).ToString();
+  }
+
+  {
+    CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+    std::vector<scoped_refptr<TableInfo>> tables;
+    master_->catalog_manager()->GetAllTables(&tables);
+    ASSERT_EQ(1, tables.size());
+    // There should be just 3 tablets left since the range has been dropped.
+    ASSERT_EQ(3, tables.front()->num_tablets());
+  }
+}
+
 TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
   constexpr const char* const kTableName = "testtb";
   const Schema kTableSchema({ ColumnSchema("key", INT32), ColumnSchema("val", 
INT32) }, 1);

Reply via email to