This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new b41386d0e5 GH-45269: [C++][Compute] Add "pivot_wider" and
"hash_pivot_wider" functions (#45562)
b41386d0e5 is described below
commit b41386d0e520298551783a0313914b9f30e708be
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Mar 4 17:21:38 2025 +0100
GH-45269: [C++][Compute] Add "pivot_wider" and "hash_pivot_wider" functions
(#45562)
### Rationale for this change
Add "pivot wider" functionality such as [in
Pandas](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.pivot.html),
through two dedicated functions:
1. a "pivot_wider" scalar aggregate function returning a Struct scalar
2. a "hash_pivot_wider" grouped aggregate function returning a Struct array
Both functions take two arguments (the column of pivot keys and the column
of pivot values) and require passing a `PivotWiderOptions` structure with the
expected pivot keys, so as to determine the output Struct type.
### Are these changes tested?
Yes, by dedicated unit tests.
### Are there any user-facing changes?
No, just new APIs.
* GitHub Issue: #45269
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/CMakeLists.txt | 2 +
cpp/src/arrow/acero/groupby_aggregate_node.cc | 5 +
cpp/src/arrow/acero/hash_aggregate_test.cc | 731 ++++++++++++++++++++++-
cpp/src/arrow/compute/api_aggregate.cc | 31 +-
cpp/src/arrow/compute/api_aggregate.h | 82 +++
cpp/src/arrow/compute/kernels/aggregate_pivot.cc | 188 ++++++
cpp/src/arrow/compute/kernels/aggregate_test.cc | 289 +++++++++
cpp/src/arrow/compute/kernels/hash_aggregate.cc | 432 +++++++++++++-
cpp/src/arrow/compute/kernels/pivot_internal.cc | 127 ++++
cpp/src/arrow/compute/kernels/pivot_internal.h | 51 ++
cpp/src/arrow/compute/registry.cc | 1 +
cpp/src/arrow/compute/registry_internal.h | 1 +
cpp/src/arrow/compute/type_fwd.h | 1 +
docs/source/cpp/compute.rst | 83 +--
docs/source/python/api/compute.rst | 2 +
python/pyarrow/_compute.pyx | 44 ++
python/pyarrow/compute.py | 1 +
python/pyarrow/includes/libarrow.pxd | 10 +
python/pyarrow/tests/test_compute.py | 29 +-
19 files changed, 2063 insertions(+), 47 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index b9b8785cbc..775e3633aa 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -753,10 +753,12 @@ if(ARROW_COMPUTE)
ARROW_COMPUTE_SRCS
compute/kernels/aggregate_basic.cc
compute/kernels/aggregate_mode.cc
+ compute/kernels/aggregate_pivot.cc
compute/kernels/aggregate_quantile.cc
compute/kernels/aggregate_tdigest.cc
compute/kernels/aggregate_var_std.cc
compute/kernels/hash_aggregate.cc
+ compute/kernels/pivot_internal.cc
compute/kernels/scalar_arithmetic.cc
compute/kernels/scalar_boolean.cc
compute/kernels/scalar_compare.cc
diff --git a/cpp/src/arrow/acero/groupby_aggregate_node.cc
b/cpp/src/arrow/acero/groupby_aggregate_node.cc
index 06b034ab2d..2beef360b4 100644
--- a/cpp/src/arrow/acero/groupby_aggregate_node.cc
+++ b/cpp/src/arrow/acero/groupby_aggregate_node.cc
@@ -282,6 +282,11 @@ Status GroupByNode::Merge() {
DCHECK(state0->agg_states[span_i]);
batch_ctx.SetState(state0->agg_states[span_i].get());
+ // XXX this resizes each KernelState (state0->agg_states[span_i])
multiple times.
+ // An alternative would be a two-pass algorithm:
+ // 1. Compute all transpositions (one per local state) and the final
number of
+ // groups.
+ // 2. Process all agg kernels, resizing each KernelState only once.
RETURN_NOT_OK(
agg_kernels_[span_i]->resize(&batch_ctx,
state0->grouper->num_groups()));
RETURN_NOT_OK(agg_kernels_[span_i]->merge(
diff --git a/cpp/src/arrow/acero/hash_aggregate_test.cc
b/cpp/src/arrow/acero/hash_aggregate_test.cc
index 7f4b6dd752..e50113968e 100644
--- a/cpp/src/arrow/acero/hash_aggregate_test.cc
+++ b/cpp/src/arrow/acero/hash_aggregate_test.cc
@@ -20,6 +20,7 @@
#include <algorithm>
#include <limits>
#include <memory>
+#include <random>
#include <type_traits>
#include <unordered_map>
#include <utility>
@@ -30,16 +31,14 @@
#include "arrow/acero/options.h"
#include "arrow/acero/test_util_internal.h"
#include "arrow/array.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
#include "arrow/array/concatenate.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/api_aggregate.h"
-#include "arrow/compute/api_scalar.h"
-#include "arrow/compute/api_vector.h"
#include "arrow/compute/cast.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/exec_internal.h"
-#include "arrow/compute/kernels/aggregate_internal.h"
-#include "arrow/compute/kernels/codegen_internal.h"
#include "arrow/compute/registry.h"
#include "arrow/compute/row/grouper.h"
#include "arrow/table.h"
@@ -50,9 +49,7 @@
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/async_generator.h"
-#include "arrow/util/bitmap_reader.h"
#include "arrow/util/checked_cast.h"
-#include "arrow/util/int_util_overflow.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/string.h"
@@ -64,7 +61,6 @@ using testing::HasSubstr;
namespace arrow {
-using internal::BitmapReader;
using internal::checked_cast;
using internal::checked_pointer_cast;
using internal::ToChars;
@@ -77,6 +73,7 @@ using compute::ExecBatchFromJSON;
using compute::ExecSpan;
using compute::FunctionOptions;
using compute::Grouper;
+using compute::PivotWiderOptions;
using compute::RowSegmenter;
using compute::ScalarAggregateOptions;
using compute::Segment;
@@ -565,6 +562,7 @@ class GroupBy : public
::testing::TestWithParam<GroupByFunction> {
return acero::GroupByTest(GetParam(), arguments, keys, aggregates,
use_threads);
}
+ // This is not named GroupByTest to avoid ambiguities between overloads
Result<Datum> AltGroupBy(const std::vector<Datum>& arguments,
const std::vector<Datum>& keys,
const std::vector<Datum>& segment_keys,
@@ -574,6 +572,70 @@ class GroupBy : public
::testing::TestWithParam<GroupByFunction> {
/*naive=*/false);
}
+ Result<Datum> RunPivot(const std::shared_ptr<DataType>& key_type,
+ const std::shared_ptr<DataType>& value_type,
+ const PivotWiderOptions& options,
+ const std::shared_ptr<Table>& table, bool use_threads
= false) {
+ Aggregate agg{"hash_pivot_wider",
std::make_shared<PivotWiderOptions>(options),
+ /*target=*/std::vector<FieldRef>{"agg_0", "agg_1"},
/*name=*/"out"};
+ ARROW_ASSIGN_OR_RAISE(
+ Datum aggregated_and_grouped,
+ AltGroupBy({table->GetColumnByName("key"),
table->GetColumnByName("value")},
+ {table->GetColumnByName("group_key")},
+ /*segment_keys=*/{}, {agg}, use_threads));
+ ValidateOutput(aggregated_and_grouped);
+ return aggregated_and_grouped;
+ }
+
+ Result<Datum> RunPivot(const std::shared_ptr<DataType>& key_type,
+ const std::shared_ptr<DataType>& value_type,
+ const PivotWiderOptions& options,
+ const std::vector<std::string>& table_json,
+ bool use_threads = false) {
+ auto table =
+ TableFromJSON(schema({field("group_key", int64()), field("key",
key_type),
+ field("value", value_type)}),
+ table_json);
+ return RunPivot(key_type, value_type, options, table, use_threads);
+ }
+
+ void CheckPivoted(const std::shared_ptr<DataType>& key_type,
+ const std::shared_ptr<DataType>& value_type,
+ const PivotWiderOptions& options, const Datum& pivoted,
+ const std::string& expected_json) {
+ FieldVector pivoted_fields;
+ for (const auto& key_name : options.key_names) {
+ pivoted_fields.push_back(field(key_name, value_type));
+ }
+ auto expected_type = struct_({
+ field("key_0", int64()),
+ field("out", struct_(std::move(pivoted_fields))),
+ });
+ auto expected = ArrayFromJSON(expected_type, expected_json);
+ AssertDatumsEqual(expected, pivoted, /*verbose=*/true);
+ }
+
+ void TestPivot(const std::shared_ptr<DataType>& key_type,
+ const std::shared_ptr<DataType>& value_type,
+ const PivotWiderOptions& options,
+ const std::vector<std::string>& table_json,
+ const std::string& expected_json, bool use_threads) {
+ ASSERT_OK_AND_ASSIGN(
+ auto pivoted, RunPivot(key_type, value_type, options, table_json,
use_threads));
+ CheckPivoted(key_type, value_type, options, pivoted, expected_json);
+ }
+
+ void TestPivot(const std::shared_ptr<DataType>& key_type,
+ const std::shared_ptr<DataType>& value_type,
+ const PivotWiderOptions& options,
+ const std::vector<std::string>& table_json,
+ const std::string& expected_json) {
+ for (bool use_threads : {false, true}) {
+ ARROW_SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+ TestPivot(key_type, value_type, options, table_json, expected_json,
use_threads);
+ }
+ }
+
void TestSegmentKey(const std::shared_ptr<Table>& table, Datum output,
const std::vector<Datum>& segment_keys) {
return acero::TestSegmentKey(GetParam(), table, output, segment_keys);
@@ -4345,6 +4407,566 @@ TEST_P(GroupBy, OnlyKeys) {
}
}
+TEST_P(GroupBy, PivotBasics) {
+ auto key_type = utf8();
+ auto value_type = float32();
+ std::vector<std::string> table_json = {R"([
+ [1, "width", 10.5],
+ [2, "width", 11.5]
+ ])",
+ R"([
+ [2, "height", 12.5]
+ ])",
+ R"([
+ [3, "width", 13.5],
+ [1, "height", 14.5]
+ ])"};
+ std::string expected_json = R"([
+ [1, {"height": 14.5, "width": 10.5} ],
+ [2, {"height": 12.5, "width": 11.5} ],
+ [3, {"height": null, "width": 13.5} ]
+ ])";
+ for (auto unexpected_key_behavior :
+ {PivotWiderOptions::kIgnore, PivotWiderOptions::kRaise}) {
+ PivotWiderOptions options(/*key_names=*/{"height", "width"},
unexpected_key_behavior);
+ TestPivot(key_type, value_type, options, table_json, expected_json);
+ }
+}
+
+TEST_P(GroupBy, PivotAllKeyTypes) {
+ auto value_type = float32();
+ std::vector<std::string> table_json = {R"([
+ [1, "width", 10.5],
+ [2, "width", 11.5]
+ ])",
+ R"([
+ [2, "height", 12.5],
+ [3, "width", 13.5],
+ [1, "height", 14.5]
+ ])"};
+ std::string expected_json = R"([
+ [1, {"height": 14.5, "width": 10.5} ],
+ [2, {"height": 12.5, "width": 11.5} ],
+ [3, {"height": null, "width": 13.5} ]
+ ])";
+ PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+ for (const auto& key_type : BaseBinaryTypes()) {
+ ARROW_SCOPED_TRACE("key_type = ", *key_type);
+ TestPivot(key_type, value_type, options, table_json, expected_json);
+ }
+}
+
+TEST_P(GroupBy, PivotNumericValues) {
+ auto key_type = utf8();
+ std::vector<std::string> table_json = {R"([
+ [1, "width", 10],
+ [2, "width", 11]
+ ])",
+ R"([
+ [2, "height", 12],
+ [3, "width", 13],
+ [1, "height", 14]
+ ])"};
+ std::string expected_json = R"([
+ [1, {"height": 14, "width": 10} ],
+ [2, {"height": 12, "width": 11} ],
+ [3, {"height": null, "width": 13} ]
+ ])";
+ PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+ for (const auto& value_type : NumericTypes()) {
+ ARROW_SCOPED_TRACE("value_type = ", *value_type);
+ TestPivot(key_type, value_type, options, table_json, expected_json);
+ }
+}
+
+TEST_P(GroupBy, PivotBinaryLikeValues) {
+ auto key_type = utf8();
+ std::vector<std::string> table_json = {R"([
+ [1, "name", "Bob"],
+ [2, "eye_color", "brown"]
+ ])",
+ R"([
+ [2, "name", "Alice"],
+ [1, "eye_color", "gray"],
+ [3, "name", "Mallaury"]
+ ])"};
+ std::string expected_json = R"([
+ [1, {"name": "Bob", "eye_color": "gray"} ],
+ [2, {"name": "Alice", "eye_color": "brown"} ],
+ [3, {"name": "Mallaury", "eye_color": null} ]
+ ])";
+ PivotWiderOptions options(/*key_names=*/{"name", "eye_color"});
+
+ for (const auto& value_type : BaseBinaryTypes()) {
+ ARROW_SCOPED_TRACE("value_type = ", *value_type);
+ TestPivot(key_type, value_type, options, table_json, expected_json);
+ }
+}
+
+TEST_P(GroupBy, PivotDecimalValues) {
+ auto key_type = utf8();
+ auto value_type = decimal128(9, 1);
+ std::vector<std::string> table_json = {R"([
+ [1, "width", "10.1"],
+ [2, "width", "11.1"]
+ ])",
+ R"([
+ [2, "height", "12.1"],
+ [3, "width", "13.1"],
+ [1, "height", "14.1"]
+ ])"};
+ std::string expected_json = R"([
+ [1, {"height": "14.1", "width": "10.1"} ],
+ [2, {"height": "12.1", "width": "11.1"} ],
+ [3, {"height": null, "width": "13.1"} ]
+ ])";
+ PivotWiderOptions options(/*key_names=*/{"height", "width"});
+ TestPivot(key_type, value_type, options, table_json, expected_json);
+}
+
+TEST_P(GroupBy, PivotStructValues) {
+ auto key_type = utf8();
+ auto value_type = struct_({{"value", float32()}});
+ std::vector<std::string> table_json = {R"([
+ [1, "width", [10.1]],
+ [2, "width", [11.1]]
+ ])",
+ R"([
+ [2, "height", [12.1]],
+ [3, "width", [13.1]],
+ [1, "height", [14.1]]
+ ])"};
+ std::string expected_json = R"([
+ [1, {"height": [14.1], "width": [10.1]} ],
+ [2, {"height": [12.1], "width": [11.1]} ],
+ [3, {"height": null, "width": [13.1]} ]
+ ])";
+ PivotWiderOptions options(/*key_names=*/{"height", "width"});
+ TestPivot(key_type, value_type, options, table_json, expected_json);
+}
+
+TEST_P(GroupBy, PivotListValues) {
+ auto key_type = utf8();
+ auto value_type = list(float32());
+ std::vector<std::string> table_json = {R"([
+ [1, "foo", [10.5, 11.5]],
+ [2, "bar", [12.5]]
+ ])",
+ R"([
+ [2, "foo", []],
+ [3, "bar", [13.5]],
+ [1, "foo", null]
+ ])"};
+ std::string expected_json = R"([
+ [1, {"foo": [10.5, 11.5], "bar": null} ],
+ [2, {"foo": [], "bar": [12.5]} ],
+ [3, {"foo": null, "bar": [13.5]} ]
+ ])";
+ PivotWiderOptions options(/*key_names=*/{"foo", "bar"});
+ TestPivot(key_type, value_type, options, table_json, expected_json);
+}
+
+TEST_P(GroupBy, PivotNullValueType) {
+ auto key_type = utf8();
+ auto value_type = null();
+ std::vector<std::string> table_json = {R"([
+ [1, "foo", null],
+ [2, "bar", null]
+ ])",
+ R"([
+ [2, "foo", null],
+ [3, "bar", null],
+ [1, "foo", null]
+ ])"};
+ std::string expected_json = R"([
+ [1, {"foo": null, "bar": null} ],
+ [2, {"foo": null, "bar": null} ],
+ [3, {"foo": null, "bar": null} ]
+ ])";
+ PivotWiderOptions options(/*key_names=*/{"foo", "bar"});
+ TestPivot(key_type, value_type, options, table_json, expected_json);
+}
+
+TEST_P(GroupBy, PivotNullValues) {
+ auto key_type = utf8();
+ auto value_type = float32();
+ std::vector<std::string> table_json = {R"([
+ [1, "width", 10.5],
+ [2, "width", null]
+ ])",
+ R"([
+ [2, "height", 12.5],
+ [2, "width", 13.5],
+ [1, "width", null],
+ [2, "height", null]
+ ])",
+ R"([
+ [1, "width", null],
+ [2, "height", null]
+ ])"};
+ std::string expected_json = R"([
+ [1, {"height": null, "width": 10.5} ],
+ [2, {"height": 12.5, "width": 13.5} ]
+ ])";
+ PivotWiderOptions options(/*key_names=*/{"height", "width"},
PivotWiderOptions::kRaise);
+ TestPivot(key_type, value_type, options, table_json, expected_json);
+}
+
+TEST_P(GroupBy, PivotScalarKey) {
+ BatchesWithSchema input;
+ std::vector<TypeHolder> types = {int32(), utf8(), float32()};
+ std::vector<ArgShape> shapes = {ArgShape::ARRAY, ArgShape::SCALAR,
ArgShape::ARRAY};
+ input.batches = {
+ ExecBatchFromJSON(types, shapes,
+ R"([
+ [1, "width", 10.5],
+ [2, "width", 11.5]
+ ])"),
+ ExecBatchFromJSON(types, shapes,
+ R"([
+ [2, "width", null]
+ ])"),
+ ExecBatchFromJSON(types, shapes,
+ R"([
+ [3, "height", null],
+ [3, "height", null]
+ ])"),
+ ExecBatchFromJSON(types, shapes,
+ R"([
+ [3, "height", 12.5],
+ [1, "height", 13.5]
+ ])"),
+ };
+ input.schema = schema({field("group_key", int32()), field("pivot_key",
utf8()),
+ field("pivot_value", float32())});
+ Datum expected = ArrayFromJSON(
+ struct_({field("group_key", int32()),
+ field("pivoted",
+ struct_({field("height", float32()), field("width",
float32())}))}),
+ R"([
+ [1, {"height": 13.5, "width": 10.5} ],
+ [2, {"height": null, "width": 11.5} ],
+ [3, {"height": 12.5, "width": null} ]
+ ])");
+ auto options = std::make_shared<PivotWiderOptions>(
+ PivotWiderOptions(/*key_names=*/{"height", "width"}));
+ Aggregate aggregate{"hash_pivot_wider", options,
+ std::vector<FieldRef>{"pivot_key", "pivot_value"},
"pivoted"};
+ for (bool use_threads : {false, true}) {
+ SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+ ASSERT_OK_AND_ASSIGN(Datum actual,
+ RunGroupBy(input, {"group_key"}, {aggregate},
use_threads));
+ ValidateOutput(actual);
+ AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
+ }
+}
+
+TEST_P(GroupBy, PivotUnusedKeyName) {
+ auto key_type = utf8();
+ auto value_type = float32();
+ std::vector<std::string> table_json = {R"([
+ [1, "width", 10.5],
+ [2, "width", 11.5]
+ ])",
+ R"([
+ [2, "height", 12.5],
+ [3, "width", 13.5],
+ [1, "height", 14.5]
+ ])"};
+ std::string expected_json = R"([
+ [1, {"height": 14.5, "depth": null, "width": 10.5} ],
+ [2, {"height": 12.5, "depth": null, "width": 11.5} ],
+ [3, {"height": null, "depth": null, "width": 13.5} ]
+ ])";
+ for (auto unexpected_key_behavior :
+ {PivotWiderOptions::kIgnore, PivotWiderOptions::kRaise}) {
+ PivotWiderOptions options(/*key_names=*/{"height", "depth", "width"},
+ unexpected_key_behavior);
+ TestPivot(key_type, value_type, options, table_json, expected_json);
+ }
+}
+
+TEST_P(GroupBy, PivotUnexpectedKeyName) {
+ auto key_type = utf8();
+ auto value_type = float32();
+ std::vector<std::string> table_json = {R"([
+ [1, "width", 10.5],
+ [2, "width", 11.5]
+ ])",
+ R"([
+ [2, "height", 12.5],
+ [3, "width", 13.5],
+ [1, "depth", 15.5],
+ [1, "height", 14.5]
+ ])"};
+ PivotWiderOptions options(/*key_names=*/{"height", "width"});
+ std::string expected_json = R"([
+ [1, {"height": 14.5, "width": 10.5} ],
+ [2, {"height": 12.5, "width": 11.5} ],
+ [3, {"height": null, "width": 13.5} ]
+ ])";
+ TestPivot(key_type, value_type, options, table_json, expected_json);
+ options.unexpected_key_behavior = PivotWiderOptions::kRaise;
+ for (bool use_threads : {false, true}) {
+ ARROW_SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ KeyError, HasSubstr("Unexpected pivot key: depth"),
+ RunPivot(key_type, value_type, options, table_json, use_threads));
+ }
+}
+TEST_P(GroupBy, PivotNullKeys) {
+ auto key_type = utf8();
+ auto value_type = float32();
+ std::vector<std::string> table_json = {R"([
+ [1, "width", 10.5],
+ [2, null, 11.5]
+ ])"};
+ PivotWiderOptions options(/*key_names=*/{"height", "width"});
+ for (bool use_threads : {false, true}) {
+ ARROW_SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ KeyError, HasSubstr("pivot key name cannot be null"),
+ RunPivot(key_type, value_type, options, table_json, use_threads));
+ }
+}
+
+TEST_P(GroupBy, PivotDuplicateKeys) {
+ auto key_type = utf8();
+ auto value_type = float32();
+ std::vector<std::string> table_json = {R"([])"};
+ PivotWiderOptions options(/*key_names=*/{"height", "width", "height"});
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ KeyError, HasSubstr("Duplicate key name 'height' in PivotWiderOptions"),
+ RunPivot(key_type, value_type, options, table_json));
+}
+
+TEST_P(GroupBy, PivotDuplicateValues) {
+ auto key_type = utf8();
+ auto value_type = float32();
+ PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+ for (bool use_threads : {false, true}) {
+ ARROW_SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+
+ // Duplicate values in same chunk
+ std::vector<std::string> table_json = {R"([
+ [1, "width", 10.5],
+ [2, "width", 11.5],
+ [1, "width", 11.5]
+ ])"};
+ EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
+ HasSubstr("Encountered more than one
non-null value"),
+ RunPivot(key_type, value_type, options,
table_json));
+
+ // Duplicate values in different chunks
+ table_json = {R"([
+ [1, "width", 10.5],
+ [2, "width", 11.5]
+ ])",
+ R"([
+ [1, "width", 11.5]
+ ])"};
+ EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
+ HasSubstr("Encountered more than one
non-null value"),
+ RunPivot(key_type, value_type, options,
table_json));
+ }
+}
+
+TEST_P(GroupBy, PivotScalarKeyWithDuplicateValues) {
+ BatchesWithSchema input;
+ std::vector<TypeHolder> types = {int32(), utf8(), float32()};
+ std::vector<ArgShape> shapes = {ArgShape::ARRAY, ArgShape::SCALAR,
ArgShape::ARRAY};
+ input.schema = schema({field("group_key", int32()), field("pivot_key",
utf8()),
+ field("pivot_value", float32())});
+ auto options = std::make_shared<PivotWiderOptions>(
+ PivotWiderOptions(/*key_names=*/{"height", "width"}));
+ Aggregate aggregate{"hash_pivot_wider", options,
+ std::vector<FieldRef>{"pivot_key", "pivot_value"},
"pivoted"};
+
+ // Duplicate values in same chunk
+ input.batches = {
+ ExecBatchFromJSON(types, shapes,
+ R"([
+ [1, "width", 10.5],
+ [1, "width", 11.5]
+ ])"),
+ };
+ for (bool use_threads : {false, true}) {
+ SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ Invalid, HasSubstr("Encountered more than one non-null value"),
+ RunGroupBy(input, {"group_key"}, {aggregate}, use_threads));
+ }
+
+ // Duplicate values in different chunks
+ input.batches = {
+ ExecBatchFromJSON(types, shapes,
+ R"([
+ [1, "width", 10.5],
+ [2, "width", 11.5]
+ ])"),
+ ExecBatchFromJSON(types, shapes,
+ R"([
+ [2, "width", 12.5]
+ ])"),
+ };
+ for (bool use_threads : {false, true}) {
+ SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ Invalid, HasSubstr("Encountered more than one non-null value"),
+ RunGroupBy(input, {"group_key"}, {aggregate}, use_threads));
+ }
+}
+
+struct RandomPivotTestCase {
+ PivotWiderOptions options;
+ std::shared_ptr<RecordBatch> input;
+ std::shared_ptr<Array> expected_output;
+};
+
+Result<RandomPivotTestCase> MakeRandomPivot(int64_t length) {
+ constexpr double kKeyPresenceProbability = 0.8;
+ constexpr double kValueValidityProbability = 0.7;
+
+ const std::vector<std::string> key_names = {"height", "width", "depth"};
+ std::default_random_engine gen(42);
+ std::uniform_real_distribution<float> value_dist(0.0f, 1.0f);
+ std::bernoulli_distribution key_presence_dist(kKeyPresenceProbability);
+ std::bernoulli_distribution value_validity_dist(kValueValidityProbability);
+
+ Int64Builder group_key_builder;
+ StringBuilder key_builder;
+ FloatBuilder value_builder;
+ RETURN_NOT_OK(group_key_builder.Reserve(length));
+ RETURN_NOT_OK(key_builder.Reserve(length));
+ RETURN_NOT_OK(value_builder.Reserve(length));
+
+ // The last input key name will not be part of the result
+ PivotWiderOptions options(
+ std::vector<std::string>(key_names.begin(), key_names.end() - 1));
+ Int64Builder pivoted_group_builder;
+ std::vector<FloatBuilder> pivoted_value_builders(options.key_names.size());
+
+ auto finish_group = [&](int64_t group_key) -> Status {
+ // First check if *any* pivoted column was populated (otherwise there was
+ // no valid value at all in this group, and no output row should be
generated).
+ RETURN_NOT_OK(pivoted_group_builder.Append(group_key));
+ // Make sure all pivoted columns are populated and in sync with the group
key column
+ for (auto& pivoted_value_builder : pivoted_value_builders) {
+ if (pivoted_value_builder.length() < pivoted_group_builder.length()) {
+ RETURN_NOT_OK(pivoted_value_builder.AppendNull());
+ }
+ EXPECT_EQ(pivoted_value_builder.length(),
pivoted_group_builder.length());
+ }
+ return Status::OK();
+ };
+
+ int64_t group_key = 1000;
+ bool group_started = false;
+ int key_id = 0;
+ while (group_key_builder.length() < length) {
+ // For the current group_key and key_id we can either:
+ // 1. not add a row
+ // 2. add a row with a null value
+ // 3. add a row with a non-null value
+ // 3a. the row will end up in the pivoted data iff the key is part of
+ // the PivotWiderOptions.key_names
+ if (key_presence_dist(gen)) {
+ group_key_builder.UnsafeAppend(group_key);
+ group_started = true;
+ RETURN_NOT_OK(key_builder.Append(key_names[key_id]));
+ if (value_validity_dist(gen)) {
+ const auto value = value_dist(gen);
+ value_builder.UnsafeAppend(value);
+ if (key_id < static_cast<int>(pivoted_value_builders.size())) {
+ RETURN_NOT_OK(pivoted_value_builders[key_id].Append(value));
+ }
+ } else {
+ value_builder.UnsafeAppendNull();
+ }
+ }
+ if (++key_id >= static_cast<int>(key_names.size())) {
+ // We've considered all keys for this group.
+ // Emit a pivoted row only if any key was emitted in the input.
+ if (group_started) {
+ RETURN_NOT_OK(finish_group(group_key));
+ }
+ // Initiate new group
+ ++group_key;
+ group_started = false;
+ key_id = 0;
+ }
+ }
+ if (group_started) {
+ // We've started this group, finish it
+ RETURN_NOT_OK(finish_group(group_key));
+ }
+ ARROW_ASSIGN_OR_RAISE(auto group_keys, group_key_builder.Finish());
+ ARROW_ASSIGN_OR_RAISE(auto keys, key_builder.Finish());
+ ARROW_ASSIGN_OR_RAISE(auto values, value_builder.Finish());
+ auto input_schema =
+ schema({{"group_key", int64()}, {"key", utf8()}, {"value", float32()}});
+ auto input = RecordBatch::Make(input_schema, length, {group_keys, keys,
values});
+ RETURN_NOT_OK(input->Validate());
+
+ ARROW_ASSIGN_OR_RAISE(auto pivoted_groups, pivoted_group_builder.Finish());
+ ArrayVector pivoted_value_columns;
+ for (auto& pivoted_value_builder : pivoted_value_builders) {
+ ARROW_ASSIGN_OR_RAISE(pivoted_value_columns.emplace_back(),
+ pivoted_value_builder.Finish());
+ }
+ ARROW_ASSIGN_OR_RAISE(
+ auto pivoted_values,
+ StructArray::Make(std::move(pivoted_value_columns), options.key_names));
+ ARROW_ASSIGN_OR_RAISE(auto output,
+ StructArray::Make({pivoted_groups, pivoted_values},
+ std::vector<std::string>{"key_0",
"out"}));
+ RETURN_NOT_OK(output->Validate());
+
+ return RandomPivotTestCase{std::move(options), std::move(input),
std::move(output)};
+}
+
+TEST_P(GroupBy, PivotRandom) {
+ constexpr int64_t kLength = 900;
+ // Larger than 256 to exercise take-index dispatch in pivot implementation
+ constexpr int64_t kChunkLength = 300;
+ ASSERT_OK_AND_ASSIGN(auto pivot_case, MakeRandomPivot(kLength));
+
+ for (bool shuffle : {false, true}) {
+ ARROW_SCOPED_TRACE("shuffle = ", shuffle);
+ auto input = Datum(pivot_case.input);
+ if (shuffle) {
+ // Since the "value" column is random-generated, sorting on it produces
+ // a random shuffle.
+ ASSERT_OK_AND_ASSIGN(
+ auto shuffle_indices,
+ SortIndices(pivot_case.input, SortOptions({SortKey("value")})));
+ ASSERT_OK_AND_ASSIGN(input, Take(input, shuffle_indices));
+ }
+ ASSERT_EQ(input.kind(), Datum::RECORD_BATCH);
+ RecordBatchVector chunks;
+ for (int64_t start = 0; start < kLength; start += kChunkLength) {
+ const auto chunk_length = std::min(kLength - start, kChunkLength);
+ chunks.push_back(input.record_batch()->Slice(start, chunk_length));
+ }
+ ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(chunks));
+
+ for (bool use_threads : {false, true}) {
+ ARROW_SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+ ASSERT_OK_AND_ASSIGN(auto pivoted, RunPivot(utf8(), float32(),
pivot_case.options,
+ table, use_threads));
+ // XXX For some reason this works even in the shuffled case
+ // (I would expect the test to require sorting of the output).
+ // This might depend on implementation details of group id generation
+ // by the hash-aggregate logic (the pivot implementation implicitly
+ // orders the output by ascending group id).
+ AssertDatumsEqual(pivot_case.expected_output, pivoted, /*verbose=*/true);
+ }
+ }
+}
+
INSTANTIATE_TEST_SUITE_P(GroupBy, GroupBy, ::testing::Values(RunGroupByImpl));
class SegmentedScalarGroupBy : public GroupBy {};
@@ -4620,6 +5242,101 @@ TEST_P(SegmentedKeyGroupBy, MultiSegmentKeyCombined) {
TestMultiSegmentKey(GetParam(), GetMultiSegmentInputAsCombined);
}
+TEST_P(SegmentedKeyGroupBy, PivotSegmentKey) {
+ auto group_by = GetParam();
+ auto key_type = utf8();
+ auto value_type = float32();
+
+ std::vector<std::string> table_json = {R"([
+ [1, "width", 10.5],
+ [1, "height", 11.5]
+ ])",
+ R"([
+ [2, "height", 12.5],
+ [2, "width", 13.5],
+ [3, "width", 14.5]
+ ])",
+ R"([
+ [3, "width", null],
+ [4, "height", 15.5]
+ ])"};
+ std::vector<std::string> expected_json = {
+ R"([[1, {"height": 11.5, "width": 10.5}]])",
+ R"([[2, {"height": 12.5, "width": 13.5}]])",
+ R"([[3, {"height": null, "width": 14.5}]])",
+ R"([[4, {"height": 15.5, "width": null}]])",
+ };
+
+ auto table =
+ TableFromJSON(schema({field("segment_key", int64()), field("pivot_key",
key_type),
+ field("pivot_value", value_type)}),
+ table_json);
+
+ auto options = std::make_shared<PivotWiderOptions>(
+ PivotWiderOptions(/*key_names=*/{"height", "width"}));
+ Aggregate aggregate{"pivot_wider", options, std::vector<FieldRef>{"agg_0",
"agg_1"},
+ "pivoted"};
+ ASSERT_OK_AND_ASSIGN(Datum actual,
+ group_by(
+ {
+ table->GetColumnByName("pivot_key"),
+ table->GetColumnByName("pivot_value"),
+ },
+ {}, {table->GetColumnByName("segment_key")},
{aggregate},
+ /*use_threads=*/false, /*naive=*/false));
+ ValidateOutput(actual);
+ auto expected = ChunkedArrayFromJSON(
+ struct_({field("key_0", int64()),
+ field("pivoted", struct_({field("height", value_type),
+ field("width", value_type)}))}),
+ expected_json);
+ AssertDatumsEqual(expected, actual, /*verbose=*/true);
+}
+
+TEST_P(SegmentedKeyGroupBy, PivotSegmentKeyDuplicateValues) {
+ // NOTE: besides testing "pivot_wider" behavior, this test also checks that
errors
+ // produced when consuming or merging an aggregate don't corrupt
+ // execution engine internals.
+ auto group_by = GetParam();
+ auto key_type = utf8();
+ auto value_type = float32();
+ auto options = std::make_shared<PivotWiderOptions>(
+ PivotWiderOptions(/*key_names=*/{"height", "width"}));
+ auto table_schema = schema({field("segment_key", int64()),
field("pivot_key", key_type),
+ field("pivot_value", value_type)});
+
+ auto test_duplicate_values = [&](const std::vector<std::string>& table_json)
{
+ auto table = TableFromJSON(table_schema, table_json);
+ Aggregate aggregate{"pivot_wider", options, std::vector<FieldRef>{"agg_0",
"agg_1"},
+ "pivoted"};
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ Invalid,
+ HasSubstr("Encountered more than one non-null value for the same pivot
key"),
+ group_by(
+ {
+ table->GetColumnByName("pivot_key"),
+ table->GetColumnByName("pivot_value"),
+ },
+ {}, {table->GetColumnByName("segment_key")}, {aggregate},
+ /*use_threads=*/false, /*naive=*/false));
+ };
+
+ // Duplicate values in the same chunk
+ test_duplicate_values({R"([
+ [1, "width", 10.5],
+ [2, "width", 11.5],
+ [2, "width", 12.5]
+ ])"});
+ // Duplicate values in two different chunks
+ test_duplicate_values({R"([
+ [1, "width", 10.5],
+ [2, "width", 11.5]
+ ])",
+ R"([
+ [2, "width", 12.5]
+ ])"});
+}
+
INSTANTIATE_TEST_SUITE_P(SegmentedScalarGroupBy, SegmentedScalarGroupBy,
::testing::Values(RunSegmentedGroupByImpl));
diff --git a/cpp/src/arrow/compute/api_aggregate.cc
b/cpp/src/arrow/compute/api_aggregate.cc
index 49d8709660..37119d2b67 100644
--- a/cpp/src/arrow/compute/api_aggregate.cc
+++ b/cpp/src/arrow/compute/api_aggregate.cc
@@ -24,8 +24,8 @@
#include "arrow/util/logging.h"
namespace arrow {
-
namespace internal {
+
template <>
struct EnumTraits<compute::CountOptions::CountMode>
: BasicEnumTraits<compute::CountOptions::CountMode,
compute::CountOptions::ONLY_VALID,
@@ -67,6 +67,24 @@ struct EnumTraits<compute::QuantileOptions::Interpolation>
return "<INVALID>";
}
};
+
+template <>
+struct EnumTraits<compute::PivotWiderOptions::UnexpectedKeyBehavior>
+ : BasicEnumTraits<compute::PivotWiderOptions::UnexpectedKeyBehavior,
+ compute::PivotWiderOptions::kIgnore,
+ compute::PivotWiderOptions::kRaise> {
+ static std::string name() { return
"PivotWiderOptions::UnexpectedKeyBehavior"; }
+ static std::string
value_name(compute::PivotWiderOptions::UnexpectedKeyBehavior value) {
+ switch (value) {
+ case compute::PivotWiderOptions::kIgnore:
+ return "kIgnore";
+ case compute::PivotWiderOptions::kRaise:
+ return "kRaise";
+ }
+ return "<INVALID>";
+ }
+};
+
} // namespace internal
namespace compute {
@@ -101,6 +119,9 @@ static auto kTDigestOptionsType =
GetFunctionOptionsType<TDigestOptions>(
DataMember("buffer_size", &TDigestOptions::buffer_size),
DataMember("skip_nulls", &TDigestOptions::skip_nulls),
DataMember("min_count", &TDigestOptions::min_count));
+static auto kPivotOptionsType = GetFunctionOptionsType<PivotWiderOptions>(
+ DataMember("key_names", &PivotWiderOptions::key_names),
+ DataMember("unexpected_key_behavior",
&PivotWiderOptions::unexpected_key_behavior));
static auto kIndexOptionsType =
GetFunctionOptionsType<IndexOptions>(DataMember("value",
&IndexOptions::value));
} // namespace
@@ -164,6 +185,13 @@ TDigestOptions::TDigestOptions(std::vector<double> q,
uint32_t delta,
min_count{min_count} {}
constexpr char TDigestOptions::kTypeName[];
+PivotWiderOptions::PivotWiderOptions(std::vector<std::string> key_names,
+ UnexpectedKeyBehavior
unexpected_key_behavior)
+ : FunctionOptions(internal::kPivotOptionsType),
+ key_names(std::move(key_names)),
+ unexpected_key_behavior(unexpected_key_behavior) {}
+PivotWiderOptions::PivotWiderOptions() :
FunctionOptions(internal::kPivotOptionsType) {}
+
IndexOptions::IndexOptions(std::shared_ptr<Scalar> value)
: FunctionOptions(internal::kIndexOptionsType), value{std::move(value)} {}
IndexOptions::IndexOptions() : IndexOptions(std::make_shared<NullScalar>()) {}
@@ -177,6 +205,7 @@ void RegisterAggregateOptions(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunctionOptionsType(kVarianceOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kQuantileOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kTDigestOptionsType));
+ DCHECK_OK(registry->AddFunctionOptionsType(kPivotOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kIndexOptionsType));
}
} // namespace internal
diff --git a/cpp/src/arrow/compute/api_aggregate.h
b/cpp/src/arrow/compute/api_aggregate.h
index 2e5210b073..66876ee66e 100644
--- a/cpp/src/arrow/compute/api_aggregate.h
+++ b/cpp/src/arrow/compute/api_aggregate.h
@@ -175,6 +175,88 @@ class ARROW_EXPORT TDigestOptions : public FunctionOptions
{
uint32_t min_count;
};
+/// \brief Control Pivot kernel behavior
+///
+/// These options apply to the "pivot_wider" and "hash_pivot_wider" functions.
+///
+/// Constraints:
+/// - The corresponding `Aggregate::target` must have two FieldRef elements;
+/// the first one points to the pivot key column, the second points to the
+/// pivoted data column.
+/// - The pivot key column must be string-like; its values will be matched
+/// against `key_names` in order to dispatch the pivoted data into the
+/// output.
+///
+/// "pivot_wider" example
+/// ---------------------
+///
+/// Assuming the following two input columns with types utf8 and int16
(respectively):
+/// ```
+/// width | 11
+/// height | 13
+/// ```
+/// and the options `PivotWiderOptions(.key_names = {"height", "width"})`
+///
+/// then the output will be a scalar with the type
+/// `struct{"height": int16, "width": int16}`
+/// and the value `{"height": 13, "width": 11}`.
+///
+/// "hash_pivot_wider" example
+/// --------------------------
+///
+/// Assuming the following input with schema
+/// `{"group": int32, "key": utf8, "value": int16}`:
+/// ```
+/// group | key | value
+/// -----------------------------
+/// 1 | height | 11
+/// 1 | width | 12
+/// 2 | width | 13
+/// 3 | height | 14
+/// 3 | depth | 15
+/// ```
+/// and the following settings:
+/// - a hash grouping key "group"
+/// - Aggregate(
+/// .function = "hash_pivot_wider",
+/// .options = PivotWiderOptions(.key_names = {"height", "width"}),
+/// .target = {"key", "value"},
+/// .name = {"properties"})
+///
+/// then the output will have the schema
+/// `{"group": int32, "properties": struct{"height": int16, "width": int16}}`
+/// and the following value:
+/// ```
+/// group | properties
+/// | height | width
+/// -----------------------------
+/// 1 | 11 | 12
+/// 2 | null | 13
+/// 3 | 14 | null
+/// ```
+class ARROW_EXPORT PivotWiderOptions : public FunctionOptions {
+ public:
+ /// Configure the behavior of pivot keys not in `key_names`
+ enum UnexpectedKeyBehavior {
+ /// Unexpected pivot keys are ignored silently
+ kIgnore,
+ /// Unexpected pivot keys return a KeyError
+ kRaise
+ };
+
+ explicit PivotWiderOptions(std::vector<std::string> key_names,
+ UnexpectedKeyBehavior unexpected_key_behavior =
kIgnore);
+ // Default constructor for serialization
+ PivotWiderOptions();
+ static constexpr char const kTypeName[] = "PivotWiderOptions";
+ static PivotWiderOptions Defaults() { return PivotWiderOptions{}; }
+
+ /// The values expected in the pivot key column
+ std::vector<std::string> key_names;
+ /// The behavior when pivot keys not in `key_names` are encountered
+ UnexpectedKeyBehavior unexpected_key_behavior = kIgnore;
+};
+
/// \brief Control Index kernel behavior
class ARROW_EXPORT IndexOptions : public FunctionOptions {
public:
diff --git a/cpp/src/arrow/compute/kernels/aggregate_pivot.cc
b/cpp/src/arrow/compute/kernels/aggregate_pivot.cc
new file mode 100644
index 0000000000..bcc2f53ac1
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/aggregate_pivot.cc
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/common_internal.h"
+#include "arrow/compute/kernels/pivot_internal.h"
+#include "arrow/scalar.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::compute::internal {
+namespace {
+
+using arrow::internal::VisitSetBitRunsVoid;
+using arrow::util::span;
+
+struct PivotImpl : public ScalarAggregator {
+ Status Init(const PivotWiderOptions& options, const std::vector<TypeHolder>&
in_types) {
+ options_ = &options;
+ key_type_ = in_types[0].GetSharedPtr();
+ auto value_type = in_types[1].GetSharedPtr();
+ FieldVector fields;
+ fields.reserve(options_->key_names.size());
+ values_.reserve(options_->key_names.size());
+ for (const auto& key_name : options_->key_names) {
+ fields.push_back(field(key_name, value_type));
+ values_.push_back(MakeNullScalar(value_type));
+ }
+ out_type_ = struct_(std::move(fields));
+ ARROW_ASSIGN_OR_RAISE(key_mapper_, PivotWiderKeyMapper::Make(*key_type_,
options_));
+ return Status::OK();
+ }
+
+ Status Consume(KernelContext*, const ExecSpan& batch) override {
+ DCHECK_EQ(batch.num_values(), 2);
+ if (batch[0].is_array()) {
+ ARROW_ASSIGN_OR_RAISE(span<const PivotWiderKeyIndex> keys,
+ key_mapper_->MapKeys(batch[0].array));
+ if (batch[1].is_array()) {
+ // Array keys, array values
+ auto values = batch[1].array.ToArray();
+ for (int64_t i = 0; i < batch.length; ++i) {
+ PivotWiderKeyIndex key = keys[i];
+ if (key != kNullPivotKey && !values->IsNull(i)) {
+ if (ARROW_PREDICT_FALSE(values_[key]->is_valid)) {
+ return DuplicateValue();
+ }
+ ARROW_ASSIGN_OR_RAISE(values_[key], values->GetScalar(i));
+ DCHECK(values_[key]->is_valid);
+ }
+ }
+ } else {
+ // Array keys, scalar value
+ const Scalar* value = batch[1].scalar;
+ if (value->is_valid) {
+ for (int64_t i = 0; i < batch.length; ++i) {
+ PivotWiderKeyIndex key = keys[i];
+ if (key != kNullPivotKey) {
+ if (ARROW_PREDICT_FALSE(values_[key]->is_valid)) {
+ return DuplicateValue();
+ }
+ values_[key] = value->GetSharedPtr();
+ }
+ }
+ }
+ }
+ } else {
+ ARROW_ASSIGN_OR_RAISE(PivotWiderKeyIndex key,
+ key_mapper_->MapKey(*batch[0].scalar));
+ if (key != kNullPivotKey) {
+ if (batch[1].is_array()) {
+ // Scalar key, array values
+ auto values = batch[1].array.ToArray();
+ for (int64_t i = 0; i < batch.length; ++i) {
+ if (!values->IsNull(i)) {
+ if (ARROW_PREDICT_FALSE(values_[key]->is_valid)) {
+ return DuplicateValue();
+ }
+ ARROW_ASSIGN_OR_RAISE(values_[key], values->GetScalar(i));
+ DCHECK(values_[key]->is_valid);
+ }
+ }
+ } else {
+ // Scalar key, scalar value
+ const Scalar* value = batch[1].scalar;
+ if (value->is_valid) {
+ if (batch.length > 1 || values_[key]->is_valid) {
+ return DuplicateValue();
+ }
+ values_[key] = value->GetSharedPtr();
+ }
+ }
+ }
+ }
+ return Status::OK();
+ }
+
+ Status MergeFrom(KernelContext*, KernelState&& src) override {
+ const auto& other_state = checked_cast<const PivotImpl&>(src);
+ for (int64_t key = 0; key < static_cast<int64_t>(values_.size()); ++key) {
+ if (other_state.values_[key]->is_valid) {
+ if (ARROW_PREDICT_FALSE(values_[key]->is_valid)) {
+ return DuplicateValue();
+ }
+ values_[key] = other_state.values_[key];
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Finalize(KernelContext* ctx, Datum* out) override {
+ *out = std::make_shared<StructScalar>(std::move(values_), out_type_);
+ return Status::OK();
+ }
+
+ Status DuplicateValue() {
+ return Status::Invalid(
+ "Encountered more than one non-null value for the same pivot key");
+ }
+
+ std::shared_ptr<DataType> out_type() const { return out_type_; }
+
+ std::shared_ptr<DataType> key_type_;
+ std::shared_ptr<DataType> out_type_;
+ const PivotWiderOptions* options_;
+ std::unique_ptr<PivotWiderKeyMapper> key_mapper_;
+ ScalarVector values_;
+};
+
+Result<std::unique_ptr<KernelState>> PivotInit(KernelContext* ctx,
+ const KernelInitArgs& args) {
+ const auto& options = checked_cast<const PivotWiderOptions&>(*args.options);
+ DCHECK_EQ(args.inputs.size(), 2);
+ DCHECK(is_base_binary_like(args.inputs[0].id()));
+ auto state = std::make_unique<PivotImpl>();
+ RETURN_NOT_OK(state->Init(options, args.inputs));
+ return state;
+}
+
+Result<TypeHolder> ResolveOutputType(KernelContext* ctx, const
std::vector<TypeHolder>&) {
+ return checked_cast<PivotImpl*>(ctx->state())->out_type();
+}
+
+const FunctionDoc pivot_doc{
+ "Pivot values according to a pivot key column",
+ ("Output is a struct with as many fields as
`PivotWiderOptions.key_names`.\n"
+ "All output struct fields have the same type as `pivot_values`.\n"
+ "Each pivot key decides in which output field the corresponding pivot
value\n"
+ "is emitted. If a pivot key doesn't appear, null is emitted.\n"
+ "If more than one non-null value is encountered for a given pivot key,\n"
+ "Invalid is raised.\n"
+ "Behavior of unexpected pivot keys is controlled by
`unexpected_key_behavior`\n"
+ "in PivotWiderOptions."),
+ {"pivot_keys", "pivot_values"},
+ "PivotWiderOptions"};
+
+} // namespace
+
+void RegisterScalarAggregatePivot(FunctionRegistry* registry) {
+ static auto default_pivot_options = PivotWiderOptions::Defaults();
+
+ auto func = std::make_shared<ScalarAggregateFunction>(
+ "pivot_wider", Arity::Binary(), pivot_doc, &default_pivot_options);
+
+ for (auto key_type : BaseBinaryTypes()) {
+ auto sig = KernelSignature::Make({key_type->id(), InputType::Any()},
+ OutputType(ResolveOutputType));
+ AddAggKernel(std::move(sig), PivotInit, func.get());
+ }
+ DCHECK_OK(registry->AddFunction(std::move(func)));
+}
+
+} // namespace arrow::compute::internal
diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc
b/cpp/src/arrow/compute/kernels/aggregate_test.cc
index e6ad915fd5..36bb783d35 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc
@@ -4307,5 +4307,294 @@ TEST(TestTDigestKernel, ApproximateMedian) {
}
}
+//
+// Pivot
+//
+
+class TestPivotKernel : public ::testing::Test {
+ public:
+ void AssertPivot(const Datum& keys, const Datum& values, const Scalar&
expected,
+ const PivotWiderOptions& options) {
+ SCOPED_TRACE(options.ToString());
+ ASSERT_OK_AND_ASSIGN(Datum out,
+ CallFunction("pivot_wider", {keys, values},
&options));
+ ValidateOutput(out);
+ ASSERT_TRUE(out.is_scalar());
+ AssertScalarsEqual(expected, *out.scalar(), /*verbose=*/true);
+ }
+};
+
+TEST_F(TestPivotKernel, Basics) {
+ auto key_type = utf8();
+ auto value_type = float32();
+
+ auto keys = ArrayFromJSON(key_type, R"(["width", "height"])");
+ auto values = ArrayFromJSON(value_type, "[10.5, 11.5]");
+ auto expected = ScalarFromJSON(
+ struct_({field("height", value_type), field("width", value_type)}),
"[11.5, 10.5]");
+ AssertPivot(keys, values, *expected,
+ PivotWiderOptions(/*key_names=*/{"height", "width"}));
+}
+
+TEST_F(TestPivotKernel, AllKeyTypes) {
+ for (auto key_type : BaseBinaryTypes()) {
+ auto value_type = float32();
+
+ auto keys = ArrayFromJSON(key_type, R"(["width", "height"])");
+ auto values = ArrayFromJSON(value_type, "[10.5, 11.5]");
+ auto expected =
+ ScalarFromJSON(struct_({field("height", value_type), field("width",
value_type)}),
+ "[11.5, 10.5]");
+ AssertPivot(keys, values, *expected,
+ PivotWiderOptions(/*key_names=*/{"height", "width"}));
+ }
+}
+
+TEST_F(TestPivotKernel, Numbers) {
+ auto key_type = utf8();
+ for (auto value_type : NumericTypes()) {
+ auto keys = ArrayFromJSON(key_type, R"(["width", "height"])");
+ auto values = ArrayFromJSON(value_type, "[10, 11]");
+ auto expected = ScalarFromJSON(
+ struct_({field("height", value_type), field("width", value_type)}),
"[11, 10]");
+ AssertPivot(keys, values, *expected,
+ PivotWiderOptions(/*key_names=*/{"height", "width"}));
+ }
+}
+
+TEST_F(TestPivotKernel, Binary) {
+ auto key_type = utf8();
+ for (auto value_type : BaseBinaryTypes()) {
+ auto keys = ArrayFromJSON(key_type, R"(["abc", "def"])");
+ auto values = ArrayFromJSON(value_type, R"(["foo", "bar"])");
+ auto expected =
+ ScalarFromJSON(struct_({field("abc", value_type), field("def",
value_type)}),
+ R"(["foo", "bar"])");
+ AssertPivot(keys, values, *expected,
PivotWiderOptions(/*key_names=*/{"abc", "def"}));
+ }
+}
+
+TEST_F(TestPivotKernel, NullType) {
+ auto key_type = utf8();
+ auto value_type = null();
+
+ auto keys = ArrayFromJSON(key_type, R"(["abc", "def"])");
+ auto values = ArrayFromJSON(value_type, "[null, null]");
+ auto expected = ScalarFromJSON(
+ struct_({field("abc", value_type), field("def", value_type)}), R"([null,
null])");
+ AssertPivot(keys, values, *expected, PivotWiderOptions(/*key_names=*/{"abc",
"def"}));
+}
+
+TEST_F(TestPivotKernel, NullValues) {
+ auto key_type = utf8();
+ auto value_type = float32();
+
+ auto keys = ArrayFromJSON(key_type, R"(["width", "height", "height",
"width"])");
+ auto values = ArrayFromJSON(value_type, "[null, 10.5, null, 11.5]");
+ auto expected = ScalarFromJSON(
+ struct_({field("height", value_type), field("width", value_type)}),
"[10.5, 11.5]");
+ AssertPivot(keys, values, *expected,
+ PivotWiderOptions(/*key_names=*/{"height", "width"}));
+}
+
+TEST_F(TestPivotKernel, ChunkedInput) {
+ auto key_type = utf8();
+ auto value_type = float32();
+
+ auto keys = ChunkedArrayFromJSON(key_type,
+ {R"(["width"])", R"(["height", "height",
"width"])"});
+ auto values = ChunkedArrayFromJSON(value_type, {"[null, 10.5]", "[null,
11.5]"});
+ auto expected = ScalarFromJSON(
+ struct_({field("height", value_type), field("width", value_type)}),
"[10.5, 11.5]");
+ AssertPivot(keys, values, *expected,
+ PivotWiderOptions(/*key_names=*/{"height", "width"}));
+}
+
+TEST_F(TestPivotKernel, AllInputKinds) {
+ auto key_type = utf8();
+ auto value_type = float32();
+
+ DatumVector key_args = {
+ ScalarFromJSON(key_type, R"("width")"),
+ ArrayFromJSON(key_type, R"(["width"])"),
+ ChunkedArrayFromJSON(key_type, {R"(["width"])"}),
+ };
+ DatumVector value_args = {
+ ScalarFromJSON(value_type, "11.5"),
+ ArrayFromJSON(value_type, "[11.5]"),
+ ChunkedArrayFromJSON(value_type, {"[11.5]"}),
+ };
+ auto expected = ScalarFromJSON(
+ struct_({field("height", value_type), field("width", value_type)}),
"[null, 11.5]");
+
+ for (const Datum& keys : key_args) {
+ ARROW_SCOPED_TRACE("keys = ", keys.ToString());
+ for (const Datum& values : value_args) {
+ ARROW_SCOPED_TRACE("values = ", keys.ToString());
+ AssertPivot(keys, values, *expected,
+ PivotWiderOptions(/*key_names=*/{"height", "width"}));
+ }
+ }
+}
+
+TEST_F(TestPivotKernel, ScalarKey) {
+ auto key_type = utf8();
+ auto value_type = float32();
+ auto expected_type = struct_({field("height", value_type), field("width",
value_type)});
+
+ auto keys = ScalarFromJSON(key_type, R"("width")");
+ auto values = ArrayFromJSON(value_type, "[null, 11.5, null]");
+ auto expected = ScalarFromJSON(expected_type, "[null, 11.5]");
+ AssertPivot(keys, values, *expected,
+ PivotWiderOptions(/*key_names=*/{"height", "width"}));
+}
+
+TEST_F(TestPivotKernel, ScalarValue) {
+ auto key_type = utf8();
+ auto value_type = float32();
+ auto expected_type = struct_({field("height", value_type), field("width",
value_type)});
+
+ auto keys = ArrayFromJSON(key_type, R"(["width", "height"])");
+ auto values = ScalarFromJSON(value_type, "11.5");
+ auto expected = ScalarFromJSON(expected_type, "[11.5, 11.5]");
+ AssertPivot(keys, values, *expected,
+ PivotWiderOptions(/*key_names=*/{"height", "width"}));
+}
+
+TEST_F(TestPivotKernel, EmptyInput) {
+ auto key_type = utf8();
+ auto value_type = float32();
+ auto options = PivotWiderOptions(/*key_names=*/{"height", "width"});
+ auto expected_type = struct_({field("height", value_type), field("width",
value_type)});
+ auto expected = ScalarFromJSON(expected_type, "[null, null]");
+
+ AssertPivot(ArrayFromJSON(key_type, "[]"), ArrayFromJSON(value_type, "[]"),
*expected,
+ options);
+ AssertPivot(ChunkedArrayFromJSON(key_type, {}),
ChunkedArrayFromJSON(value_type, {}),
+ *expected, options);
+}
+
+TEST_F(TestPivotKernel, MissingKey) {
+ auto key_type = utf8();
+ auto value_type = float32();
+
+ auto keys = ArrayFromJSON(key_type, R"(["width", "height"])");
+ auto values = ArrayFromJSON(value_type, "[10.5, 11.5]");
+ auto options = PivotWiderOptions(/*key_names=*/{"height", "width", "depth"});
+ auto expected =
+ ScalarFromJSON(struct_({field("height", value_type), field("width",
value_type),
+ field("depth", value_type)}),
+ "[11.5, 10.5, null]");
+ AssertPivot(keys, values, *expected, options);
+}
+
+TEST_F(TestPivotKernel, UnexpectedKey) {
+ auto key_type = utf8();
+ auto value_type = float32();
+ auto expected_type = struct_({field("height", value_type), field("width",
value_type)});
+
+ auto options = PivotWiderOptions(/*key_names=*/{"height", "width"});
+ auto options_raise =
+ PivotWiderOptions(/*key_names=*/{"height", "width"},
PivotWiderOptions::kRaise);
+
+ {
+ auto keys = ArrayFromJSON(key_type, R"(["width", "height", "depth"])");
+ auto values = ArrayFromJSON(value_type, "[10.5, 11.5, 12.5]");
+ auto expected = ScalarFromJSON(expected_type, "[11.5, 10.5]");
+ AssertPivot(keys, values, *expected, options);
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ KeyError, ::testing::HasSubstr("Unexpected pivot key: depth"),
+ CallFunction("pivot_wider", {keys, values}, &options_raise));
+ }
+ {
+ // Scalar key
+ auto keys = ScalarFromJSON(key_type, R"("depth")");
+ auto expected = ScalarFromJSON(expected_type, "[null, null]");
+ for (const Datum& values : DatumVector{ArrayFromJSON(value_type, "[10.5]"),
+ ScalarFromJSON(value_type,
"10.5")}) {
+ AssertPivot(keys, values, *expected, options);
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ KeyError, ::testing::HasSubstr("Unexpected pivot key: depth"),
+ CallFunction("pivot_wider", {keys, values}, &options_raise));
+ }
+ }
+ {
+ // Scalar value
+ auto values = ScalarFromJSON(value_type, "10.5");
+ auto expected = ScalarFromJSON(expected_type, "[null, null]");
+ for (const Datum& keys : DatumVector{ArrayFromJSON(key_type,
R"(["depth"])"),
+ ScalarFromJSON(key_type,
R"("depth")")}) {
+ AssertPivot(keys, values, *expected, options);
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ KeyError, ::testing::HasSubstr("Unexpected pivot key: depth"),
+ CallFunction("pivot_wider", {keys, values}, &options_raise));
+ }
+ }
+}
+
+TEST_F(TestPivotKernel, NullKey) {
+ auto key_type = utf8();
+ auto value_type = float32();
+
+ auto keys = ArrayFromJSON(key_type, R"(["width", null])");
+ auto values = ArrayFromJSON(value_type, "[10.5, 11.5]");
+ auto options = PivotWiderOptions(/*key_names=*/{"height", "width"});
+ EXPECT_RAISES_WITH_MESSAGE_THAT(KeyError,
+ ::testing::HasSubstr("pivot key name cannot
be null"),
+ CallFunction("pivot_wider", {keys, values},
&options));
+}
+
+TEST_F(TestPivotKernel, DuplicateKeyNames) {
+ auto key_type = utf8();
+ auto value_type = float32();
+
+ auto keys = ArrayFromJSON(key_type, "[]");
+ auto values = ArrayFromJSON(value_type, "[]");
+ auto options = PivotWiderOptions(/*key_names=*/{"height", "height",
"width"});
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ KeyError, ::testing::HasSubstr("Duplicate key name 'height' in
PivotWiderOptions"),
+ CallFunction("pivot_wider", {keys, values}, &options));
+}
+
+TEST_F(TestPivotKernel, DuplicateValues) {
+ auto key_type = utf8();
+ auto value_type = float32();
+ auto options = PivotWiderOptions(/*key_names=*/{"height", "width"});
+
+ {
+ // Duplicate values in the same chunk
+ auto keys = ArrayFromJSON(key_type, R"(["width", "height", "height"])");
+ auto values = ArrayFromJSON(value_type, "[10.5, 11.5, 12.5]");
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ Invalid, ::testing::HasSubstr("Encountered more than one non-null
value"),
+ CallFunction("pivot_wider", {keys, values}, &options));
+ }
+ {
+ // Duplicate values in different chunks
+ auto keys =
+ ChunkedArrayFromJSON(key_type, {R"(["width", "height"])",
R"(["height"])"});
+ auto values = ChunkedArrayFromJSON(value_type, {"[10.5, 11.5]", "[12.5]"});
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ Invalid, ::testing::HasSubstr("Encountered more than one non-null
value"),
+ CallFunction("pivot_wider", {keys, values}, &options));
+ }
+ {
+ // Duplicate values with scalar key
+ auto keys = ScalarFromJSON(key_type, R"("width")");
+ auto values = ArrayFromJSON(value_type, "[10.5, 11.5]");
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ Invalid, ::testing::HasSubstr("Encountered more than one non-null
value"),
+ CallFunction("pivot_wider", {keys, values}, &options));
+ }
+ {
+ // Duplicate values with scalar value
+ auto keys = ArrayFromJSON(key_type, R"(["width", "height", "height"])");
+ auto values = ScalarFromJSON(value_type, "10.5");
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ Invalid, ::testing::HasSubstr("Encountered more than one non-null
value"),
+ CallFunction("pivot_wider", {keys, values}, &options));
+ }
+}
+
} // namespace compute
} // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index 21b7bd9bf6..b52f1c8286 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -26,6 +26,7 @@
#include "arrow/array/builder_nested.h"
#include "arrow/array/builder_primitive.h"
+#include "arrow/array/concatenate.h"
#include "arrow/buffer_builder.h"
#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/api_vector.h"
@@ -33,6 +34,7 @@
#include "arrow/compute/kernels/aggregate_internal.h"
#include "arrow/compute/kernels/aggregate_var_std_internal.h"
#include "arrow/compute/kernels/common_internal.h"
+#include "arrow/compute/kernels/pivot_internal.h"
#include "arrow/compute/kernels/util_internal.h"
#include "arrow/compute/row/grouper.h"
#include "arrow/compute/row/row_encoder_internal.h"
@@ -40,6 +42,7 @@
#include "arrow/stl_allocator.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/bitmap_writer.h"
#include "arrow/util/checked_cast.h"
@@ -47,6 +50,7 @@
#include "arrow/util/int128_internal.h"
#include "arrow/util/int_util_overflow.h"
#include "arrow/util/ree_util.h"
+#include "arrow/util/span.h"
#include "arrow/util/task_group.h"
#include "arrow/util/tdigest.h"
#include "arrow/util/thread_pool.h"
@@ -56,6 +60,7 @@ namespace arrow {
using internal::checked_cast;
using internal::FirstTimeBitmapWriter;
+using util::span;
namespace compute {
namespace internal {
@@ -3319,9 +3324,404 @@ struct GroupedListFactory {
HashAggregateKernel kernel;
InputType argument_type;
};
-} // namespace
-namespace {
+// ----------------------------------------------------------------------
+// Pivot implementation
+
+struct GroupedPivotAccumulator {
+ Status Init(ExecContext* ctx, std::shared_ptr<DataType> value_type,
+ const PivotWiderOptions* options) {
+ ctx_ = ctx;
+ value_type_ = std::move(value_type);
+ num_keys_ = static_cast<int>(options->key_names.size());
+ num_groups_ = 0;
+ columns_.resize(num_keys_);
+ scratch_buffer_ = BufferBuilder(ctx_->memory_pool());
+ return Status::OK();
+ }
+
+ Status Consume(span<const uint32_t> groups, span<const PivotWiderKeyIndex>
keys,
+ const ArraySpan& values) {
+ // To dispatch the values into the right (group, key) coordinates,
+ // we first compute a vector of take indices for each output column.
+ //
+ // For each index #i, we set take_indices[keys[#i]][groups[#i]] = #i.
+ // Unpopulated take_indices entries are null.
+ //
+ // For example, assuming we get:
+ // groups | keys
+ // ===================
+ // 1 | 0
+ // 3 | 1
+ // 1 | 1
+ // 0 | 1
+ //
+ // We are going to compute:
+ // - take_indices[key = 0] = [null, 0, null, null]
+ // - take_indices[key = 1] = [3, 2, null, 1]
+ //
+ // Then each output column is computed by taking the values with the
+ // respective take_indices for the column's keys.
+ //
+
+ DCHECK_EQ(groups.size(), keys.size());
+ DCHECK_EQ(groups.size(), static_cast<size_t>(values.length));
+
+ std::shared_ptr<DataType> take_index_type;
+ std::vector<std::shared_ptr<Buffer>> take_indices(num_keys_);
+ std::vector<std::shared_ptr<Buffer>> take_bitmaps(num_keys_);
+
+ // A generic lambda that computes the take indices with the desired
integer width
+ auto compute_take_indices = [&](auto typed_index) {
+ ARROW_UNUSED(typed_index);
+ using TakeIndex = std::decay_t<decltype(typed_index)>;
+ take_index_type = CTypeTraits<TakeIndex>::type_singleton();
+
+ const int64_t take_indices_size =
+ bit_util::RoundUpToMultipleOf64(num_groups_ * sizeof(TakeIndex));
+ const int64_t take_bitmap_size =
+ bit_util::RoundUpToMultipleOf64(bit_util::BytesForBits(num_groups_));
+ const int64_t total_scratch_size =
+ num_keys_ * (take_indices_size + take_bitmap_size);
+ RETURN_NOT_OK(scratch_buffer_.Resize(total_scratch_size,
/*shrink_to_fit=*/false));
+
+ // Slice the scratch space into individual buffers for each output
column's
+ // take_indices array.
+ std::vector<TakeIndex*> take_indices_data(num_keys_);
+ std::vector<uint8_t*> take_bitmap_data(num_keys_);
+ int64_t offset = 0;
+ for (int i = 0; i < num_keys_; ++i) {
+ take_indices[i] = std::make_shared<MutableBuffer>(
+ scratch_buffer_.mutable_data() + offset, take_indices_size);
+ take_indices_data[i] = take_indices[i]->mutable_data_as<TakeIndex>();
+ offset += take_indices_size;
+ take_bitmaps[i] = std::make_shared<MutableBuffer>(
+ scratch_buffer_.mutable_data() + offset, take_bitmap_size);
+ take_bitmap_data[i] = take_bitmaps[i]->mutable_data();
+ memset(take_bitmap_data[i], 0, take_bitmap_size);
+ offset += take_bitmap_size;
+ }
+ DCHECK_LE(offset, scratch_buffer_.capacity());
+
+ // Populate the take_indices for each output column
+ for (int64_t i = 0; i < values.length; ++i) {
+ const PivotWiderKeyIndex key = keys[i];
+ if (key != kNullPivotKey && !values.IsNull(i)) {
+ DCHECK_LT(static_cast<int>(key), num_keys_);
+ const uint32_t group = groups[i];
+ if (bit_util::GetBit(take_bitmap_data[key], group)) {
+ return DuplicateValue();
+ }
+ // For row #group in column #key, we are going to take the value at
index #i
+ bit_util::SetBit(take_bitmap_data[key], group);
+ take_indices_data[key][group] = static_cast<TakeIndex>(i);
+ }
+ }
+ return Status::OK();
+ };
+
+ // Call compute_take_indices with the optimal integer width
+ if (values.length <=
static_cast<int64_t>(std::numeric_limits<uint8_t>::max())) {
+ RETURN_NOT_OK(compute_take_indices(uint8_t{}));
+ } else if (values.length <=
+ static_cast<int64_t>(std::numeric_limits<uint16_t>::max())) {
+ RETURN_NOT_OK(compute_take_indices(uint16_t{}));
+ } else if (values.length <=
+ static_cast<int64_t>(std::numeric_limits<uint32_t>::max())) {
+ RETURN_NOT_OK(compute_take_indices(uint32_t{}));
+ } else {
+ RETURN_NOT_OK(compute_take_indices(uint64_t{}));
+ }
+
+ // Use take_indices to compute the output columns for this batch
+ auto values_data = values.ToArrayData();
+ ArrayVector new_columns(num_keys_);
+ TakeOptions take_options(/*boundscheck=*/false);
+ for (int i = 0; i < num_keys_; ++i) {
+ auto indices_data =
+ ArrayData::Make(take_index_type, num_groups_,
+ {std::move(take_bitmaps[i]),
std::move(take_indices[i])});
+ // If indices_data is all nulls, we can just ignore this column.
+ if (indices_data->GetNullCount() != indices_data->length) {
+ ARROW_ASSIGN_OR_RAISE(Datum grouped_column,
+ Take(values_data, indices_data, take_options,
ctx_));
+ new_columns[i] = grouped_column.make_array();
+ }
+ }
+ // Merge them with the previous columns
+ return MergeColumns(std::move(new_columns));
+ }
+
+ Status Consume(span<const uint32_t> groups, const PivotWiderKeyIndex key,
+ const ArraySpan& values) {
+ if (key == kNullPivotKey) {
+ // Nothing to update
+ return Status::OK();
+ }
+ DCHECK_LT(static_cast<int>(key), num_keys_);
+ DCHECK_EQ(groups.size(), static_cast<size_t>(values.length));
+
+ // The algorithm is simpler than in the array-taking version of Consume()
+ // above, since only the column #key needs to be updated.
+ std::shared_ptr<DataType> take_index_type;
+ std::shared_ptr<Buffer> take_indices;
+ std::shared_ptr<Buffer> take_bitmap;
+
+ // A generic lambda that computes the take indices with the desired
integer width
+ auto compute_take_indices = [&](auto typed_index) {
+ ARROW_UNUSED(typed_index);
+ using TakeIndex = std::decay_t<decltype(typed_index)>;
+ take_index_type = CTypeTraits<TakeIndex>::type_singleton();
+
+ const int64_t take_indices_size =
+ bit_util::RoundUpToMultipleOf64(num_groups_ * sizeof(TakeIndex));
+ const int64_t take_bitmap_size =
+ bit_util::RoundUpToMultipleOf64(bit_util::BytesForBits(num_groups_));
+ const int64_t total_scratch_size = take_indices_size + take_bitmap_size;
+ RETURN_NOT_OK(scratch_buffer_.Resize(total_scratch_size,
/*shrink_to_fit=*/false));
+
+ take_indices =
std::make_shared<MutableBuffer>(scratch_buffer_.mutable_data(),
+ take_indices_size);
+ take_bitmap = std::make_shared<MutableBuffer>(
+ scratch_buffer_.mutable_data() + take_indices_size,
take_bitmap_size);
+ auto take_indices_data = take_indices->mutable_data_as<TakeIndex>();
+ auto take_bitmap_data = take_bitmap->mutable_data();
+ memset(take_bitmap_data, 0, take_bitmap_size);
+
+ for (int64_t i = 0; i < values.length; ++i) {
+ const uint32_t group = groups[i];
+ if (!values.IsNull(i)) {
+ if (bit_util::GetBit(take_bitmap_data, group)) {
+ return DuplicateValue();
+ }
+ bit_util::SetBit(take_bitmap_data, group);
+ take_indices_data[group] = static_cast<TakeIndex>(i);
+ }
+ }
+ return Status::OK();
+ };
+
+ // Call compute_take_indices with the optimal integer width
+ if (values.length <=
static_cast<int64_t>(std::numeric_limits<uint8_t>::max())) {
+ RETURN_NOT_OK(compute_take_indices(uint8_t{}));
+ } else if (values.length <=
+ static_cast<int64_t>(std::numeric_limits<uint16_t>::max())) {
+ RETURN_NOT_OK(compute_take_indices(uint16_t{}));
+ } else if (values.length <=
+ static_cast<int64_t>(std::numeric_limits<uint32_t>::max())) {
+ RETURN_NOT_OK(compute_take_indices(uint32_t{}));
+ } else {
+ RETURN_NOT_OK(compute_take_indices(uint64_t{}));
+ }
+
+ // Use take_indices to update column #key
+ auto values_data = values.ToArrayData();
+ auto indices_data = ArrayData::Make(
+ take_index_type, num_groups_, {std::move(take_bitmap),
std::move(take_indices)});
+ TakeOptions take_options(/*boundscheck=*/false);
+ ARROW_ASSIGN_OR_RAISE(Datum grouped_column,
+ Take(values_data, indices_data, take_options, ctx_));
+ return MergeColumn(&columns_[key], grouped_column.make_array());
+ }
+
+ Status Resize(int64_t new_num_groups) {
+ if (new_num_groups > std::numeric_limits<int32_t>::max()) {
+ return Status::NotImplemented("Pivot with more than 2**31 groups");
+ }
+ return ResizeColumns(new_num_groups);
+ }
+
+ Status Merge(GroupedPivotAccumulator&& other, const ArrayData&
group_id_mapping) {
+ // To merge `other` into `*this`, we simply merge their respective columns.
+ // However, we must first transpose `other`'s rows using
`group_id_mapping`.
+ // This is a logical "scatter" operation.
+ //
+ // Since `scatter(indices)` is implemented as
`take(inverse_permutation(indices))`,
+ // we can save time by computing `inverse_permutation(indices)` once for
all
+ // columns.
+
+ // Scatter/InversePermutation only accept signed indices. We checked
+ // in Resize() above that we were inside the limits for int32.
+ auto scatter_indices = group_id_mapping.Copy();
+ scatter_indices->type = int32();
+ std::shared_ptr<DataType> take_indices_type;
+ if (num_groups_ - 1 <= std::numeric_limits<int8_t>::max()) {
+ take_indices_type = int8();
+ } else if (num_groups_ - 1 <= std::numeric_limits<int16_t>::max()) {
+ take_indices_type = int16();
+ } else {
+ DCHECK_LE(num_groups_ - 1, std::numeric_limits<int32_t>::max());
+ take_indices_type = int32();
+ }
+ InversePermutationOptions options(/*max_index=*/num_groups_ - 1,
take_indices_type);
+ ARROW_ASSIGN_OR_RAISE(auto take_indices,
+ InversePermutation(scatter_indices, options, ctx_));
+ auto scatter_column =
+ [&](const std::shared_ptr<Array>& column) ->
Result<std::shared_ptr<Array>> {
+ TakeOptions take_options(/*boundscheck=*/false);
+ ARROW_ASSIGN_OR_RAISE(auto scattered,
+ Take(column, take_indices, take_options, ctx_));
+ return scattered.make_array();
+ };
+ return MergeColumns(std::move(other.columns_), std::move(scatter_column));
+ }
+
+ Result<ArrayVector> Finalize() {
+ // Ensure that columns are allocated even if num_groups_ == 0
+ RETURN_NOT_OK(ResizeColumns(num_groups_));
+ return std::move(columns_);
+ }
+
+ protected:
+ Status ResizeColumns(int64_t new_num_groups) {
+ if (new_num_groups == num_groups_ && num_groups_ != 0) {
+ return Status::OK();
+ }
+ ARROW_ASSIGN_OR_RAISE(
+ auto array_suffix,
+ MakeArrayOfNull(value_type_, new_num_groups - num_groups_,
ctx_->memory_pool()));
+ for (auto& column : columns_) {
+ if (num_groups_ != 0) {
+ DCHECK_NE(column, nullptr);
+ ARROW_ASSIGN_OR_RAISE(
+ column, Concatenate({std::move(column), array_suffix},
ctx_->memory_pool()));
+ } else {
+ column = array_suffix;
+ }
+ DCHECK_EQ(column->length(), new_num_groups);
+ }
+ num_groups_ = new_num_groups;
+ return Status::OK();
+ }
+
+ using ColumnTransform =
+ std::function<Result<std::shared_ptr<Array>>(const
std::shared_ptr<Array>&)>;
+
+ Status MergeColumns(ArrayVector&& other_columns,
+ const ColumnTransform& transform = {}) {
+ DCHECK_EQ(columns_.size(), other_columns.size());
+ for (int i = 0; i < num_keys_; ++i) {
+ if (other_columns[i]) {
+ RETURN_NOT_OK(MergeColumn(&columns_[i], std::move(other_columns[i]),
transform));
+ }
+ }
+ return Status::OK();
+ }
+
+ Status MergeColumn(std::shared_ptr<Array>* column, std::shared_ptr<Array>
other_column,
+ const ColumnTransform& transform = {}) {
+ if (other_column->null_count() == other_column->length()) {
+ // Avoid paying for the transform step below, since merging will be a
no-op anyway.
+ return Status::OK();
+ }
+ if (transform) {
+ ARROW_ASSIGN_OR_RAISE(other_column, transform(other_column));
+ }
+ DCHECK_EQ(num_groups_, other_column->length());
+ if (!*column) {
+ *column = other_column;
+ return Status::OK();
+ }
+ if ((*column)->null_count() == (*column)->length()) {
+ *column = other_column;
+ return Status::OK();
+ }
+ int64_t expected_non_nulls = (num_groups_ - (*column)->null_count()) +
+ (num_groups_ - other_column->null_count());
+ ARROW_ASSIGN_OR_RAISE(auto coalesced,
+ CallFunction("coalesce", {*column, other_column},
ctx_));
+ // Check that all non-null values in other_column and column were kept in
the result.
+ if (expected_non_nulls != num_groups_ - coalesced.null_count()) {
+ DCHECK_GT(expected_non_nulls, num_groups_ - coalesced.null_count());
+ return DuplicateValue();
+ }
+ *column = coalesced.make_array();
+ return Status::OK();
+ }
+
+ Status DuplicateValue() {
+ return Status::Invalid(
+ "Encountered more than one non-null value for the same grouped pivot
key");
+ }
+
+ ExecContext* ctx_;
+ std::shared_ptr<DataType> value_type_;
+ int num_keys_;
+ int64_t num_groups_;
+ ArrayVector columns_;
+ // A persistent scratch buffer to store the take indices in Consume
+ BufferBuilder scratch_buffer_;
+};
+
+struct GroupedPivotImpl : public GroupedAggregator {
+ Status Init(ExecContext* ctx, const KernelInitArgs& args) override {
+ DCHECK_EQ(args.inputs.size(), 3);
+ key_type_ = args.inputs[0].GetSharedPtr();
+ options_ = checked_cast<const PivotWiderOptions*>(args.options);
+ DCHECK_NE(options_, nullptr);
+ auto value_type = args.inputs[1].GetSharedPtr();
+ FieldVector fields;
+ fields.reserve(options_->key_names.size());
+ for (const auto& key_name : options_->key_names) {
+ fields.push_back(field(key_name, value_type));
+ }
+ out_type_ = struct_(std::move(fields));
+ out_struct_type_ = checked_cast<const StructType*>(out_type_.get());
+ ARROW_ASSIGN_OR_RAISE(key_mapper_, PivotWiderKeyMapper::Make(*key_type_,
options_));
+ RETURN_NOT_OK(accumulator_.Init(ctx, value_type, options_));
+ return Status::OK();
+ }
+
+ Status Resize(int64_t new_num_groups) override {
+ num_groups_ = new_num_groups;
+ return accumulator_.Resize(new_num_groups);
+ }
+
+ Status Merge(GroupedAggregator&& raw_other,
+ const ArrayData& group_id_mapping) override {
+ auto other = checked_cast<GroupedPivotImpl*>(&raw_other);
+ return accumulator_.Merge(std::move(other->accumulator_),
group_id_mapping);
+ }
+
+ Status Consume(const ExecSpan& batch) override {
+ DCHECK_EQ(batch.values.size(), 3);
+ auto groups = batch[2].array.GetSpan<const uint32_t>(1, batch.length);
+ if (!batch[1].is_array()) {
+ return Status::NotImplemented("Consuming scalar pivot value");
+ }
+ if (batch[0].is_array()) {
+ ARROW_ASSIGN_OR_RAISE(span<const PivotWiderKeyIndex> keys,
+ key_mapper_->MapKeys(batch[0].array));
+ return accumulator_.Consume(groups, keys, batch[1].array);
+ } else {
+ ARROW_ASSIGN_OR_RAISE(PivotWiderKeyIndex key,
+ key_mapper_->MapKey(*batch[0].scalar));
+ return accumulator_.Consume(groups, key, batch[1].array);
+ }
+ }
+
+ Result<Datum> Finalize() override {
+ ARROW_ASSIGN_OR_RAISE(auto columns, accumulator_.Finalize());
+ DCHECK_EQ(columns.size(),
static_cast<size_t>(out_struct_type_->num_fields()));
+ return std::make_shared<StructArray>(out_type_, num_groups_,
std::move(columns),
+ /*null_bitmap=*/nullptr,
+ /*null_count=*/0);
+ }
+
+ std::shared_ptr<DataType> out_type() const override { return out_type_; }
+
+ std::shared_ptr<DataType> key_type_;
+ std::shared_ptr<DataType> out_type_;
+ const StructType* out_struct_type_;
+ const PivotWiderOptions* options_;
+ std::unique_ptr<PivotWiderKeyMapper> key_mapper_;
+ GroupedPivotAccumulator accumulator_;
+ int64_t num_groups_ = 0;
+};
+
+// ----------------------------------------------------------------------
+// Docstrings
+
const FunctionDoc hash_count_doc{
"Count the number of null / non-null values in each group",
("By default, only non-null values are counted.\n"
@@ -3456,6 +3856,20 @@ const FunctionDoc hash_one_doc{"Get one value from each
group",
const FunctionDoc hash_list_doc{"List all values in each group",
("Null values are also returned."),
{"array", "group_id_array"}};
+
+const FunctionDoc hash_pivot_doc{
+ "Pivot values according to a pivot key column",
+ ("Output is a struct array with as many fields as
`PivotWiderOptions.key_names`.\n"
+ "All output struct fields have the same type as `pivot_values`.\n"
+ "Each pivot key decides in which output field the corresponding pivot
value\n"
+ "is emitted. If a pivot key doesn't appear in a given group, null is
emitted.\n"
+ "If more than one non-null value is encountered in the same group for a\n"
+ "given pivot key, Invalid is raised.\n"
+ "Behavior of unexpected pivot keys is controlled by
`unexpected_key_behavior`\n"
+ "in PivotWiderOptions."),
+ {"pivot_keys", "pivot_values", "group_id_array"},
+ "PivotWiderOptions"};
+
} // namespace
void RegisterHashAggregateBasic(FunctionRegistry* registry) {
@@ -3705,6 +4119,20 @@ void RegisterHashAggregateBasic(FunctionRegistry*
registry) {
GroupedListFactory::Make, func.get()));
DCHECK_OK(registry->AddFunction(std::move(func)));
}
+
+ {
+ auto func = std::make_shared<HashAggregateFunction>("hash_pivot_wider",
+ Arity::Ternary(),
hash_pivot_doc);
+ for (auto key_type : BaseBinaryTypes()) {
+ // Anything that scatter() (i.e. take()) accepts can be passed as values
+ auto sig = KernelSignature::Make(
+ {key_type->id(), InputType::Any(), InputType(Type::UINT32)},
+ OutputType(ResolveGroupOutputType));
+ DCHECK_OK(func->AddKernel(
+ MakeKernel(std::move(sig), HashAggregateInit<GroupedPivotImpl>)));
+ }
+ DCHECK_OK(registry->AddFunction(std::move(func)));
+ }
}
} // namespace internal
diff --git a/cpp/src/arrow/compute/kernels/pivot_internal.cc
b/cpp/src/arrow/compute/kernels/pivot_internal.cc
new file mode 100644
index 0000000000..7a65ddc212
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/pivot_internal.cc
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/kernels/pivot_internal.h"
+
+#include <cstdint>
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/scalar.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow::compute::internal {
+
+using ::arrow::util::span;
+
+struct BasePivotKeyMapper : public PivotWiderKeyMapper {
+ Status Init(const PivotWiderOptions* options) override {
+ if (options->key_names.size() > static_cast<size_t>(kMaxPivotKey) + 1) {
+ return Status::NotImplemented("Pivoting to more than ",
+ static_cast<size_t>(kMaxPivotKey) + 1,
+ " columns: got ",
options->key_names.size());
+ }
+ key_name_map_.reserve(options->key_names.size());
+ PivotWiderKeyIndex index = 0;
+ for (const auto& key_name : options->key_names) {
+ bool inserted =
+ key_name_map_.try_emplace(std::string_view(key_name),
index++).second;
+ if (!inserted) {
+ return Status::KeyError("Duplicate key name '", key_name,
+ "' in PivotWiderOptions");
+ }
+ }
+ unexpected_key_behavior_ = options->unexpected_key_behavior;
+ return Status::OK();
+ }
+
+ protected:
+ Result<PivotWiderKeyIndex> KeyNotFound(std::string_view key_name) {
+ if (unexpected_key_behavior_ == PivotWiderOptions::kIgnore) {
+ return kNullPivotKey;
+ }
+ DCHECK_EQ(unexpected_key_behavior_, PivotWiderOptions::kRaise);
+ return Status::KeyError("Unexpected pivot key: ", key_name);
+ }
+
+ Result<PivotWiderKeyIndex> LookupKey(std::string_view key_name) {
+ const auto it = this->key_name_map_.find(key_name);
+ if (ARROW_PREDICT_FALSE(it == this->key_name_map_.end())) {
+ return KeyNotFound(key_name);
+ } else {
+ return it->second;
+ }
+ }
+
+ Status NullKeyName() { return Status::KeyError("pivot key name cannot be
null"); }
+
+ // The strings backing the string_views should be kept alive by
PivotWiderOptions.
+ std::unordered_map<std::string_view, PivotWiderKeyIndex> key_name_map_;
+ PivotWiderOptions::UnexpectedKeyBehavior unexpected_key_behavior_;
+ TypedBufferBuilder<PivotWiderKeyIndex> key_indices_buffer_;
+};
+
+template <typename KeyType>
+struct TypedPivotKeyMapper : public BasePivotKeyMapper {
+ Result<span<const PivotWiderKeyIndex>> MapKeys(const ArraySpan& array)
override {
+ // XXX Should use a faster hashing facility than unordered_map, for example
+ // Grouper or SwissTable.
+ RETURN_NOT_OK(this->key_indices_buffer_.Reserve(array.length));
+ PivotWiderKeyIndex* key_indices = this->key_indices_buffer_.mutable_data();
+ int64_t i = 0;
+ RETURN_NOT_OK(VisitArrayValuesInline<KeyType>(
+ array,
+ [&](std::string_view key_name) {
+ ARROW_ASSIGN_OR_RAISE(key_indices[i], LookupKey(key_name));
+ ++i;
+ return Status::OK();
+ },
+ [&]() { return NullKeyName(); }));
+ return span(key_indices, array.length);
+ }
+
+ Result<PivotWiderKeyIndex> MapKey(const Scalar& scalar) override {
+ if (!scalar.is_valid) {
+ return NullKeyName();
+ }
+ const auto& binary_scalar = checked_cast<const BaseBinaryScalar&>(scalar);
+ return LookupKey(binary_scalar.view());
+ }
+};
+
+Result<std::unique_ptr<PivotWiderKeyMapper>> PivotWiderKeyMapper::Make(
+ const DataType& key_type, const PivotWiderOptions* options) {
+ std::unique_ptr<PivotWiderKeyMapper> instance;
+
+ auto visit_key_type =
+ [&](auto&& key_type) -> Result<std::unique_ptr<PivotWiderKeyMapper>> {
+ using T = std::decay_t<decltype(key_type)>;
+ // Only binary-like keys are supported for now
+ if constexpr (is_base_binary_type<T>::value) {
+ instance = std::make_unique<TypedPivotKeyMapper<T>>();
+ RETURN_NOT_OK(instance->Init(options));
+ return std::move(instance);
+ }
+ return Status::NotImplemented("Pivot key type: ", key_type);
+ };
+
+ return VisitType(key_type, visit_key_type);
+}
+
+} // namespace arrow::compute::internal
diff --git a/cpp/src/arrow/compute/kernels/pivot_internal.h
b/cpp/src/arrow/compute/kernels/pivot_internal.h
new file mode 100644
index 0000000000..faa808b7a2
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/pivot_internal.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/type_fwd.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/span.h"
+
+namespace arrow::compute::internal {
+
+using PivotWiderKeyIndex = uint8_t;
+
+constexpr PivotWiderKeyIndex kNullPivotKey =
+ std::numeric_limits<PivotWiderKeyIndex>::max();
+constexpr PivotWiderKeyIndex kMaxPivotKey = kNullPivotKey - 1;
+
+struct PivotWiderKeyMapper {
+ virtual ~PivotWiderKeyMapper() = default;
+
+ virtual Status Init(const PivotWiderOptions* options) = 0;
+ virtual Result<::arrow::util::span<const PivotWiderKeyIndex>> MapKeys(
+ const ArraySpan&) = 0;
+ virtual Result<PivotWiderKeyIndex> MapKey(const Scalar&) = 0;
+
+ static Result<std::unique_ptr<PivotWiderKeyMapper>> Make(
+ const DataType& key_type, const PivotWiderOptions* options);
+};
+
+} // namespace arrow::compute::internal
diff --git a/cpp/src/arrow/compute/registry.cc
b/cpp/src/arrow/compute/registry.cc
index ef9f3c7e1f..91df4155ba 100644
--- a/cpp/src/arrow/compute/registry.cc
+++ b/cpp/src/arrow/compute/registry.cc
@@ -327,6 +327,7 @@ static std::unique_ptr<FunctionRegistry>
CreateBuiltInRegistry() {
RegisterHashAggregateBasic(registry.get());
RegisterScalarAggregateBasic(registry.get());
RegisterScalarAggregateMode(registry.get());
+ RegisterScalarAggregatePivot(registry.get());
RegisterScalarAggregateQuantile(registry.get());
RegisterScalarAggregateTDigest(registry.get());
RegisterScalarAggregateVariance(registry.get());
diff --git a/cpp/src/arrow/compute/registry_internal.h
b/cpp/src/arrow/compute/registry_internal.h
index 8287e63050..d91e2693a3 100644
--- a/cpp/src/arrow/compute/registry_internal.h
+++ b/cpp/src/arrow/compute/registry_internal.h
@@ -63,6 +63,7 @@ void RegisterVectorOptions(FunctionRegistry* registry);
void RegisterHashAggregateBasic(FunctionRegistry* registry);
void RegisterScalarAggregateBasic(FunctionRegistry* registry);
void RegisterScalarAggregateMode(FunctionRegistry* registry);
+void RegisterScalarAggregatePivot(FunctionRegistry* registry);
void RegisterScalarAggregateQuantile(FunctionRegistry* registry);
void RegisterScalarAggregateTDigest(FunctionRegistry* registry);
void RegisterScalarAggregateVariance(FunctionRegistry* registry);
diff --git a/cpp/src/arrow/compute/type_fwd.h b/cpp/src/arrow/compute/type_fwd.h
index 89f32ceb0f..016d97a0db 100644
--- a/cpp/src/arrow/compute/type_fwd.h
+++ b/cpp/src/arrow/compute/type_fwd.h
@@ -40,6 +40,7 @@ class CastOptions;
struct ExecBatch;
class ExecContext;
+struct ExecValue;
class KernelContext;
struct Kernel;
diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst
index 2a2a01c331..750572713c 100644
--- a/docs/source/cpp/compute.rst
+++ b/docs/source/cpp/compute.rst
@@ -214,35 +214,37 @@ the input to a single output value.
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
| count_distinct | Unary | Non-nested types | Scalar Int64 |
:struct:`CountOptions` | \(2) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| first | Unary | Numeric, Binary | Scalar Input type |
:struct:`ScalarAggregateOptions` | \(11) |
+| first | Unary | Numeric, Binary | Scalar Input type |
:struct:`ScalarAggregateOptions` | \(3) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| first_last | Unary | Numeric, Binary | Scalar Struct |
:struct:`ScalarAggregateOptions` | \(11) |
+| first_last | Unary | Numeric, Binary | Scalar Struct |
:struct:`ScalarAggregateOptions` | \(3) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| index | Unary | Any | Scalar Int64 |
:struct:`IndexOptions` | \(3) |
+| index | Unary | Any | Scalar Int64 |
:struct:`IndexOptions` | \(4) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| last | Unary | Numeric, Binary | Scalar Input type |
:struct:`ScalarAggregateOptions` | \(11) |
+| last | Unary | Numeric, Binary | Scalar Input type |
:struct:`ScalarAggregateOptions` | \(3) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
| max | Unary | Non-nested types | Scalar Input type |
:struct:`ScalarAggregateOptions` | |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| mean | Unary | Numeric | Scalar Decimal/Float64 |
:struct:`ScalarAggregateOptions` | \(4) |
+| mean | Unary | Numeric | Scalar Decimal/Float64 |
:struct:`ScalarAggregateOptions` | \(5) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
| min | Unary | Non-nested types | Scalar Input type |
:struct:`ScalarAggregateOptions` | |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| min_max | Unary | Non-nested types | Scalar Struct |
:struct:`ScalarAggregateOptions` | \(5) |
+| min_max | Unary | Non-nested types | Scalar Struct |
:struct:`ScalarAggregateOptions` | \(6) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| mode | Unary | Numeric | Struct |
:struct:`ModeOptions` | \(6) |
+| mode | Unary | Numeric | Struct |
:struct:`ModeOptions` | \(7) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| product | Unary | Numeric | Scalar Numeric |
:struct:`ScalarAggregateOptions` | \(7) |
+| pivot_wider | Binary | Binary, Any | Scalar Struct |
:struct:`PivotWiderOptions` | \(8) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| quantile | Unary | Numeric | Scalar Numeric |
:struct:`QuantileOptions` | \(8) |
+| product | Unary | Numeric | Scalar Numeric |
:struct:`ScalarAggregateOptions` | \(9) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| stddev | Unary | Numeric | Scalar Float64 |
:struct:`VarianceOptions` | \(9) |
+| quantile | Unary | Numeric | Scalar Numeric |
:struct:`QuantileOptions` | \(10) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| sum | Unary | Numeric | Scalar Numeric |
:struct:`ScalarAggregateOptions` | \(7) |
+| stddev | Unary | Numeric | Scalar Float64 |
:struct:`VarianceOptions` | \(11) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| tdigest | Unary | Numeric | Float64 |
:struct:`TDigestOptions` | \(10) |
+| sum | Unary | Numeric | Scalar Numeric |
:struct:`ScalarAggregateOptions` | \(9) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| variance | Unary | Numeric | Scalar Float64 |
:struct:`VarianceOptions` | \(9) |
+| tdigest | Unary | Numeric | Float64 |
:struct:`TDigestOptions` | \(12) |
++--------------------+---------+------------------+------------------------+----------------------------------+-------+
+| variance | Unary | Numeric | Scalar Float64 |
:struct:`VarianceOptions` | \(11) |
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
* \(1) If null values are taken into account, by setting the
@@ -252,37 +254,41 @@ the input to a single output value.
* \(2) CountMode controls whether only non-null values are counted (the
default), only null values are counted, or all values are counted.
-* \(3) Returns -1 if the value is not found. The index of a null value
+* \(3) Result is based on the ordering of input data.
+
+* \(4) Returns -1 if the value is not found. The index of a null value
is always -1, regardless of whether there are nulls in the input.
-* \(4) For decimal inputs, the resulting decimal will have the same
+* \(5) For decimal inputs, the resulting decimal will have the same
precision and scale. The result is rounded away from zero.
-* \(5) Output is a ``{"min": input type, "max": input type}`` Struct.
+* \(6) Output is a ``{"min": input type, "max": input type}`` Struct.
Of the interval types, only the month interval is supported, as the day-time
and month-day-nano types are not sortable.
-* \(6) Output is an array of ``{"mode": input type, "count": Int64}`` Struct.
+* \(7) Output is an array of ``{"mode": input type, "count": Int64}`` Struct.
It contains the *N* most common elements in the input, in descending
order, where *N* is given in :member:`ModeOptions::n`.
If two values have the same count, the smallest one comes first.
Note that the output can have less than *N* elements if the input has
less than *N* distinct values.
-* \(7) Output is Int64, UInt64, Float64, or Decimal128/256, depending on the
+* \(8) The first input contains the pivot key, while the second input contains
+ the values to be pivoted. The output is a Struct with one field for each key
+ in :member:`PivotOptions::key_names`.
+
+* \(9) Output is Int64, UInt64, Float64, or Decimal128/256, depending on the
input type.
-* \(8) Output is Float64 or input type, depending on QuantileOptions.
+* \(10) Output is Float64 or input type, depending on QuantileOptions.
-* \(9) Decimal arguments are cast to Float64 first.
+* \(11) Decimal arguments are cast to Float64 first.
-* \(10) tdigest/t-digest computes approximate quantiles, and so only needs a
+* \(12) tdigest/t-digest computes approximate quantiles, and so only needs a
fixed amount of memory. See the `reference implementation
<https://github.com/tdunning/t-digest>`_ for details.
-* \(11) Result is based on the ordering of input data
-
Decimal arguments are cast to Float64 first.
.. _grouped-aggregations-group-by:
@@ -350,11 +356,11 @@ equivalents above and reflects how they are implemented
internally.
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
| hash_distinct | Unary | Any |
List of input type | :struct:`CountOptions` | \(2) \(3) |
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_first | Unary | Numeric, Binary |
Input type | :struct:`ScalarAggregateOptions` | \(10) |
+| hash_first | Unary | Numeric, Binary |
Input type | :struct:`ScalarAggregateOptions` | \(11) |
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_first_last | Unary | Numeric, Binary |
Struct | :struct:`ScalarAggregateOptions` | \(10) |
+| hash_first_last | Unary | Numeric, Binary |
Struct | :struct:`ScalarAggregateOptions` | \(11) |
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_last | Unary | Numeric, Binary |
Input type | :struct:`ScalarAggregateOptions` | \(10) |
+| hash_last | Unary | Numeric, Binary |
Input type | :struct:`ScalarAggregateOptions` | \(11) |
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
| hash_list | Unary | Any |
List of input type | | \(3) |
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
@@ -368,15 +374,17 @@ equivalents above and reflects how they are implemented
internally.
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
| hash_one | Unary | Any |
Input type | | \(6) |
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_product | Unary | Numeric |
Numeric | :struct:`ScalarAggregateOptions` | \(7) |
+| hash_pivot_wider | Binary | Binary, Any |
Struct | :struct:`PivotWiderOptions` | \(7) |
++-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
+| hash_product | Unary | Numeric |
Numeric | :struct:`ScalarAggregateOptions` | \(8) |
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_stddev | Unary | Numeric |
Float64 | :struct:`VarianceOptions` | \(8) |
+| hash_stddev | Unary | Numeric |
Float64 | :struct:`VarianceOptions` | \(9) |
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_sum | Unary | Numeric |
Numeric | :struct:`ScalarAggregateOptions` | \(7) |
+| hash_sum | Unary | Numeric |
Numeric | :struct:`ScalarAggregateOptions` | \(8) |
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_tdigest | Unary | Numeric |
FixedSizeList[Float64] | :struct:`TDigestOptions` | \(9) |
+| hash_tdigest | Unary | Numeric |
FixedSizeList[Float64] | :struct:`TDigestOptions` | \(10) |
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_variance | Unary | Numeric |
Float64 | :struct:`VarianceOptions` | \(8) |
+| hash_variance | Unary | Numeric |
Float64 | :struct:`VarianceOptions` | \(9) |
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
* \(1) If null values are taken into account, by setting the
@@ -405,18 +413,21 @@ equivalents above and reflects how they are implemented
internally.
one non-null value for a certain group, that value is returned, and only if
all the values are ``null`` for the group will the function return ``null``.
-* \(7) Output is Int64, UInt64, Float64, or Decimal128/256, depending on the
+* \(7) The first input contains the pivot key, while the second input contains
+ the values to be pivoted. The output is a Struct with one field for each key
+ in :member:`PivotWiderOptions::key_names`.
+
+* \(8) Output is Int64, UInt64, Float64, or Decimal128/256, depending on the
input type.
-* \(8) Decimal arguments are cast to Float64 first.
+* \(9) Decimal arguments are cast to Float64 first.
-* \(9) T-digest computes approximate quantiles, and so only needs a
+* \(10) T-digest computes approximate quantiles, and so only needs a
fixed amount of memory. See the `reference implementation
<https://github.com/tdunning/t-digest>`_ for details.
-* \(10) Result is based on ordering of the input data.
+* \(11) Result is based on ordering of the input data.
- Decimal arguments are cast to Float64 first.
Element-wise ("scalar") functions
---------------------------------
diff --git a/docs/source/python/api/compute.rst
b/docs/source/python/api/compute.rst
index 5423eebfba..0205457fec 100644
--- a/docs/source/python/api/compute.rst
+++ b/docs/source/python/api/compute.rst
@@ -38,6 +38,7 @@ Aggregations
min
min_max
mode
+ pivot_wider
product
quantile
stddev
@@ -557,6 +558,7 @@ Compute Options
PadOptions
PairwiseOptions
PartitionNthOptions
+ PivotWiderOptions
QuantileOptions
ReplaceSliceOptions
ReplaceSubstringOptions
diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx
index d23286dcdd..ee61077aa8 100644
--- a/python/pyarrow/_compute.pyx
+++ b/python/pyarrow/_compute.pyx
@@ -2396,6 +2396,50 @@ class RankQuantileOptions(_RankQuantileOptions):
self._set_options(sort_keys, null_placement)
+cdef class _PivotWiderOptions(FunctionOptions):
+
+ def _set_options(self, key_names, unexpected_key_behavior):
+ cdef:
+ vector[c_string] c_key_names
+ PivotWiderUnexpectedKeyBehavior c_unexpected_key_behavior
+ if unexpected_key_behavior == "ignore":
+ c_unexpected_key_behavior = PivotWiderUnexpectedKeyBehavior_Ignore
+ elif unexpected_key_behavior == "raise":
+ c_unexpected_key_behavior = PivotWiderUnexpectedKeyBehavior_Raise
+ else:
+ raise ValueError(
+ f"Unsupported value for unexpected_key_behavior: "
+ f"expected 'ignore' or 'raise', got
{unexpected_key_behavior!r}")
+
+ for k in key_names:
+ c_key_names.push_back(tobytes(k))
+
+ self.wrapped.reset(
+ new CPivotWiderOptions(move(c_key_names),
c_unexpected_key_behavior)
+ )
+
+
+class PivotWiderOptions(_PivotWiderOptions):
+ """
+ Options for the `pivot_wider` function.
+
+ Parameters
+ ----------
+ key_names : sequence of str
+ The pivot key names expected in the pivot key column.
+ For each entry in `key_names`, a column with the same name is emitted
+ in the struct output.
+ unexpected_key_behavior : str, default "ignore"
+ The behavior when pivot keys not in `key_names` are encountered.
+ Accepted values are "ignore", "raise".
+ If "ignore", unexpected keys are silently ignored.
+ If "raise", unexpected keys raise a KeyError.
+ """
+
+ def __init__(self, key_names, *, unexpected_key_behavior="ignore"):
+ self._set_options(key_names, unexpected_key_behavior)
+
+
cdef class Expression(_Weakrefable):
"""
A logical expression to be evaluated against some input.
diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py
index 5348336235..e2c17ee61d 100644
--- a/python/pyarrow/compute.py
+++ b/python/pyarrow/compute.py
@@ -53,6 +53,7 @@ from pyarrow._compute import ( # noqa
PadOptions,
PairwiseOptions,
PartitionNthOptions,
+ PivotWiderOptions,
QuantileOptions,
RandomOptions,
RankOptions,
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 556696e344..8e666b114b 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2823,6 +2823,16 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil:
vector[CSortKey] sort_keys
CNullPlacement null_placement
+ cdef enum PivotWiderUnexpectedKeyBehavior \
+ "arrow::compute::PivotWiderOptions::UnexpectedKeyBehavior":
+        PivotWiderUnexpectedKeyBehavior_Ignore "arrow::compute::PivotWiderOptions::kIgnore"
+        PivotWiderUnexpectedKeyBehavior_Raise "arrow::compute::PivotWiderOptions::kRaise"
+
+    cdef cppclass CPivotWiderOptions \
+            "arrow::compute::PivotWiderOptions"(CFunctionOptions):
+ CPivotWiderOptions(vector[c_string] key_names,
+ PivotWiderUnexpectedKeyBehavior)
+
cdef enum DatumType" arrow::Datum::type":
DatumType_NONE" arrow::Datum::NONE"
DatumType_SCALAR" arrow::Datum::SCALAR"
diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py
index ef02b476bd..99b3047d66 100644
--- a/python/pyarrow/tests/test_compute.py
+++ b/python/pyarrow/tests/test_compute.py
@@ -145,6 +145,7 @@ def test_option_class_equality(request):
pc.ArraySortOptions(),
pc.AssumeTimezoneOptions("UTC"),
pc.CastOptions.safe(pa.int8()),
+ pc.CumulativeOptions(start=None, skip_nulls=False),
pc.CountOptions(),
pc.DayOfWeekOptions(count_from_zero=False, week_start=0),
pc.DictionaryEncodeOptions(),
@@ -167,7 +168,7 @@ def test_option_class_equality(request):
pc.PadOptions(5),
pc.PairwiseOptions(period=1),
pc.PartitionNthOptions(1, null_placement="at_start"),
- pc.CumulativeOptions(start=None, skip_nulls=False),
+ pc.PivotWiderOptions(["height"], unexpected_key_behavior="raise"),
pc.QuantileOptions(),
pc.RandomOptions(),
pc.RankOptions(sort_keys="ascending",
@@ -3785,3 +3786,29 @@ def test_pairwise_diff():
with pytest.raises(pa.ArrowInvalid,
match="overflow"):
pa.compute.pairwise_diff_checked(arr, period=-1)
+
+
+def test_pivot_wider():
+ key_names = ["width", "height"]
+
+ result = pc.pivot_wider(["height", "width", "depth"], [10, None, 11])
+ assert result.as_py() == {}
+
+ result = pc.pivot_wider(["height", "width", "depth"], [10, None, 11],
+ key_names)
+ assert result.as_py() == {"width": None, "height": 10}
+ # check key order
+ assert list(result.as_py()) == ["width", "height"]
+
+ result = pc.pivot_wider(["height", "width", "depth"], [10, None, 11],
+ key_names=key_names)
+ assert result.as_py() == {"width": None, "height": 10}
+
+ with pytest.raises(KeyError, match="Unexpected pivot key: depth"):
+ result = pc.pivot_wider(["height", "width", "depth"], [10, None, 11],
+ key_names=key_names,
+ unexpected_key_behavior="raise")
+
+    with pytest.raises(ValueError, match="Encountered more than one non-null value"):
+ result = pc.pivot_wider(["height", "width", "height"], [10, None, 11],
+ key_names=key_names)