This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new e572e5967 [catalog_manager] KUDU-2671 fix bug in
CreatePartitionsForRange()
e572e5967 is described below
commit e572e59673f6dade62f96eb38b6417ef018bc7bf
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Jul 26 16:44:42 2022 -0700
[catalog_manager] KUDU-2671 fix bug in CreatePartitionsForRange()
This patch fixes a bug in PartitionSchema::CreatePartitionsForRange().
The manifestation of the bug was the inability to add an unbounded range
with custom hash schema (e.g. [0, +inf)) by AlterTable due to a conflict
with an already existing range (e.g., [-inf, 0)) when in fact there was no
conflict at all. The root cause was the assumption that PartitionSchema
contained information on the range to be added in its internal map
'hash_schema_idx_by_encoded_range_start_' but that wasn't the case,
so GetHashSchemaForRange() would return the table-wide hash schema for
the new range being added instead of proper range-specific hash schema.
That led to incorrect updating of range boundaries in
UpdatePartitionBoundaries(), producing wrong results.
This patch also contains a new test scenario that allows reproducing
the issue: the new scenario fails without the fix applied.
Change-Id: I33a2bdea2e71bf4b567664c0166e9fbc07c4b882
Reviewed-on: http://gerrit.cloudera.org:8080/18793
Tested-by: Kudu Jenkins
Reviewed-by: Mahesh Reddy <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
Reviewed-by: Attila Bukor <[email protected]>
---
src/kudu/client/flex_partitioning_client-test.cc | 63 ++++++++++++++
src/kudu/common/partition.cc | 103 ++++++++++++++---------
src/kudu/common/partition.h | 11 ++-
3 files changed, 137 insertions(+), 40 deletions(-)
diff --git a/src/kudu/client/flex_partitioning_client-test.cc
b/src/kudu/client/flex_partitioning_client-test.cc
index 3a89aea24..6b42eff1b 100644
--- a/src/kudu/client/flex_partitioning_client-test.cc
+++ b/src/kudu/client/flex_partitioning_client-test.cc
@@ -1775,6 +1775,69 @@ TEST_F(FlexPartitioningAlterTableTest,
ReadAndWriteToCustomRangePartition) {
}
}
+TEST_F(FlexPartitioningAlterTableTest,
ReadAndWriteToUnboundedCustomRangePartition) {
+ constexpr const char* const kTableName =
+ "ReadAndWriteToUnboundedCustomRangePartition";
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+ ASSERT_OK(lower->SetInt32(kKeyColumn, -100));
+ unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+ ASSERT_OK(upper->SetInt32(kKeyColumn, 100));
+ table_creator->table_name(kTableName)
+ .schema(&schema_)
+ .num_replicas(1)
+ .add_hash_partitions({ kKeyColumn }, 2)
+ .set_range_partition_columns({ kKeyColumn })
+ .add_range_partition(lower.release(), upper.release());
+ ASSERT_OK(table_creator->Create());
+
+ unique_ptr<KuduTableAlterer>
table_alterer(client_->NewTableAlterer(kTableName));
+ {
+ auto p = CreateRangePartitionNoLowerBound(-100);
+ ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 1));
+ table_alterer->AddRangePartition(p.release());
+ }
+ {
+ auto p = CreateRangePartitionNoUpperBound(100);
+ ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 4, 2));
+ table_alterer->AddRangePartition(p.release());
+ }
+ ASSERT_OK(table_alterer->Alter());
+ NO_FATALS(CheckTabletCount(kTableName, 9)); // 2 + 3 + 4 = 9
+
+ // Make sure it's possible to insert rows into the table for all the existing
+ // the partitions: first check the range of table-wide schema, then check
+ // the ranges with custom hash schemas.
+ ASSERT_OK(InsertTestRows(kTableName, -100, 100));
+ NO_FATALS(CheckTableRowsNum(kTableName, 200));
+ ASSERT_OK(InsertTestRows(kTableName, -200, -100));
+ NO_FATALS(CheckTableRowsNum(kTableName, 300));
+ ASSERT_OK(InsertTestRows(kTableName, 100, 200));
+ NO_FATALS(CheckTableRowsNum(kTableName, 400));
+ ASSERT_OK(InsertTestRows(kTableName, INT32_MIN, INT32_MIN + 100));
+ NO_FATALS(CheckTableRowsNum(kTableName, 500));
+ ASSERT_OK(InsertTestRows(kTableName, INT32_MAX - 100, INT32_MAX));
+ NO_FATALS(CheckTableRowsNum(kTableName, 600));
+
+ NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -300, 300, 400));
+
+ // Drop a partition in the middle and re-scan with various ranges.
+ {
+ unique_ptr<KuduTableAlterer>
table_alterer_drop(client_->NewTableAlterer(kTableName));
+ unique_ptr<KuduPartialRow> lower_drop(schema_.NewRow());
+ ASSERT_OK(lower_drop->SetInt32(kKeyColumn, -100));
+ unique_ptr<KuduPartialRow> upper_drop(schema_.NewRow());
+ ASSERT_OK(upper_drop->SetInt32(kKeyColumn, 100));
+ table_alterer_drop->DropRangePartition(lower_drop.release(),
upper_drop.release());
+ ASSERT_OK(table_alterer_drop->Alter());
+ }
+ NO_FATALS(CheckTableRowsNum(kTableName, 400));
+ NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, 0, 300, 100));
+ NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -300, 0, 100));
+ NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -200, 200, 200));
+ NO_FATALS(CheckTableRowsNum(kTableName, kKeyColumn, -500, 500, 200));
+}
+
// When working with a cluster that doesn't support range-specific hash schemas
// for tables, the client should receive proper error while trying to add
// a range with custom hash schema.
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index 85bfbfb96..3d428999e 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -622,7 +622,7 @@ Status PartitionSchema::CreatePartitions(
}
}
- UpdatePartitionBoundaries(&new_partitions);
+ UpdatePartitionBoundaries(hash_encoder, &new_partitions);
*partitions = std::move(new_partitions);
return Status::OK();
@@ -655,7 +655,7 @@ Status PartitionSchema::CreatePartitions(
std::make_move_iterator(current_bound_hash_partitions.end()));
}
- UpdatePartitionBoundaries(&result_partitions);
+ UpdatePartitionBoundaries(hash_encoder, &result_partitions);
*partitions = std::move(result_partitions);
return Status::OK();
@@ -666,11 +666,28 @@ Status PartitionSchema::CreatePartitionsForRange(
const HashSchema& range_hash_schema,
const Schema& schema,
std::vector<Partition>* partitions) const {
+ RETURN_NOT_OK(CheckRangeSchema(schema));
+
RangesWithHashSchemas ranges_with_hash_schemas;
RETURN_NOT_OK(EncodeRangeBounds(
{range_bound}, {range_hash_schema}, schema, &ranges_with_hash_schemas));
+ DCHECK_EQ(1, ranges_with_hash_schemas.size());
- return CreatePartitions(ranges_with_hash_schemas, schema, partitions);
+ const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
+ const auto& range = ranges_with_hash_schemas.front();
+ vector<Partition> result_partitions = GenerateHashPartitions(
+ range.hash_schema, hash_encoder);
+ // Add range information to the partition key.
+ for (Partition& p : result_partitions) {
+ DCHECK(p.begin_.range_key().empty()) << p.begin_.DebugString();
+ p.begin_.mutable_range_key()->assign(range.lower);
+ DCHECK(p.end().range_key().empty());
+ p.end_.mutable_range_key()->assign(range.upper);
+ UpdatePartitionBoundaries(hash_encoder, range_hash_schema, &p);
+ }
+ *partitions = std::move(result_partitions);
+
+ return Status::OK();
}
template<typename Row>
@@ -1455,7 +1472,20 @@ Status PartitionSchema::CheckRangeSchema(const Schema&
schema) const {
return Status::OK();
}
-void PartitionSchema::UpdatePartitionBoundaries(vector<Partition>* partitions)
const {
+void PartitionSchema::UpdatePartitionBoundaries(
+ const KeyEncoder<string>& hash_encoder,
+ vector<Partition>* partitions) const {
+ for (size_t idx = 0; idx < partitions->size(); ++idx) {
+ auto& p = (*partitions)[idx];
+ UpdatePartitionBoundaries(
+ hash_encoder, GetHashSchemaForRange(p.begin().range_key()), &p);
+ }
+}
+
+void PartitionSchema::UpdatePartitionBoundaries(
+ const KeyEncoder<string>& hash_encoder,
+ const HashSchema& partition_hash_schema,
+ Partition* partition) {
// Note: the following discussion and logic only takes effect when the
table's
// partition schema includes at least one hash bucket component, and the
// absolute upper and/or absolute lower range bound is unbounded.
@@ -1474,43 +1504,38 @@ void
PartitionSchema::UpdatePartitionBoundaries(vector<Partition>* partitions) c
// the absolute start and end case, these holes are filled by clearing the
// partition key beginning at the hash component. For a concrete example,
// see PartitionTest::TestCreatePartitions.
- const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
- for (size_t partition_idx = 0; partition_idx < partitions->size();
++partition_idx) {
- auto& p = (*partitions)[partition_idx];
- const auto& hash_schema = GetHashSchemaForRange(p.begin().range_key());
- CHECK_EQ(hash_schema.size(), p.hash_buckets().size());
- const auto hash_buckets_num = static_cast<int>(p.hash_buckets().size());
- // Find the first nonzero-valued bucket from the end and truncate the
- // partition key starting from that bucket onwards for zero-valued buckets.
- //
- // TODO(aserbin): is this really necessary -- zeros in hash key are the
- // minimum possible values, and the range part is already
- // empty?
- if (p.begin().range_key().empty()) {
- for (int i = hash_buckets_num - 1; i >= 0; --i) {
- if (p.hash_buckets()[i] != 0) {
- break;
- }
- p.begin_.mutable_hash_key()->erase(kEncodedBucketSize * i);
+ DCHECK(partition);
+ auto& p = *partition; // just a handy shortcut
+ CHECK_EQ(partition_hash_schema.size(), p.hash_buckets().size());
+ const auto hash_buckets_num = static_cast<int>(p.hash_buckets().size());
+ // Find the first nonzero-valued bucket from the end and truncate the
+ // partition key starting from that bucket onwards for zero-valued buckets.
+ // This is necessary to complement the truncation performed below for the
+ // hash part of the partition's end key below.
+ if (p.begin().range_key().empty()) {
+ for (int i = hash_buckets_num - 1; i >= 0; --i) {
+ if (p.hash_buckets()[i] != 0) {
+ break;
}
+ p.begin_.mutable_hash_key()->erase(kEncodedBucketSize * i);
}
- // Starting from the last hash bucket, truncate the partition key until we
hit the first
- // non-max-valued bucket, at which point, replace the encoding with the
next-incremented
- // bucket value. For example, the following range end partition keys
should be transformed,
- // where the key is (hash_comp1, hash_comp2, range_comp):
- //
- // [ (0, 0, "") -> (0, 1, "") ]
- // [ (0, 1, "") -> (1, _, "") ]
- // [ (1, 0, "") -> (1, 1, "") ]
- // [ (1, 1, "") -> (_, _, "") ]
- if (p.end().range_key().empty()) {
- for (int i = hash_buckets_num - 1; i >= 0; --i) {
- p.end_.mutable_hash_key()->erase(kEncodedBucketSize * i);
- const int32_t hash_bucket = p.hash_buckets()[i] + 1;
- if (hash_bucket != hash_schema[i].num_buckets) {
- hash_encoder.Encode(&hash_bucket, p.end_.mutable_hash_key());
- break;
- }
+ }
+ // Starting from the last hash bucket, truncate the partition key until we
hit the first
+ // non-max-valued bucket, at which point, replace the encoding with the
next-incremented
+ // bucket value. For example, the following range end partition keys should
be transformed,
+ // where the key is (hash_comp1, hash_comp2, range_comp):
+ //
+ // [ (0, 0, "") -> (0, 1, "") ]
+ // [ (0, 1, "") -> (1, _, "") ]
+ // [ (1, 0, "") -> (1, 1, "") ]
+ // [ (1, 1, "") -> (_, _, "") ]
+ if (p.end().range_key().empty()) {
+ for (int i = hash_buckets_num - 1; i >= 0; --i) {
+ p.end_.mutable_hash_key()->erase(kEncodedBucketSize * i);
+ const int32_t hash_bucket = p.hash_buckets()[i] + 1;
+ if (hash_bucket != partition_hash_schema[i].num_buckets) {
+ hash_encoder.Encode(&hash_bucket, p.end_.mutable_hash_key());
+ break;
}
}
}
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index 90e67ef6b..9fb886196 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -24,6 +24,8 @@
#include <utility>
#include <vector>
+#include <gtest/gtest_prod.h>
+
#include "kudu/common/schema.h"
#include "kudu/gutil/port.h"
#include "kudu/util/slice.h"
@@ -634,7 +636,14 @@ class PartitionSchema {
// method fills in the address space to have the proper ordering of the
// serialized partition keys -- that's important for partition pruning and
// overall ordering of the serialized partition keys.
- void UpdatePartitionBoundaries(std::vector<Partition>* partitions) const;
+ void UpdatePartitionBoundaries(const KeyEncoder<std::string>& hash_encoder,
+ std::vector<Partition>* partitions) const;
+ // Similar to the above, but update the boundaries for just a single
partition
+ // specified along with its hash schema.
+ static void UpdatePartitionBoundaries(
+ const KeyEncoder<std::string>& hash_encoder,
+ const HashSchema& partition_hash_schema,
+ Partition* partition);
// Validates the split rows, converts them to partition key form, and inserts
// them into splits in sorted order.