This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch branch-1.17.x
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/branch-1.17.x by this push:
new 597d2bf15 KUDU-3564: Fix IN list predicate pruning
597d2bf15 is described below
commit 597d2bf156df097e7b04c7040323a55b291d0f3f
Author: zhangyifan27 <[email protected]>
AuthorDate: Fri Apr 5 09:35:46 2024 +0800
KUDU-3564: Fix IN list predicate pruning
This patch fixes IN list predicate pruning with a range specific
hash schema by modifying the content of 'PartitionMayContainRow'
method. We now get the right hash schema based on specific
partition's lower bound key.
This is a follow-up to 607d9d0.
Change-Id: I964b1ccfb85602741843ab555cdee53343217033
Reviewed-on: http://gerrit.cloudera.org:8080/21243
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
(cherry picked from commit 5a2c776dfb894310fc286f3ebe60d53c8a5e9341)
Reviewed-on: http://gerrit.cloudera.org:8080/21253
Reviewed-by: Yifan Zhang <[email protected]>
---
src/kudu/common/column_predicate.cc | 2 +-
src/kudu/common/partition.cc | 4 +-
src/kudu/common/scan_spec-test.cc | 101 ++++++++++++++++++++++++++++++++----
3 files changed, 93 insertions(+), 14 deletions(-)
diff --git a/src/kudu/common/column_predicate.cc
b/src/kudu/common/column_predicate.cc
index 4b163da9e..08d582715 100644
--- a/src/kudu/common/column_predicate.cc
+++ b/src/kudu/common/column_predicate.cc
@@ -464,7 +464,7 @@ void ColumnPredicate::MergeIntoIsNull(const ColumnPredicate
&other) {
void ColumnPredicate::MergeIntoInList(const ColumnPredicate &other) {
CHECK(predicate_type_ == PredicateType::InList);
- DCHECK(values_.size() > 1);
+ DCHECK(values_.size() >= 1);
switch (other.predicate_type()) {
case PredicateType::None: {
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index cd8599ca3..0d3c150a9 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -808,9 +808,7 @@ bool PartitionSchema::PartitionMayContainRow(const
Partition& partition,
return false;
}
- string range_key;
- EncodeColumns(row, range_schema_.column_ids, &range_key);
- const auto& hash_schema = GetHashSchemaForRange(range_key);
+ const auto& hash_schema =
GetHashSchemaForRange(partition.begin_.range_key());
for (size_t i = 0; i < hash_schema.size(); ++i) {
const auto& hash_dimension = hash_schema[i];
if (hash_dimension.column_ids.size() == 1 &&
diff --git a/src/kudu/common/scan_spec-test.cc
b/src/kudu/common/scan_spec-test.cc
index 6c8e68ac9..90549a32a 100644
--- a/src/kudu/common/scan_spec-test.cc
+++ b/src/kudu/common/scan_spec-test.cc
@@ -35,6 +35,8 @@
#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
#include "kudu/common/row.h"
+#include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/stringpiece.h"
@@ -51,16 +53,13 @@ using std::vector;
namespace kudu {
namespace {
-// Generate partition schema of a table with given hash_partitions and range
partition keys.
-// E.g. GeneratePartitionSchema(schema, {make_pair({a, b}, 3), make_pair({c},
5) })
-// Returns 'partition by hash(a, b) partitions 3, hash(c) partitions 5'.
-void GeneratePartitionSchema(const Schema& schema,
- const vector<pair<vector<string>, int>>&
hash_partitions,
- const vector<string>& range_partition_columns,
- PartitionSchema* partition_schema) {
- PartitionSchemaPB partition_schema_pb;
+
+void GeneratePartitionSchemaPB(const Schema& schema,
+ const vector<pair<vector<string>, int>>&
hash_partitions,
+ const vector<string>& range_partition_columns,
+ PartitionSchemaPB* partition_schema_pb) {
for (const auto& col_names_and_num_buckets : hash_partitions) {
- auto* hash_dimension_pb = partition_schema_pb.add_hash_schema();
+ auto* hash_dimension_pb = partition_schema_pb->add_hash_schema();
hash_dimension_pb->set_num_buckets(col_names_and_num_buckets.second);
hash_dimension_pb->set_seed(0);
for (const auto& col_name : col_names_and_num_buckets.first) {
@@ -71,14 +70,53 @@ void GeneratePartitionSchema(const Schema& schema,
}
}
if (!range_partition_columns.empty()) {
- auto* range_schema = partition_schema_pb.mutable_range_schema();
+ auto* range_schema = partition_schema_pb->mutable_range_schema();
for (const auto& range_column : range_partition_columns) {
range_schema->add_columns()->set_name(range_column);
}
}
+}
+
+// Generate partition schema of a table with given hash_partitions and range
partition keys.
+// E.g. GeneratePartitionSchema(schema, {make_pair({a, b}, 3), make_pair({c},
5) })
+// Returns 'partition by hash(a, b) partitions 3, hash(c) partitions 5'.
+void GeneratePartitionSchema(const Schema& schema,
+ const vector<pair<vector<string>, int>>&
hash_partitions,
+ const vector<string>& range_partition_columns,
+ PartitionSchema* partition_schema) {
+ PartitionSchemaPB partition_schema_pb;
+ GeneratePartitionSchemaPB(
+ schema, hash_partitions, range_partition_columns, &partition_schema_pb);
CHECK_OK(PartitionSchema::FromPB(partition_schema_pb, schema,
partition_schema));
}
+void AddRangePartitionWithSchema(const Schema& schema,
+ const vector<pair<vector<string>, int>>&
hash_partitions,
+ const vector<pair<string, int8_t>>&
lower_int_cols,
+ const vector<pair<string, int8_t>>&
upper_int_cols,
+ PartitionSchemaPB* pb) {
+ auto* range = pb->add_custom_hash_schema_ranges();
+ RowOperationsPBEncoder encoder(range->mutable_range_bounds());
+ KuduPartialRow lower(&schema);
+ KuduPartialRow upper(&schema);
+ for (const auto& [column_name, value] : lower_int_cols) {
+ ASSERT_OK(lower.SetInt8(column_name, value));
+ }
+ for (const auto& [column_name, value] : upper_int_cols) {
+ ASSERT_OK(upper.SetInt8(column_name, value));
+ }
+ encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+ encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+ for (const auto& [col_names, num_buckets] : hash_partitions) {
+ auto* hash_dimension_pb = range->add_hash_schema();
+ for (const auto& column_name : col_names) {
+ hash_dimension_pb->add_columns()->set_name(column_name);
+ }
+ hash_dimension_pb->set_num_buckets(num_buckets);
+ hash_dimension_pb->set_seed(0);
+ }
+}
+
// Copy a spec and return the pruned spec string.
string PruneInlistValuesAndGetSchemaString(const ScanSpec& spec,
const Schema& schema,
@@ -938,6 +976,49 @@ TEST_F(CompositeIntKeysTest,
NonKeyValuesInListHashPruning) {
spec, schema, partitions[2], partition_schema, &arena_));
}
+// Test that IN list pruning works with a per range hash schema.
+TEST_F(CompositeIntKeysTest, CustomRangePartitionInListPruning) {
+ ScanSpec spec;
+ AddInPredicate<int8_t>(&spec, "a", { 1, 2});
+ AddInPredicate<int8_t>(&spec, "b", { 1, 2});
+
+ const auto schema = schema_.CopyWithColumnIds();
+ PartitionSchemaPB partition_schema_pb;
+ GeneratePartitionSchemaPB(
+ schema, { { { "a" }, 3 } }, {"b"}, &partition_schema_pb);
+ AddRangePartitionWithSchema(
+ schema, { { { "a" }, 2 } }, {{"b", 1}}, {{"b", 2}},
&partition_schema_pb);
+ PartitionSchema partition_schema;
+ PartitionSchema::RangesWithHashSchemas ranges;
+ ASSERT_OK(PartitionSchema::FromPB(partition_schema_pb, schema,
&partition_schema, &ranges));
+
+ vector<Partition> partitions;
+ ASSERT_OK(partition_schema.CreatePartitions(ranges, schema, &partitions));
+ ASSERT_EQ(2, partitions.size());
+
+ // Set lower and upper bound for 'spec' to imitate a scan request from
client that
+ // has start_primary_key and stop_primary_key.
+ KuduPartialRow lower_bound(&schema_);
+ CHECK_OK(lower_bound.SetInt8("a", -128));
+ CHECK_OK(lower_bound.SetInt8("b", -128));
+ CHECK_OK(lower_bound.SetInt8("c", -128));
+ SetLowerBound(&spec, lower_bound);
+ KuduPartialRow upper_bound(&schema_);
+ CHECK_OK(upper_bound.SetInt8("a", 127));
+ CHECK_OK(upper_bound.SetInt8("b", 127));
+ CHECK_OK(upper_bound.SetInt8("c", 127));
+ SetExclusiveUpperBound(&spec, upper_bound);
+
+ ASSERT_EQ("PK >= (int8 a=2, int8 b=1, int8 c=-128) AND "
+ "PK < (int8 a=2, int8 b=2, int8 c=-128) AND a IN (2) AND b IN (1)",
+ PruneInlistValuesAndGetSchemaString(
+ spec, schema, partitions[0], partition_schema, &arena_));
+ ASSERT_EQ("PK >= (int8 a=1, int8 b=1, int8 c=-128) AND "
+ "PK < (int8 a=1, int8 b=2, int8 c=-128) AND a IN (1) AND b IN (1)",
+ PruneInlistValuesAndGetSchemaString(
+ spec, schema, partitions[1], partition_schema, &arena_));
+}
+
// Test that IN list mixed with range predicates get pushed into the primary
key
// bounds.
TEST_F(CompositeIntKeysTest, TestInListPushdownWithRange) {