This is an automated email from the ASF dual-hosted git repository.

felipecrv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new ec41209ea0 GH-37055: [C++] Optimize hash kernels for Dictionary ChunkedArrays (#38394)
ec41209ea0 is described below

commit ec41209ea02bdb410bc7e049cb3100afedf4ba2f
Author: Jin Shang <[email protected]>
AuthorDate: Sat Dec 23 23:50:39 2023 +0800

    GH-37055: [C++] Optimize hash kernels for Dictionary ChunkedArrays (#38394)
    
    
    
    ### Rationale for this change
    
    When merging dictionaries across chunks, the hash kernels unnecessarily re-unify the existing dictionary for each chunk, dragging down performance.
    
    ### What changes are included in this PR?
    
    Reuse the dictionary unifier across chunks.
    
    ### Are these changes tested?
    
    Yes, with a new benchmark for dictionary chunked arrays.
    
    ### Are there any user-facing changes?
    
    No.
    
    * Closes: #37055
    
    Lead-authored-by: Jin Shang <[email protected]>
    Co-authored-by: Felipe Oliveira Carvalho <[email protected]>
    Signed-off-by: Felipe Oliveira Carvalho <[email protected]>
---
 cpp/src/arrow/compute/kernels/vector_hash.cc       | 55 +++++++++++++++-------
 .../arrow/compute/kernels/vector_hash_benchmark.cc | 36 ++++++++++++++
 2 files changed, 74 insertions(+), 17 deletions(-)

diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc b/cpp/src/arrow/compute/kernels/vector_hash.cc
index 65e59d1a2e..800deba3a5 100644
--- a/cpp/src/arrow/compute/kernels/vector_hash.cc
+++ b/cpp/src/arrow/compute/kernels/vector_hash.cc
@@ -26,17 +26,20 @@
 #include "arrow/array/concatenate.h"
 #include "arrow/array/dict_internal.h"
 #include "arrow/array/util.h"
+#include "arrow/buffer.h"
 #include "arrow/compute/api_vector.h"
 #include "arrow/compute/cast.h"
 #include "arrow/compute/kernels/common_internal.h"
 #include "arrow/result.h"
 #include "arrow/util/hashing.h"
+#include "arrow/util/int_util.h"
 #include "arrow/util/unreachable.h"
 
 namespace arrow {
 
 using internal::DictionaryTraits;
 using internal::HashTraits;
+using internal::TransposeInts;
 
 namespace compute {
 namespace internal {
@@ -448,9 +451,9 @@ class DictionaryHashKernel : public HashKernel {
 
   Status Append(const ArraySpan& arr) override {
     auto arr_dict = arr.dictionary().ToArray();
-    if (!dictionary_) {
-      dictionary_ = arr_dict;
-    } else if (!dictionary_->Equals(*arr_dict)) {
+    if (!first_dictionary_) {
+      first_dictionary_ = arr_dict;
+    } else if (!first_dictionary_->Equals(*arr_dict)) {
       // NOTE: This approach computes a new dictionary unification per chunk.
       // This is in effect O(n*k) where n is the total chunked array length and
       // k is the number of chunks (therefore O(n**2) if chunks have a fixed 
size).
@@ -458,21 +461,23 @@ class DictionaryHashKernel : public HashKernel {
       // A better approach may be to run the kernel over each individual chunk,
       // and then hash-aggregate all results (for example sum-group-by for
       // the "value_counts" kernel).
-      auto out_dict_type = dictionary_->type();
+      if (dictionary_unifier_ == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(dictionary_unifier_,
+                              
DictionaryUnifier::Make(first_dictionary_->type()));
+        RETURN_NOT_OK(dictionary_unifier_->Unify(*first_dictionary_));
+      }
+      auto out_dict_type = first_dictionary_->type();
       std::shared_ptr<Buffer> transpose_map;
-      std::shared_ptr<Array> out_dict;
-      ARROW_ASSIGN_OR_RAISE(auto unifier, 
DictionaryUnifier::Make(out_dict_type));
 
-      ARROW_CHECK_OK(unifier->Unify(*dictionary_));
-      ARROW_CHECK_OK(unifier->Unify(*arr_dict, &transpose_map));
-      ARROW_CHECK_OK(unifier->GetResult(&out_dict_type, &out_dict));
+      RETURN_NOT_OK(dictionary_unifier_->Unify(*arr_dict, &transpose_map));
 
-      dictionary_ = out_dict;
       auto transpose = reinterpret_cast<const int32_t*>(transpose_map->data());
-      auto in_dict_array = arr.ToArray();
+      auto in_array = arr.ToArray();
+      const auto& in_dict_array =
+          arrow::internal::checked_cast<const DictionaryArray&>(*in_array);
       ARROW_ASSIGN_OR_RAISE(
-          auto tmp, arrow::internal::checked_cast<const 
DictionaryArray&>(*in_dict_array)
-                        .Transpose(arr.type->GetSharedPtr(), out_dict, 
transpose));
+          auto tmp, in_dict_array.Transpose(arr.type->GetSharedPtr(),
+                                            in_dict_array.dictionary(), 
transpose));
       return indices_kernel_->Append(*tmp->data());
     }
 
@@ -495,12 +500,27 @@ class DictionaryHashKernel : public HashKernel {
     return dictionary_value_type_;
   }
 
-  std::shared_ptr<Array> dictionary() const { return dictionary_; }
+  /// This can't be called more than once because 
DictionaryUnifier::GetResult()
+  /// can't be called more than once and produce the same output.
+  Result<std::shared_ptr<Array>> dictionary() const {
+    if (!first_dictionary_) {  // Append was never called
+      return nullptr;
+    }
+    if (!dictionary_unifier_) {  // Append was called only once
+      return first_dictionary_;
+    }
+
+    auto out_dict_type = first_dictionary_->type();
+    std::shared_ptr<Array> out_dict;
+    RETURN_NOT_OK(dictionary_unifier_->GetResult(&out_dict_type, &out_dict));
+    return out_dict;
+  }
 
  private:
   std::unique_ptr<HashKernel> indices_kernel_;
-  std::shared_ptr<Array> dictionary_;
+  std::shared_ptr<Array> first_dictionary_;
   std::shared_ptr<DataType> dictionary_value_type_;
+  std::unique_ptr<DictionaryUnifier> dictionary_unifier_;
 };
 
 // ----------------------------------------------------------------------
@@ -630,8 +650,9 @@ Status ValueCountsFinalize(KernelContext* ctx, 
std::vector<Datum>* out) {
 // hence have no dictionary.
 Result<std::shared_ptr<ArrayData>> EnsureHashDictionary(KernelContext* ctx,
                                                         DictionaryHashKernel* 
hash) {
-  if (hash->dictionary()) {
-    return hash->dictionary()->data();
+  ARROW_ASSIGN_OR_RAISE(auto dict, hash->dictionary());
+  if (dict) {
+    return dict->data();
   }
   ARROW_ASSIGN_OR_RAISE(auto null, 
MakeArrayOfNull(hash->dictionary_value_type(),
                                                    /*length=*/0, 
ctx->memory_pool()));
diff --git a/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc b/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc
index e9548e133a..472f50db8c 100644
--- a/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc
+++ b/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc
@@ -25,6 +25,7 @@
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
 #include "arrow/testing/util.h"
+#include "arrow/util/logging.h"
 
 #include "arrow/compute/api.h"
 
@@ -226,6 +227,33 @@ static void UniqueString100bytes(benchmark::State& state) {
   BenchUnique(state, 
HashParams<StringType>{general_bench_cases[state.range(0)], 100});
 }
 
+template <typename ParamType>
+void BenchValueCountsDictionaryChunks(benchmark::State& state, const 
ParamType& params) {
+  std::shared_ptr<Array> arr;
+  params.GenerateTestData(&arr);
+  // chunk arr to 100 slices
+  std::vector<std::shared_ptr<Array>> chunks;
+  const int64_t chunk_size = arr->length() / 100;
+  for (int64_t i = 0; i < 100; ++i) {
+    auto slice = arr->Slice(i * chunk_size, chunk_size);
+    auto datum = DictionaryEncode(slice).ValueOrDie();
+    ARROW_CHECK(datum.is_array());
+    chunks.push_back(datum.make_array());
+  }
+  auto chunked_array = std::make_shared<ChunkedArray>(chunks);
+
+  while (state.KeepRunning()) {
+    ABORT_NOT_OK(ValueCounts(chunked_array).status());
+  }
+  params.SetMetadata(state);
+}
+
+static void ValueCountsDictionaryChunks(benchmark::State& state) {
+  // Dictionary of byte strings with 10 bytes each
+  BenchValueCountsDictionaryChunks(
+      state, HashParams<StringType>{general_bench_cases[state.range(0)], 10});
+}
+
 void HashSetArgs(benchmark::internal::Benchmark* bench) {
   for (int i = 0; i < static_cast<int>(general_bench_cases.size()); ++i) {
     bench->Arg(i);
@@ -239,6 +267,14 @@ BENCHMARK(UniqueInt64)->Apply(HashSetArgs);
 BENCHMARK(UniqueString10bytes)->Apply(HashSetArgs);
 BENCHMARK(UniqueString100bytes)->Apply(HashSetArgs);
 
+void DictionaryChunksHashSetArgs(benchmark::internal::Benchmark* bench) {
+  for (int i = 0; i < static_cast<int>(general_bench_cases.size()); ++i) {
+    bench->Arg(i);
+  }
+}
+
+BENCHMARK(ValueCountsDictionaryChunks)->Apply(DictionaryChunksHashSetArgs);
+
 void UInt8SetArgs(benchmark::internal::Benchmark* bench) {
   for (int i = 0; i < static_cast<int>(uint8_bench_cases.size()); ++i) {
     bench->Arg(i);

Reply via email to