This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d647e2fc0 GH-45733: [C++][Python] Add biased/unbiased toggle to skew 
and kurtosis functions (#45762)
7d647e2fc0 is described below

commit 7d647e2fc06d870c1ba8980f6e444d82945e448e
Author: Raúl Cumplido <[email protected]>
AuthorDate: Tue Mar 18 12:04:53 2025 +0100

    GH-45733: [C++][Python] Add biased/unbiased toggle to skew and kurtosis 
functions (#45762)
    
    ### Rationale for this change
    
    Allow unbiased computations of skew and kurtosis. Toggle variable to 
compute biased/unbiased.
    
    ### What changes are included in this PR?
    
    Add `biased` option to compute biased/unbiased versions when computing Skew 
and kurtosis.
    
    ### Are these changes tested?
    
    Yes, tests added.
    
    ### Are there any user-facing changes?
    
    There is a new `biased` option on `SkewOptions`
    
    * GitHub Issue: #45733
    
    Authored-by: Raúl Cumplido <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/acero/hash_aggregate_test.cc         | 22 ++++++++++------
 cpp/src/arrow/compute/api_aggregate.cc             |  4 ++-
 cpp/src/arrow/compute/api_aggregate.h              |  7 +++++-
 cpp/src/arrow/compute/kernels/aggregate_test.cc    |  5 ++++
 cpp/src/arrow/compute/kernels/aggregate_var_std.cc | 10 +++++---
 .../compute/kernels/aggregate_var_std_internal.h   | 29 +++++++++++++++++-----
 .../compute/kernels/hash_aggregate_numeric.cc      | 27 ++++++++++++--------
 python/pyarrow/_compute.pyx                        | 11 +++++---
 python/pyarrow/includes/libarrow.pxd               |  3 ++-
 python/pyarrow/tests/test_compute.py               | 15 +++++++++++
 10 files changed, 99 insertions(+), 34 deletions(-)

diff --git a/cpp/src/arrow/acero/hash_aggregate_test.cc 
b/cpp/src/arrow/acero/hash_aggregate_test.cc
index 347bb96269..1c456c2fd7 100644
--- a/cpp/src/arrow/acero/hash_aggregate_test.cc
+++ b/cpp/src/arrow/acero/hash_aggregate_test.cc
@@ -1504,11 +1504,15 @@ TEST_P(GroupBy, VarianceOptionsAndSkewOptions) {
       /*ddof=*/0, /*skip_nulls=*/false, /*min_count=*/3);
 
   auto skew_keep_nulls = std::make_shared<SkewOptions>(/*skip_nulls=*/false,
+                                                       /*biased=*/true,
                                                        /*min_count=*/0);
-  auto skew_min_count =
-      std::make_shared<SkewOptions>(/*skip_nulls=*/true, /*min_count=*/3);
+  auto skew_min_count = std::make_shared<SkewOptions>(/*skip_nulls=*/true,
+                                                      /*biased=*/true, 
/*min_count=*/3);
   auto skew_keep_nulls_min_count = std::make_shared<SkewOptions>(
-      /*skip_nulls=*/false, /*min_count=*/3);
+      /*skip_nulls=*/false, /*biased=*/true, /*min_count=*/3);
+
+  auto skew_unbiased = std::make_shared<SkewOptions>(
+      /*skip_nulls=*/false, /*biased=*/false, /*min_count=*/0);
 
   for (std::string value_column : {"argument", "argument1"}) {
     for (bool use_threads : {false}) {
@@ -1553,10 +1557,12 @@ TEST_P(GroupBy, VarianceOptionsAndSkewOptions) {
                   {"hash_skew", skew_keep_nulls, value_column, "hash_skew"},
                   {"hash_skew", skew_min_count, value_column, "hash_skew"},
                   {"hash_skew", skew_keep_nulls_min_count, value_column, 
"hash_skew"},
+                  {"hash_skew", skew_unbiased, value_column, "hash_skew"},
                   {"hash_kurtosis", skew_keep_nulls, value_column, 
"hash_kurtosis"},
                   {"hash_kurtosis", skew_min_count, value_column, 
"hash_kurtosis"},
                   {"hash_kurtosis", skew_keep_nulls_min_count, value_column,
                    "hash_kurtosis"},
+                  {"hash_kurtosis", skew_unbiased, value_column, 
"hash_kurtosis"},
               },
               use_threads));
       expected = ArrayFromJSON(struct_({
@@ -1564,15 +1570,17 @@ TEST_P(GroupBy, VarianceOptionsAndSkewOptions) {
                                    field("hash_skew", float64()),
                                    field("hash_skew", float64()),
                                    field("hash_skew", float64()),
+                                   field("hash_skew", float64()),
+                                   field("hash_kurtosis", float64()),
                                    field("hash_kurtosis", float64()),
                                    field("hash_kurtosis", float64()),
                                    field("hash_kurtosis", float64()),
                                }),
                                R"([
-         [1, null,      0.707106,  null,     null,      -1.5,      null     ],
-         [2, 0.213833,  0.213833,  0.213833, -1.720164, -1.720164, -1.720164],
-         [3, 0.0,       null,      null,     -2.0,       null,     null     ],
-         [4, null,      0.707106,  null,     null,      -1.5,      null     ]
+         [1, null,      0.707106,  null,     null,    null,      -1.5,      
null,      null    ],
+         [2, 0.213833,  0.213833,  0.213833, 0.37037, -1.720164, -1.720164, 
-1.720164, -3.90123],
+         [3, 0.0,       null,      null,     null,    -2.0,       null,     
null,      null    ],
+         [4, null,      0.707106,  null,     null,    null,      -1.5,      
null,      null    ]
          ])");
       ValidateOutput(actual);
       AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
diff --git a/cpp/src/arrow/compute/api_aggregate.cc 
b/cpp/src/arrow/compute/api_aggregate.cc
index 20d3ce2faf..4cd78a296a 100644
--- a/cpp/src/arrow/compute/api_aggregate.cc
+++ b/cpp/src/arrow/compute/api_aggregate.cc
@@ -111,6 +111,7 @@ static auto kVarianceOptionsType = 
GetFunctionOptionsType<VarianceOptions>(
     DataMember("min_count", &VarianceOptions::min_count));
 static auto kSkewOptionsType = GetFunctionOptionsType<SkewOptions>(
     DataMember("skip_nulls", &SkewOptions::skip_nulls),
+    DataMember("biased", &SkewOptions::biased),
     DataMember("min_count", &SkewOptions::min_count));
 static auto kQuantileOptionsType = GetFunctionOptionsType<QuantileOptions>(
     DataMember("q", &QuantileOptions::q),
@@ -154,9 +155,10 @@ VarianceOptions::VarianceOptions(int ddof, bool 
skip_nulls, uint32_t min_count)
       min_count(min_count) {}
 constexpr char VarianceOptions::kTypeName[];
 
-SkewOptions::SkewOptions(bool skip_nulls, uint32_t min_count)
+SkewOptions::SkewOptions(bool skip_nulls, bool biased, uint32_t min_count)
     : FunctionOptions(internal::kSkewOptionsType),
       skip_nulls(skip_nulls),
+      biased(biased),
       min_count(min_count) {}
 
 QuantileOptions::QuantileOptions(double q, enum Interpolation interpolation,
diff --git a/cpp/src/arrow/compute/api_aggregate.h 
b/cpp/src/arrow/compute/api_aggregate.h
index 61bab4cdb8..1d9076f6ba 100644
--- a/cpp/src/arrow/compute/api_aggregate.h
+++ b/cpp/src/arrow/compute/api_aggregate.h
@@ -117,13 +117,18 @@ class ARROW_EXPORT VarianceOptions : public 
FunctionOptions {
 /// \brief Control Skew and Kurtosis kernel behavior
 class ARROW_EXPORT SkewOptions : public FunctionOptions {
  public:
-  explicit SkewOptions(bool skip_nulls = true, uint32_t min_count = 0);
+  explicit SkewOptions(bool skip_nulls = true, bool biased = true,
+                       uint32_t min_count = 0);
   static constexpr char const kTypeName[] = "SkewOptions";
   static SkewOptions Defaults() { return SkewOptions{}; }
 
   /// If true (the default), null values are ignored. Otherwise, if any value 
is null,
   /// emit null.
   bool skip_nulls;
+  /// If true (the default), the calculated value is biased. If false, the 
calculated
+  /// value includes a correction factor to reduce bias, making it more 
accurate for
+  /// small sample sizes.
+  bool biased;
   /// If less than this many non-null values are observed, emit null.
   uint32_t min_count;
 };
diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc 
b/cpp/src/arrow/compute/kernels/aggregate_test.cc
index d64c740a8e..0ed30781e7 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc
@@ -3693,6 +3693,11 @@ TEST_F(TestSkewKurtosis, Options) {
     AssertSkewKurtosisInvalid(type, {"[]", "[]", "[]"}, options);
     AssertSkewKurtosisAre(type, "[0, 1, null, 2]", options, 0.0, -1.5);
     AssertSkewKurtosisAre(type, {"[0, 1]", "[]", "[null, 2]"}, options, 0.0, 
-1.5);
+    options.biased = false;
+    AssertSkewKurtosisInvalid(type, "[0, 1]", options);
+    AssertSkewKurtosisAre(type, {"[1, 2, 3]", "[40, null]"}, options, 
1.9889477403978211,
+                          3.9631931024230695);
+    options.biased = true;
     options.min_count = 3;
     AssertSkewKurtosisAre(type, "[0, 1, null, 2]", options, 0.0, -1.5);
     AssertSkewKurtosisAre(type, {"[0, 1]", "[]", "[null, 2]"}, options, 0.0, 
-1.5);
diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc 
b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
index 8d2da195b0..021ca712c5 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
@@ -176,6 +176,7 @@ struct StatisticImpl : public ScalarAggregator {
       : out_type(out_type),
         stat_type(stat_type),
         skip_nulls(options.skip_nulls),
+        biased(options.biased),
         min_count(options.min_count),
         ddof(0),
         state(moments_level_for_statistic(stat_type), decimal_scale, 
skip_nulls) {}
@@ -197,7 +198,9 @@ struct StatisticImpl : public ScalarAggregator {
 
   Status Finalize(KernelContext*, Datum* out) override {
     if (state.count() <= ddof || state.count() < min_count ||
-        (!state.all_valid && !skip_nulls)) {
+        (!state.all_valid && !skip_nulls) ||
+        (stat_type == StatisticType::Skew && !biased && state.count() < 3) ||
+        (stat_type == StatisticType::Kurtosis && !biased && state.count() < 
4)) {
       out->value = std::make_shared<DoubleScalar>();
     } else {
       switch (stat_type) {
@@ -208,10 +211,10 @@ struct StatisticImpl : public ScalarAggregator {
           out->value = 
std::make_shared<DoubleScalar>(state.moments.Variance(ddof));
           break;
         case StatisticType::Skew:
-          out->value = std::make_shared<DoubleScalar>(state.moments.Skew());
+          out->value = 
std::make_shared<DoubleScalar>(state.moments.Skew(biased));
           break;
         case StatisticType::Kurtosis:
-          out->value = 
std::make_shared<DoubleScalar>(state.moments.Kurtosis());
+          out->value = 
std::make_shared<DoubleScalar>(state.moments.Kurtosis(biased));
           break;
         default:
           return Status::NotImplemented("Unsupported statistic type ",
@@ -224,6 +227,7 @@ struct StatisticImpl : public ScalarAggregator {
   std::shared_ptr<DataType> out_type;
   StatisticType stat_type;
   bool skip_nulls;
+  bool biased;
   uint32_t min_count;
   int ddof = 0;
   MomentsState<ArrowType> state;
diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h 
b/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h
index f7c35bea96..962b0db98a 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h
+++ b/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h
@@ -84,14 +84,31 @@ struct Moments {
 
   double Stddev(int ddof) const { return sqrt(Variance(ddof)); }
 
-  double Skew() const {
-    // This may return NaN for m2 == 0 and m3 == 0, which is expected
-    return sqrt(count) * m3 / sqrt(m2 * m2 * m2);
+  double Skew(bool biased = true) const {
+    double result;
+    // This may return NaN for m2 == 0 and m3 == 0, which is expected.
+    if (biased) {
+      result = sqrt(count) * m3 / sqrt(m2 * m2 * m2);
+    } else {
+      auto m2_avg = m2 / count;
+      result = sqrt(count * (count - 1)) / (count - 2) * (m3 / count) /
+               sqrt(m2_avg * m2_avg * m2_avg);
+    }
+    return result;
   }
 
-  double Kurtosis() const {
-    // This may return NaN for m2 == 0 and m4 == 0, which is expected
-    return count * m4 / (m2 * m2) - 3;
+  double Kurtosis(bool biased = true) const {
+    double result;
+    // This may return NaN for m2 == 0 and m4 == 0, which is expected.
+    if (biased) {
+      result = count * m4 / (m2 * m2) - 3;
+    } else {
+      auto m2_avg = m2 / count;
+      result = 1.0 / ((count - 2) * (count - 3)) *
+               (((count * count) - 1.0) * (m4 / count) / (m2_avg * m2_avg) -
+                3 * ((count - 1) * (count - 1)));
+    }
+    return result;
   }
 
   void MergeFrom(int level, const Moments& other) { *this = Merge(level, 
*this, other); }
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_numeric.cc 
b/cpp/src/arrow/compute/kernels/hash_aggregate_numeric.cc
index e4c621f5b2..4a318942af 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_numeric.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_numeric.cc
@@ -452,35 +452,37 @@ struct GroupedStatisticImpl : public GroupedAggregator {
   Status InitInternal(ExecContext* ctx, const KernelInitArgs& args,
                       StatisticType stat_type, const VarianceOptions& options) 
{
     return InitInternal(ctx, args, stat_type, options.ddof, options.skip_nulls,
-                        options.min_count);
+                        /*biased=*/false, options.min_count);
   }
 
   // Init helper for hash_skew and hash_kurtosis
   Status InitInternal(ExecContext* ctx, const KernelInitArgs& args,
                       StatisticType stat_type, const SkewOptions& options) {
     return InitInternal(ctx, args, stat_type, /*ddof=*/0, options.skip_nulls,
-                        options.min_count);
+                        options.biased, options.min_count);
   }
 
   Status InitInternal(ExecContext* ctx, const KernelInitArgs& args,
-                      StatisticType stat_type, int ddof, bool skip_nulls,
+                      StatisticType stat_type, int ddof, bool skip_nulls, bool 
biased,
                       uint32_t min_count) {
     if constexpr (is_decimal_type<Type>::value) {
       int32_t decimal_scale =
           checked_cast<const DecimalType&>(*args.inputs[0].type).scale();
-      return InitInternal(ctx, stat_type, decimal_scale, ddof, skip_nulls, 
min_count);
+      return InitInternal(ctx, stat_type, decimal_scale, ddof, skip_nulls, 
biased,
+                          min_count);
     } else {
-      return InitInternal(ctx, stat_type, /*decimal_scale=*/0, ddof, 
skip_nulls,
+      return InitInternal(ctx, stat_type, /*decimal_scale=*/0, ddof, 
skip_nulls, biased,
                           min_count);
     }
   }
 
   Status InitInternal(ExecContext* ctx, StatisticType stat_type, int32_t 
decimal_scale,
-                      int ddof, bool skip_nulls, uint32_t min_count) {
+                      int ddof, bool skip_nulls, bool biased, uint32_t 
min_count) {
     stat_type_ = stat_type;
     moments_level_ = moments_level_for_statistic(stat_type_);
     decimal_scale_ = decimal_scale;
     skip_nulls_ = skip_nulls;
+    biased_ = biased;
     min_count_ = min_count;
     ddof_ = ddof;
     ctx_ = ctx;
@@ -539,7 +541,7 @@ struct GroupedStatisticImpl : public GroupedAggregator {
   Status ConsumeGeneric(const ExecSpan& batch) {
     GroupedStatisticImpl<Type> state;
     RETURN_NOT_OK(state.InitInternal(ctx_, stat_type_, decimal_scale_, ddof_, 
skip_nulls_,
-                                     min_count_));
+                                     biased_, min_count_));
     RETURN_NOT_OK(state.Resize(num_groups_));
     int64_t* counts = state.counts_.mutable_data();
     double* means = state.means_.mutable_data();
@@ -612,7 +614,7 @@ struct GroupedStatisticImpl : public GroupedAggregator {
       var_std.resize(num_groups_);
       GroupedStatisticImpl<Type> state;
       RETURN_NOT_OK(state.InitInternal(ctx_, stat_type_, decimal_scale_, ddof_,
-                                       skip_nulls_, min_count_));
+                                       skip_nulls_, biased_, min_count_));
       RETURN_NOT_OK(state.Resize(num_groups_));
       int64_t* other_counts = state.counts_.mutable_data();
       double* other_means = state.means_.mutable_data();
@@ -739,7 +741,9 @@ struct GroupedStatisticImpl : public GroupedAggregator {
     const double* m3s = m3s_data();
     const double* m4s = m4s_data();
     for (int64_t i = 0; i < num_groups_; ++i) {
-      if (counts[i] > ddof_ && counts[i] >= min_count_) {
+      if (counts[i] > ddof_ && counts[i] >= min_count_ &&
+          (stat_type_ != StatisticType::Skew || biased_ || counts[i] > 2) &&
+          (stat_type_ != StatisticType::Kurtosis || biased_ || counts[i] > 3)) 
{
         const auto moments = Moments(counts[i], means[i], m2s[i], m3s[i], 
m4s[i]);
         switch (stat_type_) {
           case StatisticType::Var:
@@ -749,10 +753,10 @@ struct GroupedStatisticImpl : public GroupedAggregator {
             results[i] = moments.Stddev(ddof_);
             break;
           case StatisticType::Skew:
-            results[i] = moments.Skew();
+            results[i] = moments.Skew(biased_);
             break;
           case StatisticType::Kurtosis:
-            results[i] = moments.Kurtosis();
+            results[i] = moments.Kurtosis(biased_);
             break;
           default:
             return Status::NotImplemented("Statistic type ",
@@ -809,6 +813,7 @@ struct GroupedStatisticImpl : public GroupedAggregator {
   int moments_level_;
   int32_t decimal_scale_;
   bool skip_nulls_;
+  bool biased_;
   uint32_t min_count_;
   int ddof_;
   int64_t num_groups_ = 0;
diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx
index db6cf5b45d..ed19cd71b3 100644
--- a/python/pyarrow/_compute.pyx
+++ b/python/pyarrow/_compute.pyx
@@ -1909,8 +1909,8 @@ class VarianceOptions(_VarianceOptions):
 
 
 cdef class _SkewOptions(FunctionOptions):
-    def _set_options(self, skip_nulls, min_count):
-        self.wrapped.reset(new CSkewOptions(skip_nulls, min_count))
+    def _set_options(self, skip_nulls, biased, min_count):
+        self.wrapped.reset(new CSkewOptions(skip_nulls, biased, min_count))
 
 
 class SkewOptions(_SkewOptions):
@@ -1920,11 +1920,14 @@ class SkewOptions(_SkewOptions):
     Parameters
     ----------
     {_skip_nulls_doc()}
+    biased : bool, default True
+        Whether the calculated value is biased.
+        If False, the value computed includes a correction factor to reduce 
bias.
     {_min_count_doc(default=0)}
     """
 
-    def __init__(self, *, skip_nulls=True, min_count=0):
-        self._set_options(skip_nulls, min_count)
+    def __init__(self, *, skip_nulls=True, biased=True, min_count=0):
+        self._set_options(skip_nulls, biased, min_count)
 
 
 cdef class _SplitOptions(FunctionOptions):
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index f9fa091171..00e5b9e912 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2628,8 +2628,9 @@ cdef extern from "arrow/compute/api.h" namespace 
"arrow::compute" nogil:
 
     cdef cppclass CSkewOptions \
             "arrow::compute::SkewOptions"(CFunctionOptions):
-        CSkewOptions(c_bool skip_nulls, uint32_t min_count)
+        CSkewOptions(c_bool skip_nulls, c_bool biased, uint32_t min_count)
         c_bool skip_nulls
+        c_bool biased
         uint32_t min_count
 
     cdef cppclass CScalarAggregateOptions \
diff --git a/python/pyarrow/tests/test_compute.py 
b/python/pyarrow/tests/test_compute.py
index 73506fedfc..eaceabcb9c 100644
--- a/python/pyarrow/tests/test_compute.py
+++ b/python/pyarrow/tests/test_compute.py
@@ -457,6 +457,21 @@ def test_kurtosis():
     assert pc.kurtosis(data, min_count=4).as_py() is None
 
 
[email protected]("input, expected", (
+    (
+        [1.0, 2.0, 3.0, 40.0, None],
+        {'skew': 1.988947740397821, 'kurtosis': 3.9631931024230695}
+    ),
+    ([1, 2, 40], {'skew': 1.7281098503730385, 'kurtosis': None}),
+    ([1, 40], {'skew': None, 'kurtosis': None}),
+))
+def test_unbiased_skew_and_kurtosis(input, expected):
+    arrow_skew = pc.skew(input, skip_nulls=True, biased=False)
+    arrow_kurtosis = pc.kurtosis(input, skip_nulls=True, biased=False)
+    assert arrow_skew == pa.scalar(expected['skew'], type=pa.float64())
+    assert arrow_kurtosis == pa.scalar(expected['kurtosis'], type=pa.float64())
+
+
 def test_count_substring():
     for (ty, offset) in [(pa.string(), pa.int32()),
                          (pa.large_string(), pa.int64())]:

Reply via email to