This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5d6774b1022f26d33abd6cc9fb99507490849428
Author: duyuqi <[email protected]>
AuthorDate: Thu Mar 30 10:43:07 2023 +0800

    [cpp-client] KUDU-3455 Reduce space complexity and speed up hash partition 
pruning for in-list predicate
    
    This patch comes from a committed patch, its committed id is 'b69dbeb'. As 
that
    patch said, logic of pruning hash partitions for in-list predicate in Kudu 
cpp
    client has also a high space complexity and slow. Old algorithm must keep
    all intermedium objects because they are incomplete until they are
    completed and can be computed hash.
    
    This patch fixes the problems and provides a recursive algorithm the
    same as the one used by java client.
    
    Compared with java client, the cpp client is less likely to cause the
    OOM condition because it does not keep too many intermediate results.
    This optimization has good benefits too. The benefits are related to
    the in-list length and the number of primary keys, the performance
    would be better if in-list length is longer. For example,
    PartitionPrunerTest::TestMultiColumnInListHashPruningManyValues,
    using 10 key columns and kMaxInListLength=50, old algorithm memory cost
    may reach 600MB, while new algorithm's memory cost can be ignored
    (it only need one objects and a few stacks for contexts). At the same
    time, new algorithm has a good speedup, some effect as below:
    
    combination_count: 5554006920000, old cost: 428238us, new cost: 713us, 
speedup: 600.6x
    combination_count: 89083783664568, old cost: 2764924us, new cost: 1145us, 
speedup: 2414.7x
    combination_count: 27194091724800, old cost: 1610475us, new cost: 1151us, 
speedup: 1399.2x
    combination_count: 7116622216704, old cost: 34544289us, new cost: 375us, 
speedup: 92118.1x
    combination_count: 37570734489600, old cost: 1733205us, new cost: 901us, 
speedup: 1923.6x
    
    Change-Id: Ie4bea5c10b4ac2c62b85625fe9d2a33ceb4fb2e9
    Reviewed-on: http://gerrit.cloudera.org:8080/19794
    Reviewed-by: Yingchun Lai <[email protected]>
    Tested-by: Yingchun Lai <[email protected]>
    Reviewed-by: Yuqi Du <[email protected]>
---
 src/kudu/common/partition_pruner-test.cc | 176 +++++++++++++++++++++++++++++++
 src/kudu/common/partition_pruner.cc      |  74 +++++++++----
 src/kudu/common/partition_pruner.h       |   9 ++
 3 files changed, 240 insertions(+), 19 deletions(-)

diff --git a/src/kudu/common/partition_pruner-test.cc 
b/src/kudu/common/partition_pruner-test.cc
index af3326669..c808c905a 100644
--- a/src/kudu/common/partition_pruner-test.cc
+++ b/src/kudu/common/partition_pruner-test.cc
@@ -24,14 +24,17 @@
 #include <optional>
 #include <string>
 #include <tuple>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
+#include <glog/logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/common/column_predicate.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/encoded_key.h"
+#include "kudu/common/key_encoder.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/row.h"
@@ -39,8 +42,11 @@
 #include "kudu/common/row_operations.pb.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -85,6 +91,12 @@ class PartitionPrunerTest : public KuduTest {
                                     const ScanSpec& spec,
                                     size_t remaining_tablets,
                                     size_t pruner_ranges);
+
+  // Search all combinations of in-list and equality predicates.
+  // Return a bitset indicates which hash buckets are selected of these 
combinations.
+  static vector<bool> PruneHashComponent(const PartitionSchema::HashDimension& 
hash_dimension,
+                                         const Schema& schema,
+                                         const ScanSpec& scan_spec);
 };
 
 void PartitionPrunerTest::CreatePartitionSchemaPB(
@@ -178,6 +190,49 @@ void PartitionPrunerTest::CheckPrunedPartitions(
   ASSERT_EQ(pruner_ranges, pruner.NumRangesRemaining());
 }
 
+// The old algorithm. It moved from 'partition_pruner.cc'.
+vector<bool> PartitionPrunerTest::PruneHashComponent(
+    const PartitionSchema::HashDimension& hash_dimension,
+    const Schema& schema,
+    const ScanSpec& scan_spec) {
+  vector<bool> hash_bucket_bitset(hash_dimension.num_buckets, false);
+  vector<string> encoded_strings(1, "");
+  for (size_t col_offset = 0; col_offset < hash_dimension.column_ids.size(); 
++col_offset) {
+    vector<string> new_encoded_strings;
+    const ColumnSchema& column = 
schema.column_by_id(hash_dimension.column_ids[col_offset]);
+    const ColumnPredicate& predicate = FindOrDie(scan_spec.predicates(), 
column.name());
+    const KeyEncoder<string>& encoder = 
GetKeyEncoder<string>(column.type_info());
+
+    vector<const void*> predicate_values;
+    if (predicate.predicate_type() == PredicateType::Equality) {
+      predicate_values.push_back(predicate.raw_lower());
+    } else {
+      CHECK(predicate.predicate_type() == PredicateType::InList);
+      predicate_values.insert(predicate_values.end(),
+                              predicate.raw_values().begin(),
+                              predicate.raw_values().end());
+    }
+    // For each of the encoded string, replicate it by the number of values in
+    // equality and in-list predicate.
+    for (const string& encoded_string : encoded_strings) {
+      for (const void* predicate_value : predicate_values) {
+        string new_encoded_string = encoded_string;
+        encoder.Encode(predicate_value,
+                       col_offset + 1 == hash_dimension.column_ids.size(),
+                       &new_encoded_string);
+        new_encoded_strings.emplace_back(new_encoded_string);
+      }
+    }
+    encoded_strings.swap(new_encoded_strings);
+  }
+  for (const string& encoded_string : encoded_strings) {
+    uint32_t hash_value = PartitionSchema::HashValueForEncodedColumns(
+        encoded_string, hash_dimension);
+    hash_bucket_bitset[hash_value] = true;
+  }
+  return hash_bucket_bitset;
+}
+
 TEST_F(PartitionPrunerTest, TestPrimaryKeyRangePruning) {
   // CREATE TABLE t
   // (a INT8, b INT8, c INT8)
@@ -917,6 +972,127 @@ TEST_F(PartitionPrunerTest, 
TestMultiColumnInListHashPruning) {
                   6, 2));
 }
 
+// For test cases that will run with in-list predicates using variant length
+class PartitionPrunerTestWithMaxInListLength : public PartitionPrunerTest,
+                                               public 
testing::WithParamInterface<int32_t> {};
+
+INSTANTIATE_TEST_SUITE_P(MaxInListLength,
+                         PartitionPrunerTestWithMaxInListLength,
+                         ::testing::Values(1, 10, 20, 50));
+
+// Create a table with 200 columns including 10 key columns and
+// generate some in-list predicates for these key columns. The old algorithm
+// is correct by test cases and in practice, so we check the correctness
+// of new algorithm by comparing it with the old one. At the same time,
+// compare the efficiency of the two algorithms.
+TEST_P(PartitionPrunerTestWithMaxInListLength, 
TestMultiColumnInListHashPruningManyValues) {
+  const int kMaxInListLength = GetParam();
+  if (kMaxInListLength > 1) {
+    SKIP_IF_SLOW_NOT_ALLOWED();
+  }
+  constexpr const int kColumnSize = 200;
+  constexpr const int kKeyColumnSize = 10;
+  vector<string> key_column_names;
+  SchemaBuilder builder;
+  int i = 0;
+  for (; i < kKeyColumnSize; i++) {
+    builder.AddKeyColumn(Substitute("key_$0", i), DataType::INT32);
+    key_column_names.push_back(Substitute("key_$0", i));
+  }
+  for (; i < kColumnSize; i++) {
+    builder.AddColumn(Substitute("column_$0", i), DataType::INT32);
+  }
+  Schema schema = builder.Build();
+
+  PartitionSchemaPB pb;
+  vector<ColumnNamesNumBucketsAndSeed> hash_buckets;
+  hash_buckets.push_back({{key_column_names[0]}, 24, 0});
+  hash_buckets.push_back({{key_column_names[1], key_column_names[2]}, 24, 0});
+  hash_buckets.push_back({{key_column_names[3], key_column_names[4]}, 64, 0});
+  hash_buckets.push_back({{key_column_names[5],
+                           key_column_names[6],
+                           key_column_names[7],
+                           key_column_names[8],
+                           key_column_names[9]},
+                          12,
+                          0});
+
+  CreatePartitionSchemaPB({}, hash_buckets, &pb);
+  pb.mutable_range_schema()->clear_columns();
+  PartitionSchema partition_schema;
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({}, {}, schema, &partitions));
+
+  // Applies the specified predicates to a scan and checks that the new 
agorithm is
+  // correct by comparing it with the old one. Show the speedup about new 
agorithm.
+  const auto check = [&](const vector<ColumnPredicate>& predicates,
+                         int64_t combination_count) {
+    ScanSpec opt_spec;
+    for (const auto& pred : predicates) {
+      opt_spec.AddPredicate(pred);
+    }
+    int64_t v1_elapsed_us = 0;
+    int64_t v2_elapsed_us = 0;
+    for (const auto& hash_dimension : partition_schema.hash_schema()) {
+      MonoTime t1 = MonoTime::Now();
+      vector<bool> v1 = 
PartitionPrunerTest::PruneHashComponent(hash_dimension, schema, opt_spec);
+      MonoTime t2 = MonoTime::Now();
+      vector<bool> v2 = PartitionPruner::PruneHashComponent(hash_dimension, 
schema, opt_spec);
+      MonoTime t3 = MonoTime::Now();
+      v1_elapsed_us += (t2 - t1).ToMicroseconds();
+      v2_elapsed_us += (t3 - t2).ToMicroseconds();
+      ASSERT_EQ(v1, v2);
+    }
+    // v2 algorithm is more efficient than v1 algorithm.
+    // The following logs are used to compare efficiency of the two algorithms.
+    // v2 (new algorithm) is quicker 100x than v1 (older one).
+    if (v2_elapsed_us != 0 && v1_elapsed_us != 0) {
+      LOG(INFO) << Substitute(
+          "combination_count: $0, old algorithm "
+          "cost: $1us, new algorithm cost: $2us, speedup: $3x",
+          combination_count,
+          v1_elapsed_us,
+          v2_elapsed_us,
+          static_cast<double>(v1_elapsed_us) / v2_elapsed_us);
+    }
+  };
+
+  constexpr const int kTotalCount = 10;
+  for (int index = 0; index < kTotalCount; index++) {
+    vector<int32_t> temp_values;
+    // Increase the vector's capacity to kMaxInListLength * kKeyColumnSize to 
avoid reallocation
+    // that invalidates some references to the elements.
+    temp_values.reserve(kMaxInListLength * kKeyColumnSize);
+    vector<vector<const void *>> test_cases;
+    test_cases.reserve(kKeyColumnSize);
+    Random r(SeedRandom());
+    int64_t combination_count = 1;
+
+    for (int i = 0; i < kKeyColumnSize; i++) {
+      uint32_t in_list_length = r.Uniform(kMaxInListLength) + 1;
+      vector<const void *> test_case;
+      test_case.reserve(in_list_length);
+      for (int j = 0; j < in_list_length; j++) {
+        int32_t t_value = r.Next32();
+        temp_values.push_back(t_value);
+        test_case.push_back(reinterpret_cast<const 
void*>(&temp_values.back()));
+      }
+      test_cases.push_back(test_case);
+      combination_count *= in_list_length;
+    }
+
+    vector<ColumnPredicate> predicates;
+    predicates.reserve(kKeyColumnSize);
+    for (int i = 0; i < kKeyColumnSize; i++) {
+      predicates.emplace_back(ColumnPredicate::InList(schema.column(i), 
&test_cases[i]));
+    }
+
+    check(predicates, combination_count);
+  }
+}
+
 TEST_F(PartitionPrunerTest, TestPruning) {
   // CREATE TABLE timeseries
   // (host STRING, metric STRING, time UNIXTIME_MICROS, value DOUBLE)
diff --git a/src/kudu/common/partition_pruner.cc 
b/src/kudu/common/partition_pruner.cc
index 3b228926b..7d954be8e 100644
--- a/src/kudu/common/partition_pruner.cc
+++ b/src/kudu/common/partition_pruner.cc
@@ -183,13 +183,11 @@ vector<bool> PartitionPruner::PruneHashComponent(
     const PartitionSchema::HashDimension& hash_dimension,
     const Schema& schema,
     const ScanSpec& scan_spec) {
-  vector<bool> hash_bucket_bitset(hash_dimension.num_buckets, false);
-  vector<string> encoded_strings(1, "");
+  vector<vector<const void*>> predicate_values_list;
+  predicate_values_list.reserve(hash_dimension.column_ids.size());
   for (size_t col_offset = 0; col_offset < hash_dimension.column_ids.size(); 
++col_offset) {
-    vector<string> new_encoded_strings;
     const ColumnSchema& column = 
schema.column_by_id(hash_dimension.column_ids[col_offset]);
     const ColumnPredicate& predicate = FindOrDie(scan_spec.predicates(), 
column.name());
-    const KeyEncoder<string>& encoder = 
GetKeyEncoder<string>(column.type_info());
 
     vector<const void*> predicate_values;
     if (predicate.predicate_type() == PredicateType::Equality) {
@@ -200,25 +198,63 @@ vector<bool> PartitionPruner::PruneHashComponent(
                               predicate.raw_values().begin(),
                               predicate.raw_values().end());
     }
-    // For each of the encoded string, replicate it by the number of values in
-    // equality and in-list predicate.
-    for (const string& encoded_string : encoded_strings) {
-      for (const void* predicate_value : predicate_values) {
-        string new_encoded_string = encoded_string;
-        encoder.Encode(predicate_value,
-                       col_offset + 1 == hash_dimension.column_ids.size(),
-                       &new_encoded_string);
-        new_encoded_strings.emplace_back(new_encoded_string);
-      }
-    }
-    encoded_strings.swap(new_encoded_strings);
+    predicate_values_list.emplace_back(std::move(predicate_values));
   }
-  for (const string& encoded_string : encoded_strings) {
+
+  // A combination of the predicate values is selected to compute the hash 
buckets.
+  vector<const void*> predicate_values_selected;
+  predicate_values_selected.reserve(hash_dimension.column_ids.size());
+  // Return value for this hash_dimension, it indicates which hash buckets are 
selected.
+  vector<bool> hash_bucket_bitset(hash_dimension.num_buckets, false);
+  ComputeHashBuckets(schema,
+                     hash_dimension,
+                     predicate_values_list,
+                     &predicate_values_selected,
+                     &hash_bucket_bitset);
+  return hash_bucket_bitset;
+}
+
+void PartitionPruner::ComputeHashBuckets(const Schema& schema, // 
NOLINT(misc-no-recursion)
+                                         const PartitionSchema::HashDimension& 
hash_dimension,
+                                         const vector<vector<const void*>>& 
predicate_values_list,
+                                         vector<const void*>* 
predicate_values_selected,
+                                         vector<bool>* hash_bucket_bitset) {
+  DCHECK_NOTNULL(predicate_values_selected);
+  DCHECK_NOTNULL(hash_bucket_bitset);
+  bool all_hash_bucket_needed = true;
+  for (const auto b : *hash_bucket_bitset) {
+    all_hash_bucket_needed &= b;
+  }
+  if (all_hash_bucket_needed) {
+    return;
+  }
+  size_t level = predicate_values_selected->size();
+  DCHECK_EQ(hash_dimension.column_ids.size(), predicate_values_list.size());
+  if (level == hash_dimension.column_ids.size()) {
+    string encoded_string;
+    for (size_t col_offset = 0; col_offset < hash_dimension.column_ids.size(); 
++col_offset) {
+      const ColumnSchema& column = 
schema.column_by_id(hash_dimension.column_ids[col_offset]);
+      const KeyEncoder<string>& encoder = 
GetKeyEncoder<string>(column.type_info());
+      const void* predicate_value = (*predicate_values_selected)[col_offset];
+      encoder.Encode(predicate_value,
+                     col_offset + 1 == hash_dimension.column_ids.size(),
+                     &encoded_string);
+    }
     uint32_t hash_value = PartitionSchema::HashValueForEncodedColumns(
         encoded_string, hash_dimension);
-    hash_bucket_bitset[hash_value] = true;
+    (*hash_bucket_bitset)[hash_value] = true;
+    return;
+  }
+  DCHECK_LT(level, predicate_values_list.size());
+  for (size_t i = 0; i < predicate_values_list[level].size(); ++i) {
+    predicate_values_selected->emplace_back(predicate_values_list[level][i]);
+    ComputeHashBuckets(schema,
+                       hash_dimension,
+                       predicate_values_list,
+                       predicate_values_selected,
+                       hash_bucket_bitset);
+    predicate_values_selected->pop_back();
   }
-  return hash_bucket_bitset;
 }
 
 vector<PartitionPruner::PartitionKeyRange> 
PartitionPruner::ConstructPartitionKeyRanges(
diff --git a/src/kudu/common/partition_pruner.h 
b/src/kudu/common/partition_pruner.h
index 145854356..2537ad6d3 100644
--- a/src/kudu/common/partition_pruner.h
+++ b/src/kudu/common/partition_pruner.h
@@ -72,6 +72,7 @@ class PartitionPruner {
  private:
   friend class PartitionPrunerRangeSetTest;
   FRIEND_TEST(PartitionPrunerRangeSetTest, PrepareRangeSet);
+  FRIEND_TEST(PartitionPrunerTestWithMaxInListLength, 
TestMultiColumnInListHashPruningManyValues);
 
   struct RangeBounds {
     std::string lower;
@@ -90,6 +91,14 @@ class PartitionPruner {
       const Schema& schema,
       const ScanSpec& scan_spec);
 
+  // Pick all combinations in in-list values and compute their hash buckets,
+  // the result is stored in hash_bucket_bitset.
+  static void ComputeHashBuckets(const Schema& schema,
+                                 const PartitionSchema::HashDimension& 
hash_dimension,
+                                 const std::vector<std::vector<const void*>>& 
predicate_values_list,
+                                 std::vector<const void*>* 
predicate_values_selected,
+                                 std::vector<bool>* hash_bucket_bitset);
+
   // Given the range bounds and the hash schema, constructs a set of partition
   // key ranges.
   static std::vector<PartitionKeyRange> ConstructPartitionKeyRanges(

Reply via email to