This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 392f43f KUDU-2671 remove semantically duplicate range_hash_schemas
392f43f is described below
commit 392f43f8233f80d993c9b2bf35dbf43930969ff5
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Jul 16 18:06:34 2021 -0700
KUDU-2671 remove semantically duplicate range_hash_schemas
This patch removes the 'range_hash_schemas' field from
CreateTableRequestPB because the field was semantically duplicating the
'range_hash_schemas' sub-field of the already existing
'partition_schema' field. This change doesn't break any compatibility
because the corresponding client side which uses the removed field
hasn't yet been released.
In addition, this patch fixes an invalid memory access condition
(sometimes leading to SIGSEGV) in PartitionSchema::EncodeRangeBounds()
if the number of per range hash schemas is less than the number
of range bounds.
With the removal of the semantically duplicate field, the check for
the validity of per-range hash bucket ranges is now effective. This
patch adds a new test scenario to verify that the validation is now
in place and to catch regressions in the future.
I also updated the corresponding code in the C++ client and tests.
This is a follow-up to 23ab89db1 and 586b79132.
Change-Id: Icde3d0b0870fd3a3941fcc91602993ae7ad46266
Reviewed-on: http://gerrit.cloudera.org:8080/17694
Reviewed-by: Andrew Wong <[email protected]>
Tested-by: Kudu Jenkins
Reviewed-by: Mahesh Reddy <[email protected]>
---
src/kudu/client/client.cc | 23 ++--
src/kudu/client/flex_partitioning_client-test.cc | 149 +++++++++++----------
src/kudu/common/partition-test.cc | 44 +++---
src/kudu/common/partition.cc | 72 ++++++----
src/kudu/common/partition.h | 22 +--
.../integration-tests/table_locations-itest.cc | 25 ++--
src/kudu/master/catalog_manager.cc | 11 +-
src/kudu/master/master-test.cc | 70 +++++++---
src/kudu/master/master.proto | 14 +-
9 files changed, 261 insertions(+), 169 deletions(-)
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 280f17b..8c57302 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -935,13 +935,13 @@ Status KuduTableCreator::Create() {
SCHEMA_PB_WITHOUT_WRITE_DEFAULT),
"Invalid schema");
- RowOperationsPBEncoder encoder(req.mutable_split_rows_range_bounds());
bool has_range_splits = false;
+ RowOperationsPBEncoder splits_encoder(req.mutable_split_rows_range_bounds());
for (const auto& row : data_->range_partition_splits_) {
if (!row) {
return Status::InvalidArgument("range split row must not be null");
}
- encoder.Add(RowOperationsPB::SPLIT_ROW, *row);
+ splits_encoder.Add(RowOperationsPB::SPLIT_ROW, *row);
has_range_splits = true;
}
@@ -963,6 +963,9 @@ Status KuduTableCreator::Create() {
"choose one or the other");
}
+ auto* partition_schema = req.mutable_partition_schema();
+ partition_schema->CopyFrom(data_->partition_schema_);
+
for (const auto& p : data_->range_partitions_) {
const auto* range = p->data_;
if (!range->lower_bound_ || !range->upper_bound_) {
@@ -979,27 +982,29 @@ Status KuduTableCreator::Create() {
? RowOperationsPB::RANGE_UPPER_BOUND
: RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND;
- encoder.Add(lower_bound_type, *range->lower_bound_);
- encoder.Add(upper_bound_type, *range->upper_bound_);
+ splits_encoder.Add(lower_bound_type, *range->lower_bound_);
+ splits_encoder.Add(upper_bound_type, *range->upper_bound_);
if (has_range_with_custom_hash_schema) {
+ // In case of per-range custom hash bucket schemas, add range bounds
+ // into PartitionSchemaPB::range_bounds as well.
+ RowOperationsPBEncoder encoder(partition_schema->add_range_bounds());
+ encoder.Add(lower_bound_type, *range->lower_bound_);
+ encoder.Add(upper_bound_type, *range->upper_bound_);
// Populate corresponding element in 'range_hash_schemas' if there is at
// least one range with custom hash partitioning schema.
- auto* schemas_pb = req.add_range_hash_schemas();
+ auto* schemas_pb = partition_schema->add_range_hash_schemas();
for (const auto& schema : range->hash_bucket_schemas_) {
auto* pb = schemas_pb->add_hash_schemas();
pb->set_seed(schema.seed);
pb->set_num_buckets(schema.num_buckets);
for (const auto& column_name : schema.column_names) {
- auto* column_id = pb->add_columns();
- column_id->set_name(column_name);
+ pb->add_columns()->set_name(column_name);
}
}
}
}
- req.mutable_partition_schema()->CopyFrom(data_->partition_schema_);
-
if (data_->table_type_) {
req.set_table_type(*data_->table_type_);
}
diff --git a/src/kudu/client/flex_partitioning_client-test.cc
b/src/kudu/client/flex_partitioning_client-test.cc
index 35e9c52..0d303ab 100644
--- a/src/kudu/client/flex_partitioning_client-test.cc
+++ b/src/kudu/client/flex_partitioning_client-test.cc
@@ -199,66 +199,6 @@ TEST_F(FlexPartitioningCreateTableTest, CustomHashBuckets)
{
ASSERT_OK(CreateTable(kTableName, std::move(partitions)));
NO_FATALS(CheckTabletCount(kTableName, 3));
}
-
- // One-level hash bucket structure with hashing on non-key column only:
- // { 3, "int_val" }.
- {
- constexpr const char* const kTableName = "3@int_val";
- RangePartitions partitions;
- partitions.emplace_back(CreateRangePartition());
- auto& p = partitions.back();
- ASSERT_OK(p->add_hash_partitions({ kIntValColumn }, 2, 0));
- ASSERT_OK(CreateTable(kTableName, std::move(partitions)));
- NO_FATALS(CheckTabletCount(kTableName, 2));
- }
-
- // One-level hash bucket structure with hashing on non-key nullable column:
- // { 5, "string_val" }.
- {
- constexpr const char* const kTableName = "3@string_val";
- RangePartitions partitions;
- partitions.emplace_back(CreateRangePartition());
- auto& p = partitions.back();
- ASSERT_OK(p->add_hash_partitions({ kStringValColumn }, 5, 0));
- ASSERT_OK(CreateTable(kTableName, std::move(partitions)));
- NO_FATALS(CheckTabletCount(kTableName, 5));
- }
-
- // Two-level hash bucket structure: { 3, "key" } x { 3, "key" }.
- {
- constexpr const char* const kTableName = "3@key_x_3@key";
- RangePartitions partitions;
- partitions.emplace_back(CreateRangePartition());
- auto& p = partitions.back();
- ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 0));
- ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 1));
- ASSERT_OK(CreateTable(kTableName, std::move(partitions)));
- NO_FATALS(CheckTabletCount(kTableName, 9));
- }
-
- // Two-level hash bucket structure: { 2, "key" } x { 3, "int_val" }.
- {
- constexpr const char* const kTableName = "2@key_x_3@int_val";
- RangePartitions partitions;
- partitions.emplace_back(CreateRangePartition());
- auto& p = partitions.back();
- ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 2, 0));
- ASSERT_OK(p->add_hash_partitions({ kIntValColumn }, 3, 1));
- ASSERT_OK(CreateTable(kTableName, std::move(partitions)));
- NO_FATALS(CheckTabletCount(kTableName, 6));
- }
-
- // Two-level hash bucket structure: { 3, "key" } x { 2, "key", "int_val" }.
- {
- constexpr const char* const kTableName = "3@key_x_2@key:int_val";
- RangePartitions partitions;
- partitions.emplace_back(CreateRangePartition());
- auto& p = partitions.back();
- ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 0));
- ASSERT_OK(p->add_hash_partitions({ kKeyColumn, kIntValColumn }, 2, 1));
- ASSERT_OK(CreateTable(kTableName, std::move(partitions)));
- NO_FATALS(CheckTabletCount(kTableName, 6));
- }
}
// Create a table with mixed set of range partitions, using both table-wide and
@@ -272,12 +212,12 @@ TEST_F(FlexPartitioningCreateTableTest,
DefaultAndCustomHashBuckets) {
// Create a table with the following partitions:
//
// hash bucket
- // key 0 1 2 3
- // --------------------------------------------------------------
- // <111 x:{key} x:{key} - -
- // 111-222 x:{key} x:{key} x:{key} -
- // 222-333 x:{int_val} x:{int_val} x:{int_val} x:{int_val}
- // 333-444 x:{key,int_val} x:{key,int_val} - -
+ // key 0 1 2 3
+ // -----------------------------------------------------------
+ // <111 x:{key} x:{key} - -
+ // 111-222 x:{key} x:{key} x:{key} -
+ // 222-333 x:{key} x:{key} x:{key} x:{key}
+ // 333-444 x:{key} x:{key} - -
constexpr const char* const kTableName = "DefaultAndCustomHashBuckets";
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
@@ -305,24 +245,23 @@ TEST_F(FlexPartitioningCreateTableTest,
DefaultAndCustomHashBuckets) {
}
// Add a range partition with custom hash sub-partitioning rules:
- // 4 buckets with hash based on the "int_val" column with hash seed 2.
+ // 4 buckets with hash based on the "key" column with hash seed 2.
{
auto p = CreateRangePartition(222, 333);
- ASSERT_OK(p->add_hash_partitions({ kIntValColumn }, 4, 2));
+ ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 4, 2));
table_creator->add_custom_range_partition(p.release());
}
// Add a range partition with custom hash sub-partitioning rules:
- // 3 buckets hashing on the { "key", "int_val" } columns with hash seed 3.
+ // 2 buckets hashing on the "key" column with hash seed 3.
{
auto p = CreateRangePartition(333, 444);
- ASSERT_OK(p->add_hash_partitions({ kKeyColumn, kIntValColumn }, 2, 3));
+ ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 2, 3));
table_creator->add_custom_range_partition(p.release());
}
ASSERT_OK(table_creator->Create());
NO_FATALS(CheckTabletCount(kTableName, 11));
-
// Make sure it's possible to insert rows into the table.
//ASSERT_OK(InsertTestRows(kTableName, 111, 444));
}
@@ -414,6 +353,74 @@ TEST_F(FlexPartitioningCreateTableTest, Negatives) {
"split rows and custom hash bucket schemas for ranges are
incompatible: "
"choose one or the other");
}
+
+ {
+ constexpr const char* const kTableName = "3@key_x_3@key";
+ RangePartitions partitions;
+ partitions.emplace_back(CreateRangePartition());
+ auto& p = partitions.back();
+ ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 0));
+ ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 1));
+ const auto s = CreateTable(kTableName, std::move(partitions));
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "hash bucket schema components must not contain "
+ "columns in common");
+ }
+
+ {
+ constexpr const char* const kTableName = "3@key_x_3@key:int_val";
+ RangePartitions partitions;
+ partitions.emplace_back(CreateRangePartition());
+ auto& p = partitions.back();
+ ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 2, 0));
+ ASSERT_OK(p->add_hash_partitions({ kKeyColumn, kIntValColumn }, 3, 1));
+ const auto s = CreateTable(kTableName, std::move(partitions));
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "hash bucket schema components must not contain "
+ "columns in common");
+ }
+
+ {
+ constexpr const char* const kTableName = "3@int_val";
+ RangePartitions partitions;
+ partitions.emplace_back(CreateRangePartition());
+ auto& p = partitions.back();
+ ASSERT_OK(p->add_hash_partitions({ kIntValColumn }, 2, 0));
+ const auto s = CreateTable(kTableName, std::move(partitions));
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "must specify only primary key columns for hash "
+ "bucket partition components");
+ }
+
+ {
+ constexpr const char* const kTableName = "3@string_val";
+ RangePartitions partitions;
+ partitions.emplace_back(CreateRangePartition());
+ auto& p = partitions.back();
+ ASSERT_OK(p->add_hash_partitions({ kStringValColumn }, 5, 0));
+ const auto s = CreateTable(kTableName, std::move(partitions));
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "must specify only primary key columns for hash "
+ "bucket partition components");
+ }
+
+ {
+ constexpr const char* const kTableName = "2@key_x_3@int_val";
+ RangePartitions partitions;
+ partitions.emplace_back(CreateRangePartition());
+ auto& p = partitions.back();
+ ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 2, 0));
+ ASSERT_OK(p->add_hash_partitions({ kIntValColumn }, 3, 1));
+ const auto s = CreateTable(kTableName, std::move(partitions));
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "must specify only primary key columns for hash "
+ "bucket partition components");
+ }
}
} // namespace client
diff --git a/src/kudu/common/partition-test.cc
b/src/kudu/common/partition-test.cc
index f26e5ce..299a33f 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -1125,8 +1125,8 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
range_hash_schemas.clear();
partitions.clear();
- // Using std::random_shuffle to insert bounds and their hash schemas out of
sorted order,
- // yet resulting partitions will still be the same.
+ // Using std::random_shuffle to insert bounds and their hash schemas out of
+ // sorted order, yet resulting partitions will still be the same.
std::mt19937 gen(SeedRandom());
std::shuffle(bounds_with_hash_schemas.begin(),
bounds_with_hash_schemas.end(), gen);
@@ -1139,7 +1139,8 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
{}, bounds, range_hash_schemas, schema, &partitions));
NO_FATALS(check_partitions(partitions));
- // not clearing bounds or range_hash_schemas, adding a split row to test
incompatibility
+ // Not clearing bounds or range_hash_schemas, adding a split row to test
+ // incompatibility.
vector<KuduPartialRow> splits;
{ // split: (a1, _, c12)
KuduPartialRow split(&schema);
@@ -1148,20 +1149,29 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
splits.emplace_back(std::move(split));
}
- // expecting Status:InvalidArgument due to 'splits' and schemas within
'range_hash_schemas'
- // being defined at the same time.
- Status s = partition_schema.CreatePartitions(splits, bounds,
range_hash_schemas,
- schema, &partitions);
- ASSERT_EQ("Invalid argument: Both 'split_rows' and 'range_hash_schemas' "
- "cannot be populated at the same time.", s.ToString());
-
- // adding another schema to range_hash_schemas to trigger
Status::InvalidArgument due to
- // 'bounds and 'range_hash_schema' not being the same size.
- range_hash_schemas.emplace_back(PartitionSchema::HashBucketSchemas());
- Status s1 = partition_schema.CreatePartitions({}, bounds, range_hash_schemas,
- schema, &partitions);
- ASSERT_EQ("Invalid argument: The number of range bounds does not match the
number of per "
- "range hash schemas.", s1.ToString());
+ // Expecting Status::InvalidArgument() due to 'splits' and schemas within
+ // 'range_hash_schemas' being defined at the same time.
+ {
+ const auto s = partition_schema.CreatePartitions(
+ splits, bounds, range_hash_schemas, schema, &partitions);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "Both 'split_rows' and 'range_hash_schemas' "
+ "cannot be populated at the same time");
+ }
+
+ // Adding another schema to range_hash_schemas to trigger
+ // Status::InvalidArgument() due to 'bounds and 'range_hash_schema' not being
+ // the same size.
+ {
+ range_hash_schemas.push_back({});
+ const auto s = partition_schema.CreatePartitions(
+ {}, bounds, range_hash_schemas, schema, &partitions);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "4 vs 3: per range hash schemas and range bounds "
+ "must have the same size");
+ }
}
TEST_F(PartitionTest, CustomHashSchemasPerRangeOnly) {
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index aa9e1fa..c46d15c 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -198,6 +198,7 @@ Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
RETURN_NOT_OK(ExtractHashBucketSchemasFromPB(schema,
pb.range_hash_schemas(i).hash_schemas(),
&range_hash_schema[i]));
}
+
vector<pair<KuduPartialRow, KuduPartialRow>> range_bounds;
for (int i = 0; i < pb.range_bounds_size(); i++) {
RowOperationsPBDecoder decoder(&pb.range_bounds(i), &schema, &schema,
nullptr);
@@ -250,9 +251,15 @@ Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
}
}
+ auto* ranges_with_schemas_ptr = &partition_schema->ranges_with_hash_schemas_;
if (!range_bounds.empty()) {
RETURN_NOT_OK(partition_schema->EncodeRangeBounds(
- range_bounds, range_hash_schema, schema,
&partition_schema->ranges_with_hash_schemas_));
+ range_bounds, range_hash_schema, schema, ranges_with_schemas_ptr));
+ }
+ if (range_bounds.size() != ranges_with_schemas_ptr->size()) {
+ return Status::InvalidArgument(Substitute("the number of range bounds "
+ "($0) differs from the number ranges with hash schemas ($1)",
+ range_bounds.size(), ranges_with_schemas_ptr->size()));
}
return partition_schema->Validate(schema);
@@ -372,19 +379,26 @@ Status PartitionSchema::EncodeRangeSplits(const
vector<KuduPartialRow>& split_ro
return Status::OK();
}
-Status PartitionSchema::EncodeRangeBounds(const vector<pair<KuduPartialRow,
- KuduPartialRow>>&
range_bounds,
- const RangeHashSchema&
range_hash_schemas,
- const Schema& schema,
- vector<RangeWithHashSchemas>*
- bounds_with_hash_schemas) const {
+Status PartitionSchema::EncodeRangeBounds(
+ const vector<pair<KuduPartialRow, KuduPartialRow>>& range_bounds,
+ const RangeHashSchema& range_hash_schemas,
+ const Schema& schema,
+ vector<RangeWithHashSchemas>* bounds_with_hash_schemas) const {
DCHECK(bounds_with_hash_schemas->empty());
if (range_bounds.empty()) {
bounds_with_hash_schemas->emplace_back(RangeWithHashSchemas{"", "", {}});
return Status::OK();
}
- int j = 0;
+ if (!range_hash_schemas.empty() &&
+ range_hash_schemas.size() != range_bounds.size()) {
+ return Status::InvalidArgument(Substitute(
+ "$0 vs $1: per range hash schemas and range bounds "
+ "must have the same size",
+ range_hash_schemas.size(), range_bounds.size()));
+ }
+
+ size_t j = 0;
for (const auto& bound : range_bounds) {
string lower;
string upper;
@@ -407,21 +421,27 @@ Status PartitionSchema::EncodeRangeBounds(const
vector<pair<KuduPartialRow,
[](const RangeWithHashSchemas& s1, const RangeWithHashSchemas& s2)
{
return s1.lower < s2.lower;
});
+
// Check that the range bounds are non-overlapping
+ if (bounds_with_hash_schemas->empty()) {
+ return Status::OK();
+ }
for (int i = 0; i < bounds_with_hash_schemas->size() - 1; i++) {
const string& first_upper = bounds_with_hash_schemas->at(i).upper;
const string& second_lower = bounds_with_hash_schemas->at(i + 1).lower;
- if (first_upper.empty() || second_lower.empty() || first_upper >
second_lower) {
+ if (first_upper.empty() || second_lower.empty() ||
+ first_upper > second_lower) {
return Status::InvalidArgument(
"overlapping range partitions",
- strings::Substitute("first range partition: $0, second range
partition: $1",
-
RangePartitionDebugString(bounds_with_hash_schemas->at(i).lower,
-
bounds_with_hash_schemas->at(i).upper,
- schema),
-
RangePartitionDebugString(bounds_with_hash_schemas->at(i + 1).lower,
-
bounds_with_hash_schemas->at(i + 1).upper,
- schema)));
+ Substitute(
+ "first range partition: $0, second range partition: $1",
+ RangePartitionDebugString(bounds_with_hash_schemas->at(i).lower,
+ bounds_with_hash_schemas->at(i).upper,
+ schema),
+ RangePartitionDebugString(bounds_with_hash_schemas->at(i +
1).lower,
+ bounds_with_hash_schemas->at(i +
1).upper,
+ schema)));
}
}
@@ -474,22 +494,24 @@ Status PartitionSchema::SplitRangeBounds(const Schema&
schema,
return Status::OK();
}
-Status PartitionSchema::CreatePartitions(const vector<KuduPartialRow>&
split_rows,
- const vector<pair<KuduPartialRow,
- KuduPartialRow>>&
range_bounds,
- const RangeHashSchema&
range_hash_schemas,
- const Schema& schema,
- vector<Partition>* partitions) const {
+Status PartitionSchema::CreatePartitions(
+ const vector<KuduPartialRow>& split_rows,
+ const vector<pair<KuduPartialRow, KuduPartialRow>>& range_bounds,
+ const RangeHashSchema& range_hash_schemas,
+ const Schema& schema,
+ vector<Partition>* partitions) const {
const auto& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
if (!range_hash_schemas.empty()) {
if (!split_rows.empty()) {
return Status::InvalidArgument("Both 'split_rows' and
'range_hash_schemas' cannot be "
"populated at the same time.");
- }
+ }
if (range_bounds.size() != range_hash_schemas.size()) {
- return Status::InvalidArgument("The number of range bounds does not
match the number of per "
- "range hash schemas.");
+ return Status::InvalidArgument(
+ Substitute("$0 vs $1: per range hash schemas and range bounds "
+ "must have the same size",
+ range_hash_schemas.size(), range_bounds.size()));
}
}
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index 82a4a43..1879b93 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -197,12 +197,12 @@ class PartitionSchema {
// its order corresponds to the bounds in 'range_bounds'.
// If 'range_hash_schemas' is empty, the table wide hash schema is used per
range.
// Size of 'range_hash_schemas' and 'range_bounds' are equal if
'range_hash_schema' isn't empty.
- Status CreatePartitions(const std::vector<KuduPartialRow>& split_rows,
- const std::vector<std::pair<KuduPartialRow,
- KuduPartialRow>>&
range_bounds,
- const RangeHashSchema& range_hash_schemas,
- const Schema& schema,
- std::vector<Partition>* partitions) const
WARN_UNUSED_RESULT;
+ Status CreatePartitions(
+ const std::vector<KuduPartialRow>& split_rows,
+ const std::vector<std::pair<KuduPartialRow, KuduPartialRow>>&
range_bounds,
+ const RangeHashSchema& range_hash_schemas,
+ const Schema& schema,
+ std::vector<Partition>* partitions) const WARN_UNUSED_RESULT;
// Tests if the partition contains the row.
Status PartitionContainsRow(const Partition& partition,
@@ -442,11 +442,11 @@ class PartitionSchema {
// inserts them into 'bounds_with_hash_schemas' in sorted order. The hash
schemas
// per range are stored within 'range_hash_schemas'. If 'range_hash_schemas'
is empty,
// it indicates that the table wide hash schema will be used per range.
- Status EncodeRangeBounds(const std::vector<std::pair<KuduPartialRow,
- KuduPartialRow>>&
range_bounds,
- const RangeHashSchema& range_hash_schemas,
- const Schema& schema,
- std::vector<RangeWithHashSchemas>*
bounds_with_hash_schemas) const;
+ Status EncodeRangeBounds(
+ const std::vector<std::pair<KuduPartialRow, KuduPartialRow>>&
range_bounds,
+ const RangeHashSchema& range_hash_schemas,
+ const Schema& schema,
+ std::vector<RangeWithHashSchemas>* bounds_with_hash_schemas) const;
// Splits the encoded range bounds by the split points. The splits and
bounds within
// 'bounds_with_hash_schemas' must be sorted. If `bounds_with_hash_schemas`
is empty,
diff --git a/src/kudu/integration-tests/table_locations-itest.cc
b/src/kudu/integration-tests/table_locations-itest.cc
index ed10d5e..2b11f80 100644
--- a/src/kudu/integration-tests/table_locations-itest.cc
+++ b/src/kudu/integration-tests/table_locations-itest.cc
@@ -188,22 +188,25 @@ Status TableLocationsTest::CreateTable(const string&
table_name,
const vector<HashBucketSchema>&
table_hash_schema = {}) {
CreateTableRequestPB req;
- CreateTableResponsePB resp;
- RpcController controller;
-
req.set_name(table_name);
RETURN_NOT_OK(SchemaToPB(schema, req.mutable_schema()));
- RowOperationsPBEncoder encoder(req.mutable_split_rows_range_bounds());
+ RowOperationsPBEncoder splits_encoder(req.mutable_split_rows_range_bounds());
for (const KuduPartialRow& row : split_rows) {
- encoder.Add(RowOperationsPB::SPLIT_ROW, row);
+ splits_encoder.Add(RowOperationsPB::SPLIT_ROW, row);
}
+ auto* partition_schema_pb = req.mutable_partition_schema();
for (const auto& bound : bounds) {
- encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, bound.first);
- encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, bound.second);
+ splits_encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, bound.first);
+ splits_encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, bound.second);
+ if (!range_hash_schema.empty()) {
+ RowOperationsPBEncoder encoder(partition_schema_pb->add_range_bounds());
+ encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, bound.first);
+ encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, bound.second);
+ }
}
for (const auto& hash_schemas : range_hash_schema) {
- auto* range_hash_schemas_pb = req.add_range_hash_schemas();
+ auto* range_hash_schemas_pb =
partition_schema_pb->add_range_hash_schemas();
for (const auto& hash_schema : hash_schemas) {
auto* hash_schema_pb = range_hash_schemas_pb->add_hash_schemas();
for (const string& col_name : hash_schema.columns) {
@@ -215,7 +218,6 @@ Status TableLocationsTest::CreateTable(const string&
table_name,
}
if (!table_hash_schema.empty()) {
- auto* partition_schema_pb = req.mutable_partition_schema();
for (const auto& hash_schema : table_hash_schema) {
auto* hash_schema_pb = partition_schema_pb->add_hash_bucket_schemas();
for (const string& col_name : hash_schema.columns) {
@@ -226,6 +228,11 @@ Status TableLocationsTest::CreateTable(const string&
table_name,
}
}
+ CreateTableResponsePB resp;
+ RpcController controller;
+ if (resp.has_error()) {
+ RETURN_NOT_OK(StatusFromPB(resp.error().status()));
+ }
return proxy_->CreateTable(req, &resp, &controller);
}
diff --git a/src/kudu/master/catalog_manager.cc
b/src/kudu/master/catalog_manager.cc
index 12df2a6..318487a 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1861,10 +1861,17 @@ Status CatalogManager::CreateTable(const
CreateTableRequestPB* orig_req,
PartitionSchema::RangeHashSchema range_hash_schemas;
if (FLAGS_enable_per_range_hash_schemas) {
- for (int i = 0; i < req.range_hash_schemas_size(); i++) {
+ // TODO(aserbin): the signature of CreatePartitions() require the
+ // 'range_hash_schemas' parameters: update its signature
+ // to remove the extra parameter and rely on its
+ // 'ranges_with_hash_schemas_' member field; the path in
+ // CatalogManager::ApplyAlterPartitioningSteps() involving
+ // CreatePartitions() should be updated correspondingly.
+ const auto& ps = req.partition_schema();
+ for (int i = 0; i < ps.range_hash_schemas_size(); i++) {
PartitionSchema::HashBucketSchemas hash_bucket_schemas;
RETURN_NOT_OK(PartitionSchema::ExtractHashBucketSchemasFromPB(
- schema, req.range_hash_schemas(i).hash_schemas(),
&hash_bucket_schemas));
+ schema, ps.range_hash_schemas(i).hash_schemas(),
&hash_bucket_schemas));
range_hash_schemas.emplace_back(std::move(hash_bucket_schemas));
}
}
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 33d9f58..748a759 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -34,7 +34,7 @@
#include <vector>
#include <boost/optional/optional.hpp>
-#include <gflags/gflags_declare.h>
+#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include <rapidjson/document.h>
@@ -125,6 +125,7 @@ DECLARE_bool(master_support_authz_tokens);
DECLARE_bool(mock_table_metrics_for_testing);
DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_double(sys_catalog_fail_during_write);
+DECLARE_int32(default_num_replicas);
DECLARE_int32(diagnostics_log_stack_traces_interval_ms);
DECLARE_int32(flush_threshold_mb);
DECLARE_int32(flush_threshold_secs);
@@ -606,25 +607,28 @@ Status MasterTest::CreateTable(const string& table_name,
const optional<TableTypePB>& table_type,
const vector<vector<HashBucketSchema>>&
range_hash_schema) {
CreateTableRequestPB req;
- CreateTableResponsePB resp;
- RpcController controller;
-
req.set_name(table_name);
if (table_type) {
req.set_table_type(*table_type);
}
RETURN_NOT_OK(SchemaToPB(schema, req.mutable_schema()));
- RowOperationsPBEncoder encoder(req.mutable_split_rows_range_bounds());
+ RowOperationsPBEncoder splits_encoder(req.mutable_split_rows_range_bounds());
for (const KuduPartialRow& row : split_rows) {
- encoder.Add(RowOperationsPB::SPLIT_ROW, row);
+ splits_encoder.Add(RowOperationsPB::SPLIT_ROW, row);
}
+ auto* partition_schema_pb = req.mutable_partition_schema();
for (const pair<KuduPartialRow, KuduPartialRow>& bound : bounds) {
- encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, bound.first);
- encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, bound.second);
+ splits_encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, bound.first);
+ splits_encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, bound.second);
+ if (!range_hash_schema.empty()) {
+ RowOperationsPBEncoder encoder(partition_schema_pb->add_range_bounds());
+ encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, bound.first);
+ encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, bound.second);
+ }
}
for (const auto& hash_schemas : range_hash_schema) {
- auto* hash_schemas_pb = req.add_range_hash_schemas();
+ auto* hash_schemas_pb = partition_schema_pb->add_range_hash_schemas();
for (const auto& hash_schema : hash_schemas) {
auto* hash_bucket_schema_pb = hash_schemas_pb->add_hash_schemas();
for (const string& col_name : hash_schema.columns) {
@@ -641,10 +645,12 @@ Status MasterTest::CreateTable(const string& table_name,
if (comment) {
req.set_comment(*comment);
}
+ RpcController controller;
if (!bounds.empty()) {
controller.RequireServerFeature(MasterFeatures::RANGE_PARTITION_BOUNDS);
}
+ CreateTableResponsePB resp;
RETURN_NOT_OK(proxy_->CreateTable(req, &resp, &controller));
if (resp.has_error()) {
RETURN_NOT_OK(StatusFromPB(resp.error().status()));
@@ -880,6 +886,7 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
// No split rows and range specific hashing concurrently.
{
+ google::FlagSaver flag_saver;
FLAGS_enable_per_range_hash_schemas = true; // enable for testing.
KuduPartialRow split1(&kTableSchema);
ASSERT_OK(split1.SetInt32("key", 1));
@@ -898,21 +905,26 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
// The number of range bounds must match the size of user defined hash
schemas.
{
+ google::FlagSaver flag_saver;
FLAGS_enable_per_range_hash_schemas = true; // enable for testing.
KuduPartialRow a_lower(&kTableSchema);
KuduPartialRow a_upper(&kTableSchema);
ASSERT_OK(a_lower.SetInt32("key", 0));
ASSERT_OK(a_upper.SetInt32("key", 100));
+ KuduPartialRow b_lower(&kTableSchema);
+ KuduPartialRow b_upper(&kTableSchema);
+ ASSERT_OK(b_lower.SetInt32("key", 100));
+ ASSERT_OK(b_upper.SetInt32("key", 200));
vector<HashBucketSchema> hash_schemas_4 = { { {"key"}, 4, 0 } };
- vector<HashBucketSchema> hash_schemas_2 = { { {"val"}, 2, 0 } };
- vector<vector<HashBucketSchema>> range_hash_schema =
{std::move(hash_schemas_2),
-
std::move(hash_schemas_4)};
- Status s = CreateTable(kTableName, kTableSchema, { }, { { a_lower, a_upper
} },
+ vector<vector<HashBucketSchema>> range_hash_schema =
+ { std::move(hash_schemas_4) };
+ Status s = CreateTable(kTableName, kTableSchema, {},
+ { { a_lower, a_upper }, { b_lower, b_upper }, },
none, none, none, range_hash_schema);
- ASSERT_TRUE(s.IsInvalidArgument());
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(),
- "The number of range bounds does not match the number
of per "
- "range hash schemas.");
+ "1 vs 2: per range hash schemas and range bounds "
+ "must have the same size");
}
// No non-range columns.
@@ -1132,6 +1144,32 @@ TEST_F(MasterTest, TestCreateTableMismatchedDefaults) {
SecureShortDebugString(resp.error().status()));
}
+// Non-PK columns cannot be used for per-range custom hash bucket schemas.
+TEST_F(MasterTest, NonPrimaryKeyColumnsForPerRangeCustomHashSchema) {
+ constexpr const char* const kTableName = "nicetry";
+ const Schema kTableSchema(
+ { ColumnSchema("key", INT32), ColumnSchema("int32_val", INT32) }, 1);
+
+ // Explicitly enable support for per-range custom hash bucket schemas.
+ FLAGS_enable_per_range_hash_schemas = true;
+
+ // For simplicity, a single tablet replica is enough.
+ FLAGS_default_num_replicas = 1;
+
+ KuduPartialRow lower(&kTableSchema);
+ KuduPartialRow upper(&kTableSchema);
+ ASSERT_OK(lower.SetInt32("key", 0));
+ ASSERT_OK(upper.SetInt32("key", 100));
+ vector<vector<HashBucketSchema>> range_hash_schema{{{{"int32_val"}, 2, 0}}};
+ const auto s = CreateTable(
+ kTableName, kTableSchema, {}, { { lower, upper } },
+ none, none, none, range_hash_schema);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "must specify only primary key columns for "
+ "hash bucket partition components");
+}
+
// Regression test for KUDU-253/KUDU-592: crash if the GetTableLocations RPC
call is
// invalid.
TEST_F(MasterTest, TestInvalidGetTableLocations) {
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 540cfd4..28c4180 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -513,17 +513,13 @@ message CreateTableRequestPB {
// Holds either the split rows or the range bounds (or both) of the table.
optional RowOperationsPB split_rows_range_bounds = 6;
- // Holds the table's partition schema, the partition schema's hash bucket
schemas
- // are the default for any range where 'range_hash_schemas' is empty.
+ // Holds the table's partition schema, table-wide hash bucket schema, and,
+ // optionally, custom hash bucket schemas for each range. The latter is
+ // populated only when the 'split_rows_range_bounds' field above specifies
+ // range bounds, and must not be present if any split rows are specified.
optional PartitionSchemaPB partition_schema = 7;
- // Holds the hash bucket schemas for each range during table creation. Only
- // populated when 'split_rows_range_bounds' specifies range bounds, must not
- // be present if any split rows are specified. If this field is present, its
- // size must match the number of ranges specified by range bounds and they
- // must be in the same order. If this field is absent, 'partition_schema' is
- // assumed for every range bound.
- repeated PartitionSchemaPB.PerRangeHashBucketSchemasPB range_hash_schemas =
12;
+ // repeated PartitionSchemaPB.PerRangeHashBucketSchemasPB range_hash_schemas
= 12;
// Number of replicas for a partition/tablet, a.k.a. table's replication
// factor. All tablets of the same table has same replication factor.