pitrou commented on a change in pull request #9435:
URL: https://github.com/apache/arrow/pull/9435#discussion_r580320934



##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -1533,23 +1533,67 @@ TEST_F(TestInt64QuantileKernel, Int64) {
 class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel<Int32Type> 
{
  public:
   void CheckQuantiles(int64_t array_size, int64_t num_quantiles) {
-    auto rand = random::RandomArrayGenerator(0x5487658);
+    std::shared_ptr<Array> array;
+    std::vector<double> quantiles;
     // small value range to exercise input array with equal values and 
histogram approach
-    const auto array = rand.Numeric<Int32Type>(array_size, -100, 200, 0.1);
+    GenerateTestData(array_size, num_quantiles, -100, 200, &array, &quantiles);
+
+    this->AssertQuantilesAre(array, QuantileOptions{quantiles},
+                             NaiveQuantile(*array, quantiles, 
interpolations_));
+  }
 
+  void CheckTDigests(std::vector<int> chunk_sizes, int64_t num_quantiles) {
+    int total_size = 0;
+    for (int size : chunk_sizes) {
+      total_size += size;
+    }
+    std::shared_ptr<Array> array;
     std::vector<double> quantiles;
-    random_real(num_quantiles, 0x5487658, 0.0, 1.0, &quantiles);
-    // make sure to exercise 0 and 1 quantiles
-    *std::min_element(quantiles.begin(), quantiles.end()) = 0;
-    *std::max_element(quantiles.begin(), quantiles.end()) = 1;
+    GenerateTestData(total_size, num_quantiles, 100, 123456789, &array, 
&quantiles);
 
-    this->AssertQuantilesAre(array, QuantileOptions{quantiles},
-                             NaiveQuantile(*array, quantiles));
+    total_size = 0;
+    ArrayVector array_vector;
+    for (int size : chunk_sizes) {
+      array_vector.emplace_back(array->Slice(total_size, size));
+      total_size += size;
+    }
+    auto chunked = *ChunkedArray::Make(array_vector);
+
+    TDigestOptions options(quantiles);
+    ASSERT_OK_AND_ASSIGN(Datum out, TDigest(chunked, options));
+    const auto& out_array = out.make_array();
+    ASSERT_OK(out_array->ValidateFull());
+    ASSERT_EQ(out_array->length(), quantiles.size());
+    ASSERT_EQ(out_array->null_count(), 0);
+    ASSERT_EQ(out_array->type(), float64());

Review comment:
       Use `AssertTypeEquals`. The type object is not guaranteed to be the same pointer, so compare types by value.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+namespace {
+
+using arrow::internal::TDigest;
+using arrow::internal::VisitSetBitRunsVoid;
+
+template <typename ArrowType>
+struct TDigestImpl : public ScalarAggregator {
+  using ThisType = TDigestImpl<ArrowType>;
+  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
+  using CType = typename ArrowType::c_type;
+
+  explicit TDigestImpl(const TDigestOptions& options)
+      : q{options.q}, tdigest{new TDigest(options.delta, options.buffer_size)} 
{}
+
+  void Consume(KernelContext*, const ExecBatch& batch) override {
+    ArrayType array{batch[0].array()};
+    const ArrayData& data = *array.data();
+    const CType* values = data.GetValues<CType>(1);
+
+    if (array.length() > array.null_count()) {

Review comment:
       You could write `if (data.length > data.GetNullCount())` and avoid instantiating `array`.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -1533,23 +1533,67 @@ TEST_F(TestInt64QuantileKernel, Int64) {
 class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel<Int32Type> 
{
  public:
   void CheckQuantiles(int64_t array_size, int64_t num_quantiles) {
-    auto rand = random::RandomArrayGenerator(0x5487658);
+    std::shared_ptr<Array> array;
+    std::vector<double> quantiles;
     // small value range to exercise input array with equal values and 
histogram approach
-    const auto array = rand.Numeric<Int32Type>(array_size, -100, 200, 0.1);
+    GenerateTestData(array_size, num_quantiles, -100, 200, &array, &quantiles);
+
+    this->AssertQuantilesAre(array, QuantileOptions{quantiles},
+                             NaiveQuantile(*array, quantiles, 
interpolations_));
+  }
 
+  void CheckTDigests(std::vector<int> chunk_sizes, int64_t num_quantiles) {
+    int total_size = 0;
+    for (int size : chunk_sizes) {
+      total_size += size;
+    }
+    std::shared_ptr<Array> array;
     std::vector<double> quantiles;
-    random_real(num_quantiles, 0x5487658, 0.0, 1.0, &quantiles);
-    // make sure to exercise 0 and 1 quantiles
-    *std::min_element(quantiles.begin(), quantiles.end()) = 0;
-    *std::max_element(quantiles.begin(), quantiles.end()) = 1;
+    GenerateTestData(total_size, num_quantiles, 100, 123456789, &array, 
&quantiles);
 
-    this->AssertQuantilesAre(array, QuantileOptions{quantiles},
-                             NaiveQuantile(*array, quantiles));
+    total_size = 0;
+    ArrayVector array_vector;
+    for (int size : chunk_sizes) {
+      array_vector.emplace_back(array->Slice(total_size, size));
+      total_size += size;
+    }
+    auto chunked = *ChunkedArray::Make(array_vector);
+
+    TDigestOptions options(quantiles);
+    ASSERT_OK_AND_ASSIGN(Datum out, TDigest(chunked, options));
+    const auto& out_array = out.make_array();
+    ASSERT_OK(out_array->ValidateFull());
+    ASSERT_EQ(out_array->length(), quantiles.size());
+    ASSERT_EQ(out_array->null_count(), 0);
+    ASSERT_EQ(out_array->type(), float64());
+
+    // linear interpolated exact quantile as reference
+    std::vector<std::vector<Datum>> exact =
+        NaiveQuantile(*array, quantiles, {QuantileOptions::LINEAR});
+    const double* approx = out_array->data()->GetValues<double>(1);
+    for (size_t i = 0; i < quantiles.size(); ++i) {
+      const auto& exact_scalar =
+          std::static_pointer_cast<DoubleScalar>(exact[i][0].scalar());

Review comment:
       Use `checked_pointer_cast` instead of `std::static_pointer_cast`?

##########
File path: cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+namespace {
+
+using arrow::internal::TDigest;
+using arrow::internal::VisitSetBitRunsVoid;
+
+template <typename ArrowType>
+struct TDigestImpl : public ScalarAggregator {
+  using ThisType = TDigestImpl<ArrowType>;
+  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
+  using CType = typename ArrowType::c_type;
+
+  explicit TDigestImpl(const TDigestOptions& options)
+      : q{options.q}, tdigest{new TDigest(options.delta, options.buffer_size)} 
{}
+
+  void Consume(KernelContext*, const ExecBatch& batch) override {
+    ArrayType array{batch[0].array()};
+    const ArrayData& data = *array.data();
+    const CType* values = data.GetValues<CType>(1);
+
+    if (array.length() > array.null_count()) {
+      VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                          [&](int64_t pos, int64_t len) {
+                            for (int64_t i = 0; i < len; ++i) {
+                              this->tdigest->NanAdd(values[pos + i]);
+                            }
+                          });
+    }
+  }
+
+  void MergeFrom(KernelContext*, KernelState&& src) override {
+    auto& other = checked_cast<ThisType&>(src);
+    std::vector<std::unique_ptr<TDigest>> other_tdigest;
+    other_tdigest.push_back(std::move(other.tdigest));
+    this->tdigest->Merge(&other_tdigest);
+  }
+
+  void Finalize(KernelContext* ctx, Datum* out) override {
+    const int64_t out_length = this->tdigest->is_empty() ? 0 : this->q.size();
+    auto out_data = ArrayData::Make(float64(), out_length, 0);
+    out_data->buffers.resize(2, nullptr);
+
+    if (out_length > 0) {
+      KERNEL_ASSIGN_OR_RAISE(out_data->buffers[1], ctx,
+                             ctx->Allocate(out_length * sizeof(double)));
+      double* out_buffer = out_data->template GetMutableValues<double>(1);
+      for (int64_t i = 0; i < out_length; ++i) {
+        out_buffer[i] = this->tdigest->Quantile(this->q[i]);
+      }
+    }
+
+    *out = Datum(std::move(out_data));
+  }
+
+  const std::vector<double>& q;
+  std::unique_ptr<TDigest> tdigest;

Review comment:
       It doesn't seem to me that the `unique_ptr` is useful; you could just store the `TDigest` directly here.

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -60,12 +61,27 @@ class ARROW_EXPORT TDigest {
     input_.push_back(value);
   }
 
+  // skip NAN on adding
+  // TODO(yibo): store NAN as is, partition to buffer end before merging
+  template <typename T>
+  typename std::enable_if<std::is_floating_point<T>::value>::type NanAdd(T 
value) {
+    if (!std::isnan(value)) Add(value);
+  }
+
+  template <typename T>
+  typename std::enable_if<std::is_integral<T>::value>::type NanAdd(T value) {
+    Add(static_cast<double>(value));

Review comment:
       Note that `std::isnan` is also defined for integral types (the argument is treated as `double`), so you could probably just write:
   ```c++
     template <typename T>
     void NanAdd(T value) {
       if (!std::isnan(value)) { Add(static_cast<double>(value)); }
     }
   ```
   

##########
File path: cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
##########
@@ -498,5 +498,19 @@ QUANTILE_KERNEL_BENCHMARK_WIDE(QuantileKernelInt64Wide, 
Int64Type);
 QUANTILE_KERNEL_BENCHMARK_NARROW(QuantileKernelInt64Narrow, Int64Type);
 QUANTILE_KERNEL_BENCHMARK_WIDE(QuantileKernelDouble, DoubleType);
 
+static void TDigestKernel(benchmark::State& state) {
+  TDigestOptions options;
+  RegressionArgs args(state);
+  const int64_t array_size = args.size / sizeof(double);
+  auto rand = random::RandomArrayGenerator(1926);
+  auto array = rand.Numeric<DoubleType>(array_size, 0, 1 << 24, 
args.null_proportion);
+
+  for (auto _ : state) {
+    ABORT_NOT_OK(TDigest(array, options).status());
+  }
+}
+
+BENCHMARK(TDigestKernel)->Apply(QuantileKernelBenchArgs);

Review comment:
       Call this `TDigestKernelDouble`?

##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -1625,6 +1669,10 @@ TEST_F(TestRandomQuantileKernel, Histogram) {
   // exercise histogram approach: size >= 65536, range <= 65536
   this->CheckQuantiles(/*array_size=*/80000, /*num_quantiles=*/100);
 }
+
+TEST_F(TestRandomQuantileKernel, TDigest) {
+  this->CheckTDigests(/*chunk_sizes=*/{12345, 6789, 8765, 4321}, 
/*num_quantiles=*/100);

Review comment:
       Does it also check the case where there are only nulls or NaNs? If not, can you add explicit tests?

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -60,12 +61,27 @@ class ARROW_EXPORT TDigest {
     input_.push_back(value);
   }
 
+  // skip NAN on adding
+  // TODO(yibo): store NAN as is, partition to buffer end before merging

Review comment:
       Why would it be better to partition at the end?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to