This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 7d647e2fc0 GH-45733: [C++][Python] Add biased/unbiased toggle to skew
and kurtosis functions (#45762)
7d647e2fc0 is described below
commit 7d647e2fc06d870c1ba8980f6e444d82945e448e
Author: Raúl Cumplido <[email protected]>
AuthorDate: Tue Mar 18 12:04:53 2025 +0100
GH-45733: [C++][Python] Add biased/unbiased toggle to skew and kurtosis
functions (#45762)
### Rationale for this change
Allow unbiased computations of skew and kurtosis. Toggle variable to
compute biased/unbiased.
### What changes are included in this PR?
Add `biased` option to compute biased/unbiased versions when computing Skew
and kurtosis.
### Are these changes tested?
Yes, tests added.
### Are there any user-facing changes?
There is a new `biased` option on `SkewOptions`
* GitHub Issue: #45733
Authored-by: Raúl Cumplido <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/acero/hash_aggregate_test.cc | 22 ++++++++++------
cpp/src/arrow/compute/api_aggregate.cc | 4 ++-
cpp/src/arrow/compute/api_aggregate.h | 7 +++++-
cpp/src/arrow/compute/kernels/aggregate_test.cc | 5 ++++
cpp/src/arrow/compute/kernels/aggregate_var_std.cc | 10 +++++---
.../compute/kernels/aggregate_var_std_internal.h | 29 +++++++++++++++++-----
.../compute/kernels/hash_aggregate_numeric.cc | 27 ++++++++++++--------
python/pyarrow/_compute.pyx | 11 +++++---
python/pyarrow/includes/libarrow.pxd | 3 ++-
python/pyarrow/tests/test_compute.py | 15 +++++++++++
10 files changed, 99 insertions(+), 34 deletions(-)
diff --git a/cpp/src/arrow/acero/hash_aggregate_test.cc
b/cpp/src/arrow/acero/hash_aggregate_test.cc
index 347bb96269..1c456c2fd7 100644
--- a/cpp/src/arrow/acero/hash_aggregate_test.cc
+++ b/cpp/src/arrow/acero/hash_aggregate_test.cc
@@ -1504,11 +1504,15 @@ TEST_P(GroupBy, VarianceOptionsAndSkewOptions) {
/*ddof=*/0, /*skip_nulls=*/false, /*min_count=*/3);
auto skew_keep_nulls = std::make_shared<SkewOptions>(/*skip_nulls=*/false,
+ /*biased=*/true,
/*min_count=*/0);
- auto skew_min_count =
- std::make_shared<SkewOptions>(/*skip_nulls=*/true, /*min_count=*/3);
+ auto skew_min_count = std::make_shared<SkewOptions>(/*skip_nulls=*/true,
+ /*biased=*/true,
/*min_count=*/3);
auto skew_keep_nulls_min_count = std::make_shared<SkewOptions>(
- /*skip_nulls=*/false, /*min_count=*/3);
+ /*skip_nulls=*/false, /*biased=*/true, /*min_count=*/3);
+
+ auto skew_unbiased = std::make_shared<SkewOptions>(
+ /*skip_nulls=*/false, /*biased=*/false, /*min_count=*/0);
for (std::string value_column : {"argument", "argument1"}) {
for (bool use_threads : {false}) {
@@ -1553,10 +1557,12 @@ TEST_P(GroupBy, VarianceOptionsAndSkewOptions) {
{"hash_skew", skew_keep_nulls, value_column, "hash_skew"},
{"hash_skew", skew_min_count, value_column, "hash_skew"},
{"hash_skew", skew_keep_nulls_min_count, value_column,
"hash_skew"},
+ {"hash_skew", skew_unbiased, value_column, "hash_skew"},
{"hash_kurtosis", skew_keep_nulls, value_column,
"hash_kurtosis"},
{"hash_kurtosis", skew_min_count, value_column,
"hash_kurtosis"},
{"hash_kurtosis", skew_keep_nulls_min_count, value_column,
"hash_kurtosis"},
+ {"hash_kurtosis", skew_unbiased, value_column,
"hash_kurtosis"},
},
use_threads));
expected = ArrayFromJSON(struct_({
@@ -1564,15 +1570,17 @@ TEST_P(GroupBy, VarianceOptionsAndSkewOptions) {
field("hash_skew", float64()),
field("hash_skew", float64()),
field("hash_skew", float64()),
+ field("hash_skew", float64()),
+ field("hash_kurtosis", float64()),
field("hash_kurtosis", float64()),
field("hash_kurtosis", float64()),
field("hash_kurtosis", float64()),
}),
R"([
- [1, null, 0.707106, null, null, -1.5, null ],
- [2, 0.213833, 0.213833, 0.213833, -1.720164, -1.720164, -1.720164],
- [3, 0.0, null, null, -2.0, null, null ],
- [4, null, 0.707106, null, null, -1.5, null ]
+ [1, null, 0.707106, null, null, null, -1.5,
null, null ],
+ [2, 0.213833, 0.213833, 0.213833, 0.37037, -1.720164, -1.720164,
-1.720164, -3.90123],
+ [3, 0.0, null, null, null, -2.0, null,
null, null ],
+ [4, null, 0.707106, null, null, null, -1.5,
null, null ]
])");
ValidateOutput(actual);
AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
diff --git a/cpp/src/arrow/compute/api_aggregate.cc
b/cpp/src/arrow/compute/api_aggregate.cc
index 20d3ce2faf..4cd78a296a 100644
--- a/cpp/src/arrow/compute/api_aggregate.cc
+++ b/cpp/src/arrow/compute/api_aggregate.cc
@@ -111,6 +111,7 @@ static auto kVarianceOptionsType =
GetFunctionOptionsType<VarianceOptions>(
DataMember("min_count", &VarianceOptions::min_count));
static auto kSkewOptionsType = GetFunctionOptionsType<SkewOptions>(
DataMember("skip_nulls", &SkewOptions::skip_nulls),
+ DataMember("biased", &SkewOptions::biased),
DataMember("min_count", &SkewOptions::min_count));
static auto kQuantileOptionsType = GetFunctionOptionsType<QuantileOptions>(
DataMember("q", &QuantileOptions::q),
@@ -154,9 +155,10 @@ VarianceOptions::VarianceOptions(int ddof, bool
skip_nulls, uint32_t min_count)
min_count(min_count) {}
constexpr char VarianceOptions::kTypeName[];
-SkewOptions::SkewOptions(bool skip_nulls, uint32_t min_count)
+SkewOptions::SkewOptions(bool skip_nulls, bool biased, uint32_t min_count)
: FunctionOptions(internal::kSkewOptionsType),
skip_nulls(skip_nulls),
+ biased(biased),
min_count(min_count) {}
QuantileOptions::QuantileOptions(double q, enum Interpolation interpolation,
diff --git a/cpp/src/arrow/compute/api_aggregate.h
b/cpp/src/arrow/compute/api_aggregate.h
index 61bab4cdb8..1d9076f6ba 100644
--- a/cpp/src/arrow/compute/api_aggregate.h
+++ b/cpp/src/arrow/compute/api_aggregate.h
@@ -117,13 +117,18 @@ class ARROW_EXPORT VarianceOptions : public
FunctionOptions {
/// \brief Control Skew and Kurtosis kernel behavior
class ARROW_EXPORT SkewOptions : public FunctionOptions {
public:
- explicit SkewOptions(bool skip_nulls = true, uint32_t min_count = 0);
+ explicit SkewOptions(bool skip_nulls = true, bool biased = true,
+ uint32_t min_count = 0);
static constexpr char const kTypeName[] = "SkewOptions";
static SkewOptions Defaults() { return SkewOptions{}; }
/// If true (the default), null values are ignored. Otherwise, if any value
is null,
/// emit null.
bool skip_nulls;
+ /// If true (the default), the calculated value is biased. If false, the
calculated
+ /// value includes a correction factor to reduce bias, making it more
accurate for
+ /// small sample sizes.
+ bool biased;
/// If less than this many non-null values are observed, emit null.
uint32_t min_count;
};
diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc
b/cpp/src/arrow/compute/kernels/aggregate_test.cc
index d64c740a8e..0ed30781e7 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc
@@ -3693,6 +3693,11 @@ TEST_F(TestSkewKurtosis, Options) {
AssertSkewKurtosisInvalid(type, {"[]", "[]", "[]"}, options);
AssertSkewKurtosisAre(type, "[0, 1, null, 2]", options, 0.0, -1.5);
AssertSkewKurtosisAre(type, {"[0, 1]", "[]", "[null, 2]"}, options, 0.0,
-1.5);
+ options.biased = false;
+ AssertSkewKurtosisInvalid(type, "[0, 1]", options);
+ AssertSkewKurtosisAre(type, {"[1, 2, 3]", "[40, null]"}, options,
1.9889477403978211,
+ 3.9631931024230695);
+ options.biased = true;
options.min_count = 3;
AssertSkewKurtosisAre(type, "[0, 1, null, 2]", options, 0.0, -1.5);
AssertSkewKurtosisAre(type, {"[0, 1]", "[]", "[null, 2]"}, options, 0.0,
-1.5);
diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
index 8d2da195b0..021ca712c5 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
@@ -176,6 +176,7 @@ struct StatisticImpl : public ScalarAggregator {
: out_type(out_type),
stat_type(stat_type),
skip_nulls(options.skip_nulls),
+ biased(options.biased),
min_count(options.min_count),
ddof(0),
state(moments_level_for_statistic(stat_type), decimal_scale,
skip_nulls) {}
@@ -197,7 +198,9 @@ struct StatisticImpl : public ScalarAggregator {
Status Finalize(KernelContext*, Datum* out) override {
if (state.count() <= ddof || state.count() < min_count ||
- (!state.all_valid && !skip_nulls)) {
+ (!state.all_valid && !skip_nulls) ||
+ (stat_type == StatisticType::Skew && !biased && state.count() < 3) ||
+ (stat_type == StatisticType::Kurtosis && !biased && state.count() <
4)) {
out->value = std::make_shared<DoubleScalar>();
} else {
switch (stat_type) {
@@ -208,10 +211,10 @@ struct StatisticImpl : public ScalarAggregator {
out->value =
std::make_shared<DoubleScalar>(state.moments.Variance(ddof));
break;
case StatisticType::Skew:
- out->value = std::make_shared<DoubleScalar>(state.moments.Skew());
+ out->value =
std::make_shared<DoubleScalar>(state.moments.Skew(biased));
break;
case StatisticType::Kurtosis:
- out->value =
std::make_shared<DoubleScalar>(state.moments.Kurtosis());
+ out->value =
std::make_shared<DoubleScalar>(state.moments.Kurtosis(biased));
break;
default:
return Status::NotImplemented("Unsupported statistic type ",
@@ -224,6 +227,7 @@ struct StatisticImpl : public ScalarAggregator {
std::shared_ptr<DataType> out_type;
StatisticType stat_type;
bool skip_nulls;
+ bool biased;
uint32_t min_count;
int ddof = 0;
MomentsState<ArrowType> state;
diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h
b/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h
index f7c35bea96..962b0db98a 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h
+++ b/cpp/src/arrow/compute/kernels/aggregate_var_std_internal.h
@@ -84,14 +84,31 @@ struct Moments {
double Stddev(int ddof) const { return sqrt(Variance(ddof)); }
- double Skew() const {
- // This may return NaN for m2 == 0 and m3 == 0, which is expected
- return sqrt(count) * m3 / sqrt(m2 * m2 * m2);
+ double Skew(bool biased = true) const {
+ double result;
+ // This may return NaN for m2 == 0 and m3 == 0, which is expected.
+ if (biased) {
+ result = sqrt(count) * m3 / sqrt(m2 * m2 * m2);
+ } else {
+ auto m2_avg = m2 / count;
+ result = sqrt(count * (count - 1)) / (count - 2) * (m3 / count) /
+ sqrt(m2_avg * m2_avg * m2_avg);
+ }
+ return result;
}
- double Kurtosis() const {
- // This may return NaN for m2 == 0 and m4 == 0, which is expected
- return count * m4 / (m2 * m2) - 3;
+ double Kurtosis(bool biased = true) const {
+ double result;
+ // This may return NaN for m2 == 0 and m4 == 0, which is expected.
+ if (biased) {
+ result = count * m4 / (m2 * m2) - 3;
+ } else {
+ auto m2_avg = m2 / count;
+ result = 1.0 / ((count - 2) * (count - 3)) *
+ (((count * count) - 1.0) * (m4 / count) / (m2_avg * m2_avg) -
+ 3 * ((count - 1) * (count - 1)));
+ }
+ return result;
}
void MergeFrom(int level, const Moments& other) { *this = Merge(level,
*this, other); }
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_numeric.cc
b/cpp/src/arrow/compute/kernels/hash_aggregate_numeric.cc
index e4c621f5b2..4a318942af 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_numeric.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_numeric.cc
@@ -452,35 +452,37 @@ struct GroupedStatisticImpl : public GroupedAggregator {
Status InitInternal(ExecContext* ctx, const KernelInitArgs& args,
StatisticType stat_type, const VarianceOptions& options)
{
return InitInternal(ctx, args, stat_type, options.ddof, options.skip_nulls,
- options.min_count);
+ /*biased=*/false, options.min_count);
}
// Init helper for hash_skew and hash_kurtosis
Status InitInternal(ExecContext* ctx, const KernelInitArgs& args,
StatisticType stat_type, const SkewOptions& options) {
return InitInternal(ctx, args, stat_type, /*ddof=*/0, options.skip_nulls,
- options.min_count);
+ options.biased, options.min_count);
}
Status InitInternal(ExecContext* ctx, const KernelInitArgs& args,
- StatisticType stat_type, int ddof, bool skip_nulls,
+ StatisticType stat_type, int ddof, bool skip_nulls, bool
biased,
uint32_t min_count) {
if constexpr (is_decimal_type<Type>::value) {
int32_t decimal_scale =
checked_cast<const DecimalType&>(*args.inputs[0].type).scale();
- return InitInternal(ctx, stat_type, decimal_scale, ddof, skip_nulls,
min_count);
+ return InitInternal(ctx, stat_type, decimal_scale, ddof, skip_nulls,
biased,
+ min_count);
} else {
- return InitInternal(ctx, stat_type, /*decimal_scale=*/0, ddof,
skip_nulls,
+ return InitInternal(ctx, stat_type, /*decimal_scale=*/0, ddof,
skip_nulls, biased,
min_count);
}
}
Status InitInternal(ExecContext* ctx, StatisticType stat_type, int32_t
decimal_scale,
- int ddof, bool skip_nulls, uint32_t min_count) {
+ int ddof, bool skip_nulls, bool biased, uint32_t
min_count) {
stat_type_ = stat_type;
moments_level_ = moments_level_for_statistic(stat_type_);
decimal_scale_ = decimal_scale;
skip_nulls_ = skip_nulls;
+ biased_ = biased;
min_count_ = min_count;
ddof_ = ddof;
ctx_ = ctx;
@@ -539,7 +541,7 @@ struct GroupedStatisticImpl : public GroupedAggregator {
Status ConsumeGeneric(const ExecSpan& batch) {
GroupedStatisticImpl<Type> state;
RETURN_NOT_OK(state.InitInternal(ctx_, stat_type_, decimal_scale_, ddof_,
skip_nulls_,
- min_count_));
+ biased_, min_count_));
RETURN_NOT_OK(state.Resize(num_groups_));
int64_t* counts = state.counts_.mutable_data();
double* means = state.means_.mutable_data();
@@ -612,7 +614,7 @@ struct GroupedStatisticImpl : public GroupedAggregator {
var_std.resize(num_groups_);
GroupedStatisticImpl<Type> state;
RETURN_NOT_OK(state.InitInternal(ctx_, stat_type_, decimal_scale_, ddof_,
- skip_nulls_, min_count_));
+ skip_nulls_, biased_, min_count_));
RETURN_NOT_OK(state.Resize(num_groups_));
int64_t* other_counts = state.counts_.mutable_data();
double* other_means = state.means_.mutable_data();
@@ -739,7 +741,9 @@ struct GroupedStatisticImpl : public GroupedAggregator {
const double* m3s = m3s_data();
const double* m4s = m4s_data();
for (int64_t i = 0; i < num_groups_; ++i) {
- if (counts[i] > ddof_ && counts[i] >= min_count_) {
+ if (counts[i] > ddof_ && counts[i] >= min_count_ &&
+ (stat_type_ != StatisticType::Skew || biased_ || counts[i] > 2) &&
+ (stat_type_ != StatisticType::Kurtosis || biased_ || counts[i] > 3))
{
const auto moments = Moments(counts[i], means[i], m2s[i], m3s[i],
m4s[i]);
switch (stat_type_) {
case StatisticType::Var:
@@ -749,10 +753,10 @@ struct GroupedStatisticImpl : public GroupedAggregator {
results[i] = moments.Stddev(ddof_);
break;
case StatisticType::Skew:
- results[i] = moments.Skew();
+ results[i] = moments.Skew(biased_);
break;
case StatisticType::Kurtosis:
- results[i] = moments.Kurtosis();
+ results[i] = moments.Kurtosis(biased_);
break;
default:
return Status::NotImplemented("Statistic type ",
@@ -809,6 +813,7 @@ struct GroupedStatisticImpl : public GroupedAggregator {
int moments_level_;
int32_t decimal_scale_;
bool skip_nulls_;
+ bool biased_;
uint32_t min_count_;
int ddof_;
int64_t num_groups_ = 0;
diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx
index db6cf5b45d..ed19cd71b3 100644
--- a/python/pyarrow/_compute.pyx
+++ b/python/pyarrow/_compute.pyx
@@ -1909,8 +1909,8 @@ class VarianceOptions(_VarianceOptions):
cdef class _SkewOptions(FunctionOptions):
- def _set_options(self, skip_nulls, min_count):
- self.wrapped.reset(new CSkewOptions(skip_nulls, min_count))
+ def _set_options(self, skip_nulls, biased, min_count):
+ self.wrapped.reset(new CSkewOptions(skip_nulls, biased, min_count))
class SkewOptions(_SkewOptions):
@@ -1920,11 +1920,14 @@ class SkewOptions(_SkewOptions):
Parameters
----------
{_skip_nulls_doc()}
+ biased : bool, default True
+ Whether the calculated value is biased.
+ If False, the value computed includes a correction factor to reduce
bias.
{_min_count_doc(default=0)}
"""
- def __init__(self, *, skip_nulls=True, min_count=0):
- self._set_options(skip_nulls, min_count)
+ def __init__(self, *, skip_nulls=True, biased=True, min_count=0):
+ self._set_options(skip_nulls, biased, min_count)
cdef class _SplitOptions(FunctionOptions):
diff --git a/python/pyarrow/includes/libarrow.pxd
b/python/pyarrow/includes/libarrow.pxd
index f9fa091171..00e5b9e912 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2628,8 +2628,9 @@ cdef extern from "arrow/compute/api.h" namespace
"arrow::compute" nogil:
cdef cppclass CSkewOptions \
"arrow::compute::SkewOptions"(CFunctionOptions):
- CSkewOptions(c_bool skip_nulls, uint32_t min_count)
+ CSkewOptions(c_bool skip_nulls, c_bool biased, uint32_t min_count)
c_bool skip_nulls
+ c_bool biased
uint32_t min_count
cdef cppclass CScalarAggregateOptions \
diff --git a/python/pyarrow/tests/test_compute.py
b/python/pyarrow/tests/test_compute.py
index 73506fedfc..eaceabcb9c 100644
--- a/python/pyarrow/tests/test_compute.py
+++ b/python/pyarrow/tests/test_compute.py
@@ -457,6 +457,21 @@ def test_kurtosis():
assert pc.kurtosis(data, min_count=4).as_py() is None
[email protected]("input, expected", (
+ (
+ [1.0, 2.0, 3.0, 40.0, None],
+ {'skew': 1.988947740397821, 'kurtosis': 3.9631931024230695}
+ ),
+ ([1, 2, 40], {'skew': 1.7281098503730385, 'kurtosis': None}),
+ ([1, 40], {'skew': None, 'kurtosis': None}),
+))
+def test_unbiased_skew_and_kurtosis(input, expected):
+ arrow_skew = pc.skew(input, skip_nulls=True, biased=False)
+ arrow_kurtosis = pc.kurtosis(input, skip_nulls=True, biased=False)
+ assert arrow_skew == pa.scalar(expected['skew'], type=pa.float64())
+ assert arrow_kurtosis == pa.scalar(expected['kurtosis'], type=pa.float64())
+
+
def test_count_substring():
for (ty, offset) in [(pa.string(), pa.int32()),
(pa.large_string(), pa.int64())]: