This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new b41386d0e5 GH-45269: [C++][Compute] Add "pivot_wider" and 
"hash_pivot_wider" functions (#45562)
b41386d0e5 is described below

commit b41386d0e520298551783a0313914b9f30e708be
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Mar 4 17:21:38 2025 +0100

    GH-45269: [C++][Compute] Add "pivot_wider" and "hash_pivot_wider" functions 
(#45562)
    
    ### Rationale for this change
    
    Add "pivot wider" functionality such as [in 
Pandas](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.pivot.html),
 through two dedicated functions:
    1. a "pivot_wider" scalar aggregate function returning a Struct scalar
    2. a "hash_pivot_wider" grouped aggregate function returning a Struct array
    
    Both functions take two arguments (the column of pivot keys and the column 
of pivot values) and require passing a `PivotWiderOptions` structure with the 
expected pivot keys, so as to determine the output Struct type.
    
    ### Are these changes tested?
    
    Yes, by dedicated unit tests.
    
    ### Are there any user-facing changes?
    
    No, just new APIs.
    * GitHub Issue: #45269
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/CMakeLists.txt                     |   2 +
 cpp/src/arrow/acero/groupby_aggregate_node.cc    |   5 +
 cpp/src/arrow/acero/hash_aggregate_test.cc       | 731 ++++++++++++++++++++++-
 cpp/src/arrow/compute/api_aggregate.cc           |  31 +-
 cpp/src/arrow/compute/api_aggregate.h            |  82 +++
 cpp/src/arrow/compute/kernels/aggregate_pivot.cc | 188 ++++++
 cpp/src/arrow/compute/kernels/aggregate_test.cc  | 289 +++++++++
 cpp/src/arrow/compute/kernels/hash_aggregate.cc  | 432 +++++++++++++-
 cpp/src/arrow/compute/kernels/pivot_internal.cc  | 127 ++++
 cpp/src/arrow/compute/kernels/pivot_internal.h   |  51 ++
 cpp/src/arrow/compute/registry.cc                |   1 +
 cpp/src/arrow/compute/registry_internal.h        |   1 +
 cpp/src/arrow/compute/type_fwd.h                 |   1 +
 docs/source/cpp/compute.rst                      |  83 +--
 docs/source/python/api/compute.rst               |   2 +
 python/pyarrow/_compute.pyx                      |  44 ++
 python/pyarrow/compute.py                        |   1 +
 python/pyarrow/includes/libarrow.pxd             |  10 +
 python/pyarrow/tests/test_compute.py             |  29 +-
 19 files changed, 2063 insertions(+), 47 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index b9b8785cbc..775e3633aa 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -753,10 +753,12 @@ if(ARROW_COMPUTE)
        ARROW_COMPUTE_SRCS
        compute/kernels/aggregate_basic.cc
        compute/kernels/aggregate_mode.cc
+       compute/kernels/aggregate_pivot.cc
        compute/kernels/aggregate_quantile.cc
        compute/kernels/aggregate_tdigest.cc
        compute/kernels/aggregate_var_std.cc
        compute/kernels/hash_aggregate.cc
+       compute/kernels/pivot_internal.cc
        compute/kernels/scalar_arithmetic.cc
        compute/kernels/scalar_boolean.cc
        compute/kernels/scalar_compare.cc
diff --git a/cpp/src/arrow/acero/groupby_aggregate_node.cc 
b/cpp/src/arrow/acero/groupby_aggregate_node.cc
index 06b034ab2d..2beef360b4 100644
--- a/cpp/src/arrow/acero/groupby_aggregate_node.cc
+++ b/cpp/src/arrow/acero/groupby_aggregate_node.cc
@@ -282,6 +282,11 @@ Status GroupByNode::Merge() {
       DCHECK(state0->agg_states[span_i]);
       batch_ctx.SetState(state0->agg_states[span_i].get());
 
+      // XXX this resizes each KernelState (state0->agg_states[span_i]) 
multiple times.
+      // An alternative would be a two-pass algorithm:
+      // 1. Compute all transpositions (one per local state) and the final 
number of
+      // groups.
+      // 2. Process all agg kernels, resizing each KernelState only once.
       RETURN_NOT_OK(
           agg_kernels_[span_i]->resize(&batch_ctx, 
state0->grouper->num_groups()));
       RETURN_NOT_OK(agg_kernels_[span_i]->merge(
diff --git a/cpp/src/arrow/acero/hash_aggregate_test.cc 
b/cpp/src/arrow/acero/hash_aggregate_test.cc
index 7f4b6dd752..e50113968e 100644
--- a/cpp/src/arrow/acero/hash_aggregate_test.cc
+++ b/cpp/src/arrow/acero/hash_aggregate_test.cc
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <limits>
 #include <memory>
+#include <random>
 #include <type_traits>
 #include <unordered_map>
 #include <utility>
@@ -30,16 +31,14 @@
 #include "arrow/acero/options.h"
 #include "arrow/acero/test_util_internal.h"
 #include "arrow/array.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
 #include "arrow/array/concatenate.h"
 #include "arrow/chunked_array.h"
 #include "arrow/compute/api_aggregate.h"
-#include "arrow/compute/api_scalar.h"
-#include "arrow/compute/api_vector.h"
 #include "arrow/compute/cast.h"
 #include "arrow/compute/exec.h"
 #include "arrow/compute/exec_internal.h"
-#include "arrow/compute/kernels/aggregate_internal.h"
-#include "arrow/compute/kernels/codegen_internal.h"
 #include "arrow/compute/registry.h"
 #include "arrow/compute/row/grouper.h"
 #include "arrow/table.h"
@@ -50,9 +49,7 @@
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/async_generator.h"
-#include "arrow/util/bitmap_reader.h"
 #include "arrow/util/checked_cast.h"
-#include "arrow/util/int_util_overflow.h"
 #include "arrow/util/key_value_metadata.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/string.h"
@@ -64,7 +61,6 @@ using testing::HasSubstr;
 
 namespace arrow {
 
-using internal::BitmapReader;
 using internal::checked_cast;
 using internal::checked_pointer_cast;
 using internal::ToChars;
@@ -77,6 +73,7 @@ using compute::ExecBatchFromJSON;
 using compute::ExecSpan;
 using compute::FunctionOptions;
 using compute::Grouper;
+using compute::PivotWiderOptions;
 using compute::RowSegmenter;
 using compute::ScalarAggregateOptions;
 using compute::Segment;
@@ -565,6 +562,7 @@ class GroupBy : public 
::testing::TestWithParam<GroupByFunction> {
     return acero::GroupByTest(GetParam(), arguments, keys, aggregates, 
use_threads);
   }
 
+  // This is not named GroupByTest to avoid ambiguities between overloads
   Result<Datum> AltGroupBy(const std::vector<Datum>& arguments,
                            const std::vector<Datum>& keys,
                            const std::vector<Datum>& segment_keys,
@@ -574,6 +572,70 @@ class GroupBy : public 
::testing::TestWithParam<GroupByFunction> {
                       /*naive=*/false);
   }
 
+  Result<Datum> RunPivot(const std::shared_ptr<DataType>& key_type,
+                         const std::shared_ptr<DataType>& value_type,
+                         const PivotWiderOptions& options,
+                         const std::shared_ptr<Table>& table, bool use_threads 
= false) {
+    Aggregate agg{"hash_pivot_wider", 
std::make_shared<PivotWiderOptions>(options),
+                  /*target=*/std::vector<FieldRef>{"agg_0", "agg_1"}, 
/*name=*/"out"};
+    ARROW_ASSIGN_OR_RAISE(
+        Datum aggregated_and_grouped,
+        AltGroupBy({table->GetColumnByName("key"), 
table->GetColumnByName("value")},
+                   {table->GetColumnByName("group_key")},
+                   /*segment_keys=*/{}, {agg}, use_threads));
+    ValidateOutput(aggregated_and_grouped);
+    return aggregated_and_grouped;
+  }
+
+  Result<Datum> RunPivot(const std::shared_ptr<DataType>& key_type,
+                         const std::shared_ptr<DataType>& value_type,
+                         const PivotWiderOptions& options,
+                         const std::vector<std::string>& table_json,
+                         bool use_threads = false) {
+    auto table =
+        TableFromJSON(schema({field("group_key", int64()), field("key", 
key_type),
+                              field("value", value_type)}),
+                      table_json);
+    return RunPivot(key_type, value_type, options, table, use_threads);
+  }
+
+  void CheckPivoted(const std::shared_ptr<DataType>& key_type,
+                    const std::shared_ptr<DataType>& value_type,
+                    const PivotWiderOptions& options, const Datum& pivoted,
+                    const std::string& expected_json) {
+    FieldVector pivoted_fields;
+    for (const auto& key_name : options.key_names) {
+      pivoted_fields.push_back(field(key_name, value_type));
+    }
+    auto expected_type = struct_({
+        field("key_0", int64()),
+        field("out", struct_(std::move(pivoted_fields))),
+    });
+    auto expected = ArrayFromJSON(expected_type, expected_json);
+    AssertDatumsEqual(expected, pivoted, /*verbose=*/true);
+  }
+
+  void TestPivot(const std::shared_ptr<DataType>& key_type,
+                 const std::shared_ptr<DataType>& value_type,
+                 const PivotWiderOptions& options,
+                 const std::vector<std::string>& table_json,
+                 const std::string& expected_json, bool use_threads) {
+    ASSERT_OK_AND_ASSIGN(
+        auto pivoted, RunPivot(key_type, value_type, options, table_json, 
use_threads));
+    CheckPivoted(key_type, value_type, options, pivoted, expected_json);
+  }
+
+  void TestPivot(const std::shared_ptr<DataType>& key_type,
+                 const std::shared_ptr<DataType>& value_type,
+                 const PivotWiderOptions& options,
+                 const std::vector<std::string>& table_json,
+                 const std::string& expected_json) {
+    for (bool use_threads : {false, true}) {
+      ARROW_SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+      TestPivot(key_type, value_type, options, table_json, expected_json, 
use_threads);
+    }
+  }
+
   void TestSegmentKey(const std::shared_ptr<Table>& table, Datum output,
                       const std::vector<Datum>& segment_keys) {
     return acero::TestSegmentKey(GetParam(), table, output, segment_keys);
@@ -4345,6 +4407,566 @@ TEST_P(GroupBy, OnlyKeys) {
   }
 }
 
+TEST_P(GroupBy, PivotBasics) {
+  auto key_type = utf8();
+  auto value_type = float32();
+  std::vector<std::string> table_json = {R"([
+      [1, "width",  10.5],
+      [2, "width",  11.5]
+      ])",
+                                         R"([
+      [2, "height", 12.5]
+      ])",
+                                         R"([
+      [3, "width",  13.5],
+      [1, "height", 14.5]
+      ])"};
+  std::string expected_json = R"([
+      [1, {"height": 14.5, "width": 10.5} ],
+      [2, {"height": 12.5, "width": 11.5} ],
+      [3, {"height": null, "width": 13.5} ]
+      ])";
+  for (auto unexpected_key_behavior :
+       {PivotWiderOptions::kIgnore, PivotWiderOptions::kRaise}) {
+    PivotWiderOptions options(/*key_names=*/{"height", "width"}, 
unexpected_key_behavior);
+    TestPivot(key_type, value_type, options, table_json, expected_json);
+  }
+}
+
+TEST_P(GroupBy, PivotAllKeyTypes) {
+  auto value_type = float32();
+  std::vector<std::string> table_json = {R"([
+      [1, "width", 10.5],
+      [2, "width", 11.5]
+      ])",
+                                         R"([
+      [2, "height", 12.5],
+      [3, "width",  13.5],
+      [1, "height", 14.5]
+      ])"};
+  std::string expected_json = R"([
+      [1, {"height": 14.5, "width": 10.5} ],
+      [2, {"height": 12.5, "width": 11.5} ],
+      [3, {"height": null, "width": 13.5} ]
+      ])";
+  PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+  for (const auto& key_type : BaseBinaryTypes()) {
+    ARROW_SCOPED_TRACE("key_type = ", *key_type);
+    TestPivot(key_type, value_type, options, table_json, expected_json);
+  }
+}
+
+TEST_P(GroupBy, PivotNumericValues) {
+  auto key_type = utf8();
+  std::vector<std::string> table_json = {R"([
+      [1, "width", 10],
+      [2, "width", 11]
+      ])",
+                                         R"([
+      [2, "height", 12],
+      [3, "width",  13],
+      [1, "height", 14]
+      ])"};
+  std::string expected_json = R"([
+      [1, {"height": 14,   "width": 10} ],
+      [2, {"height": 12,   "width": 11} ],
+      [3, {"height": null, "width": 13} ]
+      ])";
+  PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+  for (const auto& value_type : NumericTypes()) {
+    ARROW_SCOPED_TRACE("value_type = ", *value_type);
+    TestPivot(key_type, value_type, options, table_json, expected_json);
+  }
+}
+
+TEST_P(GroupBy, PivotBinaryLikeValues) {
+  auto key_type = utf8();
+  std::vector<std::string> table_json = {R"([
+      [1, "name",      "Bob"],
+      [2, "eye_color", "brown"]
+      ])",
+                                         R"([
+      [2, "name",      "Alice"],
+      [1, "eye_color", "gray"],
+      [3, "name",      "Mallaury"]
+      ])"};
+  std::string expected_json = R"([
+      [1, {"name": "Bob",      "eye_color": "gray"} ],
+      [2, {"name": "Alice",    "eye_color": "brown"} ],
+      [3, {"name": "Mallaury", "eye_color": null} ]
+      ])";
+  PivotWiderOptions options(/*key_names=*/{"name", "eye_color"});
+
+  for (const auto& value_type : BaseBinaryTypes()) {
+    ARROW_SCOPED_TRACE("value_type = ", *value_type);
+    TestPivot(key_type, value_type, options, table_json, expected_json);
+  }
+}
+
+TEST_P(GroupBy, PivotDecimalValues) {
+  auto key_type = utf8();
+  auto value_type = decimal128(9, 1);
+  std::vector<std::string> table_json = {R"([
+      [1, "width", "10.1"],
+      [2, "width", "11.1"]
+      ])",
+                                         R"([
+      [2, "height", "12.1"],
+      [3, "width",  "13.1"],
+      [1, "height", "14.1"]
+      ])"};
+  std::string expected_json = R"([
+      [1, {"height": "14.1", "width": "10.1"} ],
+      [2, {"height": "12.1", "width": "11.1"} ],
+      [3, {"height": null,   "width": "13.1"} ]
+      ])";
+  PivotWiderOptions options(/*key_names=*/{"height", "width"});
+  TestPivot(key_type, value_type, options, table_json, expected_json);
+}
+
+TEST_P(GroupBy, PivotStructValues) {
+  auto key_type = utf8();
+  auto value_type = struct_({{"value", float32()}});
+  std::vector<std::string> table_json = {R"([
+      [1, "width", [10.1]],
+      [2, "width", [11.1]]
+      ])",
+                                         R"([
+      [2, "height", [12.1]],
+      [3, "width",  [13.1]],
+      [1, "height", [14.1]]
+      ])"};
+  std::string expected_json = R"([
+      [1, {"height": [14.1], "width": [10.1]} ],
+      [2, {"height": [12.1], "width": [11.1]} ],
+      [3, {"height": null,   "width": [13.1]} ]
+      ])";
+  PivotWiderOptions options(/*key_names=*/{"height", "width"});
+  TestPivot(key_type, value_type, options, table_json, expected_json);
+}
+
+TEST_P(GroupBy, PivotListValues) {
+  auto key_type = utf8();
+  auto value_type = list(float32());
+  std::vector<std::string> table_json = {R"([
+      [1, "foo", [10.5, 11.5]],
+      [2, "bar", [12.5]]
+      ])",
+                                         R"([
+      [2, "foo", []],
+      [3, "bar", [13.5]],
+      [1, "foo", null]
+      ])"};
+  std::string expected_json = R"([
+      [1, {"foo": [10.5, 11.5], "bar": null}   ],
+      [2, {"foo": [],           "bar": [12.5]} ],
+      [3, {"foo": null,         "bar": [13.5]} ]
+      ])";
+  PivotWiderOptions options(/*key_names=*/{"foo", "bar"});
+  TestPivot(key_type, value_type, options, table_json, expected_json);
+}
+
+TEST_P(GroupBy, PivotNullValueType) {
+  auto key_type = utf8();
+  auto value_type = null();
+  std::vector<std::string> table_json = {R"([
+      [1, "foo", null],
+      [2, "bar", null]
+      ])",
+                                         R"([
+      [2, "foo", null],
+      [3, "bar", null],
+      [1, "foo", null]
+      ])"};
+  std::string expected_json = R"([
+      [1, {"foo": null, "bar": null} ],
+      [2, {"foo": null, "bar": null} ],
+      [3, {"foo": null, "bar": null} ]
+      ])";
+  PivotWiderOptions options(/*key_names=*/{"foo", "bar"});
+  TestPivot(key_type, value_type, options, table_json, expected_json);
+}
+
+TEST_P(GroupBy, PivotNullValues) {
+  auto key_type = utf8();
+  auto value_type = float32();
+  std::vector<std::string> table_json = {R"([
+      [1, "width", 10.5],
+      [2, "width", null]
+      ])",
+                                         R"([
+      [2, "height", 12.5],
+      [2, "width",  13.5],
+      [1, "width",  null],
+      [2, "height", null]
+      ])",
+                                         R"([
+      [1, "width",  null],
+      [2, "height", null]
+      ])"};
+  std::string expected_json = R"([
+      [1, {"height": null, "width": 10.5} ],
+      [2, {"height": 12.5, "width": 13.5} ]
+      ])";
+  PivotWiderOptions options(/*key_names=*/{"height", "width"}, 
PivotWiderOptions::kRaise);
+  TestPivot(key_type, value_type, options, table_json, expected_json);
+}
+
+TEST_P(GroupBy, PivotScalarKey) {
+  BatchesWithSchema input;
+  std::vector<TypeHolder> types = {int32(), utf8(), float32()};
+  std::vector<ArgShape> shapes = {ArgShape::ARRAY, ArgShape::SCALAR, 
ArgShape::ARRAY};
+  input.batches = {
+      ExecBatchFromJSON(types, shapes,
+                        R"([
+        [1, "width",  10.5],
+        [2, "width",  11.5]
+        ])"),
+      ExecBatchFromJSON(types, shapes,
+                        R"([
+        [2, "width",  null]
+        ])"),
+      ExecBatchFromJSON(types, shapes,
+                        R"([
+        [3, "height", null],
+        [3, "height", null]
+        ])"),
+      ExecBatchFromJSON(types, shapes,
+                        R"([
+        [3, "height", 12.5],
+        [1, "height", 13.5]
+        ])"),
+  };
+  input.schema = schema({field("group_key", int32()), field("pivot_key", 
utf8()),
+                         field("pivot_value", float32())});
+  Datum expected = ArrayFromJSON(
+      struct_({field("group_key", int32()),
+               field("pivoted",
+                     struct_({field("height", float32()), field("width", 
float32())}))}),
+      R"([
+      [1, {"height": 13.5, "width": 10.5} ],
+      [2, {"height": null, "width": 11.5} ],
+      [3, {"height": 12.5, "width": null} ]
+      ])");
+  auto options = std::make_shared<PivotWiderOptions>(
+      PivotWiderOptions(/*key_names=*/{"height", "width"}));
+  Aggregate aggregate{"hash_pivot_wider", options,
+                      std::vector<FieldRef>{"pivot_key", "pivot_value"}, 
"pivoted"};
+  for (bool use_threads : {false, true}) {
+    SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+    ASSERT_OK_AND_ASSIGN(Datum actual,
+                         RunGroupBy(input, {"group_key"}, {aggregate}, 
use_threads));
+    ValidateOutput(actual);
+    AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
+  }
+}
+
+TEST_P(GroupBy, PivotUnusedKeyName) {
+  auto key_type = utf8();
+  auto value_type = float32();
+  std::vector<std::string> table_json = {R"([
+      [1, "width", 10.5],
+      [2, "width", 11.5]
+      ])",
+                                         R"([
+      [2, "height", 12.5],
+      [3, "width",  13.5],
+      [1, "height", 14.5]
+      ])"};
+  std::string expected_json = R"([
+      [1, {"height": 14.5, "depth": null, "width": 10.5} ],
+      [2, {"height": 12.5, "depth": null, "width": 11.5} ],
+      [3, {"height": null, "depth": null, "width": 13.5} ]
+      ])";
+  for (auto unexpected_key_behavior :
+       {PivotWiderOptions::kIgnore, PivotWiderOptions::kRaise}) {
+    PivotWiderOptions options(/*key_names=*/{"height", "depth", "width"},
+                              unexpected_key_behavior);
+    TestPivot(key_type, value_type, options, table_json, expected_json);
+  }
+}
+
+TEST_P(GroupBy, PivotUnexpectedKeyName) {
+  auto key_type = utf8();
+  auto value_type = float32();
+  std::vector<std::string> table_json = {R"([
+      [1, "width", 10.5],
+      [2, "width", 11.5]
+      ])",
+                                         R"([
+      [2, "height", 12.5],
+      [3, "width",  13.5],
+      [1, "depth",  15.5],
+      [1, "height", 14.5]
+      ])"};
+  PivotWiderOptions options(/*key_names=*/{"height", "width"});
+  std::string expected_json = R"([
+      [1, {"height": 14.5, "width": 10.5} ],
+      [2, {"height": 12.5, "width": 11.5} ],
+      [3, {"height": null, "width": 13.5} ]
+      ])";
+  TestPivot(key_type, value_type, options, table_json, expected_json);
+  options.unexpected_key_behavior = PivotWiderOptions::kRaise;
+  for (bool use_threads : {false, true}) {
+    ARROW_SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        KeyError, HasSubstr("Unexpected pivot key: depth"),
+        RunPivot(key_type, value_type, options, table_json, use_threads));
+  }
+}
+TEST_P(GroupBy, PivotNullKeys) {
+  auto key_type = utf8();
+  auto value_type = float32();
+  std::vector<std::string> table_json = {R"([
+      [1, "width", 10.5],
+      [2, null,    11.5]
+      ])"};
+  PivotWiderOptions options(/*key_names=*/{"height", "width"});
+  for (bool use_threads : {false, true}) {
+    ARROW_SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        KeyError, HasSubstr("pivot key name cannot be null"),
+        RunPivot(key_type, value_type, options, table_json, use_threads));
+  }
+}
+
+TEST_P(GroupBy, PivotDuplicateKeys) {
+  auto key_type = utf8();
+  auto value_type = float32();
+  std::vector<std::string> table_json = {R"([])"};
+  PivotWiderOptions options(/*key_names=*/{"height", "width", "height"});
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      KeyError, HasSubstr("Duplicate key name 'height' in PivotWiderOptions"),
+      RunPivot(key_type, value_type, options, table_json));
+}
+
+TEST_P(GroupBy, PivotDuplicateValues) {
+  auto key_type = utf8();
+  auto value_type = float32();
+  PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+  for (bool use_threads : {false, true}) {
+    ARROW_SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+
+    // Two non-null values for (group 1, "width") in the same chunk must raise
+    std::vector<std::string> table_json = {R"([
+        [1, "width", 10.5],
+        [2, "width", 11.5],
+        [1, "width", 11.5]
+        ])"};
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid, HasSubstr("Encountered more than one non-null value"),
+        RunPivot(key_type, value_type, options, table_json, use_threads));
+
+    // Same duplicate split across chunks; use_threads is forwarded to RunPivot
+    table_json = {R"([
+        [1, "width", 10.5],
+        [2, "width", 11.5]
+        ])",
+                  R"([
+        [1, "width", 11.5]
+        ])"};
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid, HasSubstr("Encountered more than one non-null value"),
+        RunPivot(key_type, value_type, options, table_json, use_threads));
+  }
+}
+
+TEST_P(GroupBy, PivotScalarKeyWithDuplicateValues) {
+  BatchesWithSchema input;
+  std::vector<TypeHolder> types = {int32(), utf8(), float32()};
+  std::vector<ArgShape> shapes = {ArgShape::ARRAY, ArgShape::SCALAR, 
ArgShape::ARRAY};
+  input.schema = schema({field("group_key", int32()), field("pivot_key", 
utf8()),
+                         field("pivot_value", float32())});
+  auto options = std::make_shared<PivotWiderOptions>(
+      PivotWiderOptions(/*key_names=*/{"height", "width"}));
+  Aggregate aggregate{"hash_pivot_wider", options,
+                      std::vector<FieldRef>{"pivot_key", "pivot_value"}, 
"pivoted"};
+
+  // Duplicate values in same chunk
+  input.batches = {
+      ExecBatchFromJSON(types, shapes,
+                        R"([
+        [1, "width",  10.5],
+        [1, "width",  11.5]
+        ])"),
+  };
+  for (bool use_threads : {false, true}) {
+    SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid, HasSubstr("Encountered more than one non-null value"),
+        RunGroupBy(input, {"group_key"}, {aggregate}, use_threads));
+  }
+
+  // Duplicate values in different chunks
+  input.batches = {
+      ExecBatchFromJSON(types, shapes,
+                        R"([
+        [1, "width",  10.5],
+        [2, "width",  11.5]
+        ])"),
+      ExecBatchFromJSON(types, shapes,
+                        R"([
+        [2, "width",  12.5]
+        ])"),
+  };
+  for (bool use_threads : {false, true}) {
+    SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid, HasSubstr("Encountered more than one non-null value"),
+        RunGroupBy(input, {"group_key"}, {aggregate}, use_threads));
+  }
+}
+
+struct RandomPivotTestCase {
+  PivotWiderOptions options;
+  std::shared_ptr<RecordBatch> input;
+  std::shared_ptr<Array> expected_output;
+};
+
+Result<RandomPivotTestCase> MakeRandomPivot(int64_t length) {
+  constexpr double kKeyPresenceProbability = 0.8;
+  constexpr double kValueValidityProbability = 0.7;
+
+  const std::vector<std::string> key_names = {"height", "width", "depth"};
+  std::default_random_engine gen(42);
+  std::uniform_real_distribution<float> value_dist(0.0f, 1.0f);
+  std::bernoulli_distribution key_presence_dist(kKeyPresenceProbability);
+  std::bernoulli_distribution value_validity_dist(kValueValidityProbability);
+
+  Int64Builder group_key_builder;
+  StringBuilder key_builder;
+  FloatBuilder value_builder;
+  RETURN_NOT_OK(group_key_builder.Reserve(length));
+  RETURN_NOT_OK(key_builder.Reserve(length));
+  RETURN_NOT_OK(value_builder.Reserve(length));
+
+  // The last input key name will not be part of the result
+  PivotWiderOptions options(
+      std::vector<std::string>(key_names.begin(), key_names.end() - 1));
+  Int64Builder pivoted_group_builder;
+  std::vector<FloatBuilder> pivoted_value_builders(options.key_names.size());
+
+  auto finish_group = [&](int64_t group_key) -> Status {
+    // Record this group's key. No emptiness check is needed here: callers only
+    // finish a group once `group_started` is set, i.e. it has an input row.
+    RETURN_NOT_OK(pivoted_group_builder.Append(group_key));
+    // Make sure all pivoted columns are populated and in sync with the group 
key column
+    for (auto& pivoted_value_builder : pivoted_value_builders) {
+      if (pivoted_value_builder.length() < pivoted_group_builder.length()) {
+        RETURN_NOT_OK(pivoted_value_builder.AppendNull());
+      }
+      EXPECT_EQ(pivoted_value_builder.length(), 
pivoted_group_builder.length());
+    }
+    return Status::OK();
+  };
+
+  int64_t group_key = 1000;
+  bool group_started = false;
+  int key_id = 0;
+  while (group_key_builder.length() < length) {
+    // For the current group_key and key_id we can either:
+    // 1. not add a row
+    // 2. add a row with a null value
+    // 3. add a row with a non-null value
+    //    3a. the row will end up in the pivoted data iff the key is part of
+    //        the PivotWiderOptions.key_names
+    if (key_presence_dist(gen)) {
+      group_key_builder.UnsafeAppend(group_key);
+      group_started = true;
+      RETURN_NOT_OK(key_builder.Append(key_names[key_id]));
+      if (value_validity_dist(gen)) {
+        const auto value = value_dist(gen);
+        value_builder.UnsafeAppend(value);
+        if (key_id < static_cast<int>(pivoted_value_builders.size())) {
+          RETURN_NOT_OK(pivoted_value_builders[key_id].Append(value));
+        }
+      } else {
+        value_builder.UnsafeAppendNull();
+      }
+    }
+    if (++key_id >= static_cast<int>(key_names.size())) {
+      // We've considered all keys for this group.
+      // Emit a pivoted row only if any key was emitted in the input.
+      if (group_started) {
+        RETURN_NOT_OK(finish_group(group_key));
+      }
+      // Initiate new group
+      ++group_key;
+      group_started = false;
+      key_id = 0;
+    }
+  }
+  if (group_started) {
+    // We've started this group, finish it
+    RETURN_NOT_OK(finish_group(group_key));
+  }
+  ARROW_ASSIGN_OR_RAISE(auto group_keys, group_key_builder.Finish());
+  ARROW_ASSIGN_OR_RAISE(auto keys, key_builder.Finish());
+  ARROW_ASSIGN_OR_RAISE(auto values, value_builder.Finish());
+  auto input_schema =
+      schema({{"group_key", int64()}, {"key", utf8()}, {"value", float32()}});
+  auto input = RecordBatch::Make(input_schema, length, {group_keys, keys, 
values});
+  RETURN_NOT_OK(input->Validate());
+
+  ARROW_ASSIGN_OR_RAISE(auto pivoted_groups, pivoted_group_builder.Finish());
+  ArrayVector pivoted_value_columns;
+  for (auto& pivoted_value_builder : pivoted_value_builders) {
+    ARROW_ASSIGN_OR_RAISE(pivoted_value_columns.emplace_back(),
+                          pivoted_value_builder.Finish());
+  }
+  ARROW_ASSIGN_OR_RAISE(
+      auto pivoted_values,
+      StructArray::Make(std::move(pivoted_value_columns), options.key_names));
+  ARROW_ASSIGN_OR_RAISE(auto output,
+                        StructArray::Make({pivoted_groups, pivoted_values},
+                                          std::vector<std::string>{"key_0", 
"out"}));
+  RETURN_NOT_OK(output->Validate());
+
+  return RandomPivotTestCase{std::move(options), std::move(input), 
std::move(output)};
+}
+
+TEST_P(GroupBy, PivotRandom) {
+  constexpr int64_t kLength = 900;
+  // Larger than 256 to exercise take-index dispatch in pivot implementation
+  constexpr int64_t kChunkLength = 300;
+  ASSERT_OK_AND_ASSIGN(auto pivot_case, MakeRandomPivot(kLength));
+
+  for (bool shuffle : {false, true}) {
+    ARROW_SCOPED_TRACE("shuffle = ", shuffle);
+    auto input = Datum(pivot_case.input);
+    if (shuffle) {
+      // Since the "value" column is random-generated, sorting on it produces
+      // a random shuffle.
+      ASSERT_OK_AND_ASSIGN(
+          auto shuffle_indices,
+          SortIndices(pivot_case.input, SortOptions({SortKey("value")})));
+      ASSERT_OK_AND_ASSIGN(input, Take(input, shuffle_indices));
+    }
+    ASSERT_EQ(input.kind(), Datum::RECORD_BATCH);
+    RecordBatchVector chunks;
+    for (int64_t start = 0; start < kLength; start += kChunkLength) {
+      const auto chunk_length = std::min(kLength - start, kChunkLength);
+      chunks.push_back(input.record_batch()->Slice(start, chunk_length));
+    }
+    ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(chunks));
+
+    for (bool use_threads : {false, true}) {
+      ARROW_SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+      ASSERT_OK_AND_ASSIGN(auto pivoted, RunPivot(utf8(), float32(), 
pivot_case.options,
+                                                  table, use_threads));
+      // XXX For some reason this works even in the shuffled case
+      // (I would expect the test to require sorting of the output).
+      // This might depend on implementation details of group id generation
+      // by the hash-aggregate logic (the pivot implementation implicitly
+      // orders the output by ascending group id).
+      AssertDatumsEqual(pivot_case.expected_output, pivoted, /*verbose=*/true);
+    }
+  }
+}
+
 INSTANTIATE_TEST_SUITE_P(GroupBy, GroupBy, ::testing::Values(RunGroupByImpl));
 
 class SegmentedScalarGroupBy : public GroupBy {};
@@ -4620,6 +5242,101 @@ TEST_P(SegmentedKeyGroupBy, MultiSegmentKeyCombined) {
   TestMultiSegmentKey(GetParam(), GetMultiSegmentInputAsCombined);
 }
 
+TEST_P(SegmentedKeyGroupBy, PivotSegmentKey) {
+  // Pivots (pivot_key, pivot_value) pairs for each segment key. Segment 3
+  // spans two input chunks, and its second row carries a null value.
+  auto group_by = GetParam();
+  const auto key_type = utf8();
+  const auto value_type = float32();
+  const auto input_schema =
+      schema({field("segment_key", int64()), field("pivot_key", key_type),
+              field("pivot_value", value_type)});
+  const auto output_type = struct_(
+      {field("key_0", int64()),
+       field("pivoted",
+             struct_({field("height", value_type), field("width", value_type)}))});
+
+  const std::vector<std::string> input_chunks = {R"([
+      [1, "width",  10.5],
+      [1, "height", 11.5]
+      ])",
+                                                 R"([
+      [2, "height", 12.5],
+      [2, "width",  13.5],
+      [3, "width",  14.5]
+      ])",
+                                                 R"([
+      [3, "width",  null],
+      [4, "height", 15.5]
+      ])"};
+  // One output chunk per segment.
+  const std::vector<std::string> output_chunks = {
+      R"([[1, {"height": 11.5, "width": 10.5}]])",
+      R"([[2, {"height": 12.5, "width": 13.5}]])",
+      R"([[3, {"height": null, "width": 14.5}]])",
+      R"([[4, {"height": 15.5, "width": null}]])",
+  };
+
+  auto table = TableFromJSON(input_schema, input_chunks);
+  auto pivot_keys = table->GetColumnByName("pivot_key");
+  auto pivot_values = table->GetColumnByName("pivot_value");
+  auto segment_keys = table->GetColumnByName("segment_key");
+
+  auto options = std::make_shared<PivotWiderOptions>(
+      /*key_names=*/std::vector<std::string>{"height", "width"});
+  Aggregate aggregate{"pivot_wider", options, std::vector<FieldRef>{"agg_0", "agg_1"},
+                      "pivoted"};
+  ASSERT_OK_AND_ASSIGN(Datum actual,
+                       group_by({pivot_keys, pivot_values}, {}, {segment_keys},
+                                {aggregate},
+                                /*use_threads=*/false, /*naive=*/false));
+  ValidateOutput(actual);
+  auto expected = ChunkedArrayFromJSON(output_type, output_chunks);
+  AssertDatumsEqual(expected, actual, /*verbose=*/true);
+}
+
+TEST_P(SegmentedKeyGroupBy, PivotSegmentKeyDuplicateValues) {
+  // NOTE: besides testing "pivot_wider" behavior, this test also checks that errors
+  // produced when consuming or merging an aggregate don't corrupt
+  // execution engine internals.
+  auto group_by = GetParam();
+  const auto input_schema =
+      schema({field("segment_key", int64()), field("pivot_key", utf8()),
+              field("pivot_value", float32())});
+  const auto options = std::make_shared<PivotWiderOptions>(
+      /*key_names=*/std::vector<std::string>{"height", "width"});
+  const Aggregate aggregate{"pivot_wider", options,
+                            std::vector<FieldRef>{"agg_0", "agg_1"}, "pivoted"};
+
+  // Runs the segmented pivot on the given chunks and expects it to fail
+  // because one pivot key receives two non-null values.
+  auto expect_duplicate_error = [&](const std::vector<std::string>& chunks_json) {
+    auto table = TableFromJSON(input_schema, chunks_json);
+    auto pivot_keys = table->GetColumnByName("pivot_key");
+    auto pivot_values = table->GetColumnByName("pivot_value");
+    auto segment_keys = table->GetColumnByName("segment_key");
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid,
+        HasSubstr("Encountered more than one non-null value for the same pivot key"),
+        group_by({pivot_keys, pivot_values}, {}, {segment_keys}, {aggregate},
+                 /*use_threads=*/false, /*naive=*/false));
+  };
+
+  // Duplicate values in the same chunk
+  expect_duplicate_error({R"([
+      [1, "width",  10.5],
+      [2, "width",  11.5],
+      [2, "width",  12.5]
+      ])"});
+  // Duplicate values in two different chunks
+  expect_duplicate_error({R"([
+      [1, "width",  10.5],
+      [2, "width",  11.5]
+      ])",
+                          R"([
+      [2, "width",  12.5]
+      ])"});
+}
+
 INSTANTIATE_TEST_SUITE_P(SegmentedScalarGroupBy, SegmentedScalarGroupBy,
                          ::testing::Values(RunSegmentedGroupByImpl));
 
diff --git a/cpp/src/arrow/compute/api_aggregate.cc 
b/cpp/src/arrow/compute/api_aggregate.cc
index 49d8709660..37119d2b67 100644
--- a/cpp/src/arrow/compute/api_aggregate.cc
+++ b/cpp/src/arrow/compute/api_aggregate.cc
@@ -24,8 +24,8 @@
 #include "arrow/util/logging.h"
 
 namespace arrow {
-
 namespace internal {
+
 template <>
 struct EnumTraits<compute::CountOptions::CountMode>
     : BasicEnumTraits<compute::CountOptions::CountMode, 
compute::CountOptions::ONLY_VALID,
@@ -67,6 +67,24 @@ struct EnumTraits<compute::QuantileOptions::Interpolation>
     return "<INVALID>";
   }
 };
+
+// Enum traits for PivotWiderOptions::UnexpectedKeyBehavior, listing its two
+// enumerators (kIgnore, kRaise) and giving a printable name for the enum type
+// and for each of its values.
+template <>
+struct EnumTraits<compute::PivotWiderOptions::UnexpectedKeyBehavior>
+    : BasicEnumTraits<compute::PivotWiderOptions::UnexpectedKeyBehavior,
+                      compute::PivotWiderOptions::kIgnore,
+                      compute::PivotWiderOptions::kRaise> {
+  static std::string name() { return 
"PivotWiderOptions::UnexpectedKeyBehavior"; }
+  static std::string 
value_name(compute::PivotWiderOptions::UnexpectedKeyBehavior value) {
+    switch (value) {
+      case compute::PivotWiderOptions::kIgnore:
+        return "kIgnore";
+      case compute::PivotWiderOptions::kRaise:
+        return "kRaise";
+    }
+    // Only reached for a value that is not one of the declared enumerators.
+    return "<INVALID>";
+  }
+};
+
 }  // namespace internal
 
 namespace compute {
@@ -101,6 +119,9 @@ static auto kTDigestOptionsType = 
GetFunctionOptionsType<TDigestOptions>(
     DataMember("buffer_size", &TDigestOptions::buffer_size),
     DataMember("skip_nulls", &TDigestOptions::skip_nulls),
     DataMember("min_count", &TDigestOptions::min_count));
+// Options type descriptor for PivotWiderOptions, exposing its two data
+// members under the names "key_names" and "unexpected_key_behavior".
+static auto kPivotOptionsType = GetFunctionOptionsType<PivotWiderOptions>(
+    DataMember("key_names", &PivotWiderOptions::key_names),
+    DataMember("unexpected_key_behavior", 
&PivotWiderOptions::unexpected_key_behavior));
 static auto kIndexOptionsType =
     GetFunctionOptionsType<IndexOptions>(DataMember("value", 
&IndexOptions::value));
 }  // namespace
@@ -164,6 +185,13 @@ TDigestOptions::TDigestOptions(std::vector<double> q, 
uint32_t delta,
       min_count{min_count} {}
 constexpr char TDigestOptions::kTypeName[];
 
+PivotWiderOptions::PivotWiderOptions(std::vector<std::string> key_names,
+                                     UnexpectedKeyBehavior 
unexpected_key_behavior)
+    : FunctionOptions(internal::kPivotOptionsType),
+      key_names(std::move(key_names)),
+      unexpected_key_behavior(unexpected_key_behavior) {}
+PivotWiderOptions::PivotWiderOptions() : 
FunctionOptions(internal::kPivotOptionsType) {}
+
 IndexOptions::IndexOptions(std::shared_ptr<Scalar> value)
     : FunctionOptions(internal::kIndexOptionsType), value{std::move(value)} {}
 IndexOptions::IndexOptions() : IndexOptions(std::make_shared<NullScalar>()) {}
@@ -177,6 +205,7 @@ void RegisterAggregateOptions(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunctionOptionsType(kVarianceOptionsType));
   DCHECK_OK(registry->AddFunctionOptionsType(kQuantileOptionsType));
   DCHECK_OK(registry->AddFunctionOptionsType(kTDigestOptionsType));
+  DCHECK_OK(registry->AddFunctionOptionsType(kPivotOptionsType));
   DCHECK_OK(registry->AddFunctionOptionsType(kIndexOptionsType));
 }
 }  // namespace internal
diff --git a/cpp/src/arrow/compute/api_aggregate.h 
b/cpp/src/arrow/compute/api_aggregate.h
index 2e5210b073..66876ee66e 100644
--- a/cpp/src/arrow/compute/api_aggregate.h
+++ b/cpp/src/arrow/compute/api_aggregate.h
@@ -175,6 +175,88 @@ class ARROW_EXPORT TDigestOptions : public FunctionOptions 
{
   uint32_t min_count;
 };
 
+/// \brief Control Pivot kernel behavior
+///
+/// These options apply to the "pivot_wider" and "hash_pivot_wider" functions.
+///
+/// Constraints:
+/// - The corresponding `Aggregate::target` must have two FieldRef elements;
+///   the first one points to the pivot key column, the second points to the
+///   pivoted data column.
+/// - The pivot key column must be string-like; its values will be matched
+///   against `key_names` in order to dispatch the pivoted data into the
+///   output.
+///
+/// "pivot_wider" example
+/// ---------------------
+///
+/// Assuming the following two input columns with types utf8 and int16
+/// (respectively):
+/// ```
+/// width   |  11
+/// height  |  13
+/// ```
+/// and the options `PivotWiderOptions(.key_names = {"height", "width"})`
+///
+/// then the output will be a scalar with the type
+/// `struct{"height": int16, "width": int16}`
+/// and the value `{"height": 13, "width": 11}`.
+///
+/// "hash_pivot_wider" example
+/// --------------------------
+///
+/// Assuming the following input with schema
+/// `{"group": int32, "key": utf8, "value": int16}`:
+/// ```
+///  group |  key     |  value
+/// -----------------------------
+///   1    |  height  |    11
+///   1    |  width   |    12
+///   2    |  width   |    13
+///   3    |  height  |    14
+///   3    |  depth   |    15
+/// ```
+/// and the following settings:
+/// - a hash grouping key "group"
+/// - Aggregate(
+///     .function = "hash_pivot_wider",
+///     .options = PivotWiderOptions(.key_names = {"height", "width"}),
+///     .target = {"key", "value"},
+///     .name = "properties")
+///
+/// then the output will have the schema
+/// `{"group": int32, "properties": struct{"height": int16, "width": int16}}`
+/// and the following value:
+/// ```
+///  group |     properties
+///        |  height  |   width
+/// -----------------------------
+///   1    |   11     |    12
+///   2    |   null   |    13
+///   3    |   14     |    null
+/// ```
+/// (the "depth" row is dropped because "depth" is not in `key_names` and
+/// `unexpected_key_behavior` defaults to `kIgnore`).
+class ARROW_EXPORT PivotWiderOptions : public FunctionOptions {
+ public:
+  /// Configure the behavior of pivot keys not in `key_names`
+  enum UnexpectedKeyBehavior {
+    /// Unexpected pivot keys are ignored silently
+    kIgnore,
+    /// Unexpected pivot keys return a KeyError
+    kRaise
+  };
+
+  /// \param key_names the values expected in the pivot key column; they also
+  ///   name the fields of the output struct, in the same order
+  /// \param unexpected_key_behavior what to do with pivot keys that are not
+  ///   in `key_names`
+  explicit PivotWiderOptions(std::vector<std::string> key_names,
+                             UnexpectedKeyBehavior unexpected_key_behavior = 
kIgnore);
+  // Default constructor for serialization
+  PivotWiderOptions();
+  static constexpr char const kTypeName[] = "PivotWiderOptions";
+  /// Default-constructed options
+  static PivotWiderOptions Defaults() { return PivotWiderOptions{}; }
+
+  /// The values expected in the pivot key column
+  std::vector<std::string> key_names;
+  /// The behavior when pivot keys not in `key_names` are encountered
+  UnexpectedKeyBehavior unexpected_key_behavior = kIgnore;
+};
+
 /// \brief Control Index kernel behavior
 class ARROW_EXPORT IndexOptions : public FunctionOptions {
  public:
diff --git a/cpp/src/arrow/compute/kernels/aggregate_pivot.cc 
b/cpp/src/arrow/compute/kernels/aggregate_pivot.cc
new file mode 100644
index 0000000000..bcc2f53ac1
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/aggregate_pivot.cc
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/common_internal.h"
+#include "arrow/compute/kernels/pivot_internal.h"
+#include "arrow/scalar.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::compute::internal {
+namespace {
+
+using arrow::internal::VisitSetBitRunsVoid;
+using arrow::util::span;
+
+// Scalar aggregator implementing "pivot_wider".
+//
+// The state holds one scalar slot per entry of `PivotWiderOptions::key_names`,
+// in the same order as the fields of the output struct. A slot starts out as
+// a null scalar and receives the (single) non-null value whose pivot key maps
+// to it. A second non-null value for the same slot is an error.
+struct PivotImpl : public ScalarAggregator {
+  // Sets up the output struct type and the null-initialized slots from the
+  // options and the input types (in_types[0]: pivot keys, in_types[1]: pivot
+  // values), and creates the key mapper.
+  //
+  // NOTE: `options` is kept by address, so it must outlive this state.
+  Status Init(const PivotWiderOptions& options, const std::vector<TypeHolder>& in_types) {
+    options_ = &options;
+    key_type_ = in_types[0].GetSharedPtr();
+    auto value_type = in_types[1].GetSharedPtr();
+    FieldVector fields;
+    fields.reserve(options_->key_names.size());
+    values_.reserve(options_->key_names.size());
+    for (const auto& key_name : options_->key_names) {
+      fields.push_back(field(key_name, value_type));
+      values_.push_back(MakeNullScalar(value_type));
+    }
+    out_type_ = struct_(std::move(fields));
+    ARROW_ASSIGN_OR_RAISE(key_mapper_, PivotWiderKeyMapper::Make(*key_type_, options_));
+    return Status::OK();
+  }
+
+  // Dispatches the non-null values of `batch` into their slots.
+  //
+  // batch[0] holds the pivot keys and batch[1] the pivot values; each of them
+  // can be an array or a scalar. Rows whose key maps to `kNullPivotKey` and
+  // rows with a null value are skipped. Returns Invalid if a slot would
+  // receive more than one non-null value; errors from the key mapper are
+  // propagated.
+  Status Consume(KernelContext*, const ExecSpan& batch) override {
+    DCHECK_EQ(batch.num_values(), 2);
+    if (batch[0].is_array()) {
+      ARROW_ASSIGN_OR_RAISE(span<const PivotWiderKeyIndex> keys,
+                            key_mapper_->MapKeys(batch[0].array));
+      if (batch[1].is_array()) {
+        // Array keys, array values
+        auto values = batch[1].array.ToArray();
+        for (int64_t i = 0; i < batch.length; ++i) {
+          const PivotWiderKeyIndex key = keys[i];
+          if (key != kNullPivotKey && !values->IsNull(i)) {
+            RETURN_NOT_OK(CheckSlotIsFree(key));
+            ARROW_ASSIGN_OR_RAISE(values_[key], values->GetScalar(i));
+            DCHECK(values_[key]->is_valid);
+          }
+        }
+      } else {
+        // Array keys, scalar value: the same value applies to every row
+        const Scalar* value = batch[1].scalar;
+        if (value->is_valid) {
+          for (int64_t i = 0; i < batch.length; ++i) {
+            const PivotWiderKeyIndex key = keys[i];
+            if (key != kNullPivotKey) {
+              RETURN_NOT_OK(CheckSlotIsFree(key));
+              values_[key] = value->GetSharedPtr();
+            }
+          }
+        }
+      }
+    } else {
+      ARROW_ASSIGN_OR_RAISE(PivotWiderKeyIndex key,
+                            key_mapper_->MapKey(*batch[0].scalar));
+      if (key != kNullPivotKey) {
+        if (batch[1].is_array()) {
+          // Scalar key, array values: every non-null value targets the same slot
+          auto values = batch[1].array.ToArray();
+          for (int64_t i = 0; i < batch.length; ++i) {
+            if (!values->IsNull(i)) {
+              RETURN_NOT_OK(CheckSlotIsFree(key));
+              ARROW_ASSIGN_OR_RAISE(values_[key], values->GetScalar(i));
+              DCHECK(values_[key]->is_valid);
+            }
+          }
+        } else {
+          // Scalar key, scalar value: the pair is logically repeated
+          // `batch.length` times. A zero-length batch carries no row at all,
+          // so nothing must be stored for it (consistently with the loops
+          // in the other branches).
+          const Scalar* value = batch[1].scalar;
+          if (value->is_valid && batch.length > 0) {
+            if (batch.length > 1) {
+              return DuplicateValue();
+            }
+            RETURN_NOT_OK(CheckSlotIsFree(key));
+            values_[key] = value->GetSharedPtr();
+          }
+        }
+      }
+    }
+    return Status::OK();
+  }
+
+  // Merges the slots of `src` into this state. Returns Invalid if a slot
+  // holds a non-null value in both states.
+  Status MergeFrom(KernelContext*, KernelState&& src) override {
+    const auto& other_state = checked_cast<const PivotImpl&>(src);
+    for (int64_t key = 0; key < static_cast<int64_t>(values_.size()); ++key) {
+      if (other_state.values_[key]->is_valid) {
+        RETURN_NOT_OK(CheckSlotIsFree(key));
+        values_[key] = other_state.values_[key];
+      }
+    }
+    return Status::OK();
+  }
+
+  // Emits the slots as a struct scalar of type `out_type_`.
+  // The slots are moved out, so the state must not be consumed afterwards.
+  Status Finalize(KernelContext* ctx, Datum* out) override {
+    *out = std::make_shared<StructScalar>(std::move(values_), out_type_);
+    return Status::OK();
+  }
+
+  // The error returned when a pivot key receives a second non-null value.
+  static Status DuplicateValue() {
+    return Status::Invalid(
+        "Encountered more than one non-null value for the same pivot key");
+  }
+
+  // Returns Invalid if the slot for `key` already holds a non-null value.
+  Status CheckSlotIsFree(int64_t key) const {
+    if (ARROW_PREDICT_FALSE(values_[key]->is_valid)) {
+      return DuplicateValue();
+    }
+    return Status::OK();
+  }
+
+  std::shared_ptr<DataType> out_type() const { return out_type_; }
+
+  std::shared_ptr<DataType> key_type_;
+  std::shared_ptr<DataType> out_type_;
+  // Not owned; set by Init()
+  const PivotWiderOptions* options_ = nullptr;
+  std::unique_ptr<PivotWiderKeyMapper> key_mapper_;
+  // One slot per key name, in output field order
+  ScalarVector values_;
+};
+
+// Kernel init function: creates a PivotImpl state from the kernel options
+// and input types. Errors from PivotImpl::Init are propagated.
+Result<std::unique_ptr<KernelState>> PivotInit(KernelContext* ctx,
+                                               const KernelInitArgs& args) {
+  const auto& pivot_options = checked_cast<const PivotWiderOptions&>(*args.options);
+  DCHECK_EQ(args.inputs.size(), 2);
+  DCHECK(is_base_binary_like(args.inputs[0].id()));
+  auto impl = std::make_unique<PivotImpl>();
+  const Status init_status = impl->Init(pivot_options, args.inputs);
+  if (!init_status.ok()) {
+    return init_status;
+  }
+  return impl;
+}
+
+// Output type resolver: reports the struct type computed by the kernel
+// state (a PivotImpl) when it was initialized.
+Result<TypeHolder> ResolveOutputType(KernelContext* ctx,
+                                     const std::vector<TypeHolder>&) {
+  const auto* impl = checked_cast<PivotImpl*>(ctx->state());
+  return impl->out_type();
+}
+
+// Documentation attached to the "pivot_wider" function: summary, description,
+// argument names and options class name.
+const FunctionDoc pivot_doc{
+    "Pivot values according to a pivot key column",
+    ("Output is a struct with as many fields as 
`PivotWiderOptions.key_names`.\n"
+     "All output struct fields have the same type as `pivot_values`.\n"
+     "Each pivot key decides in which output field the corresponding pivot 
value\n"
+     "is emitted. If a pivot key doesn't appear, null is emitted.\n"
+     "If more than one non-null value is encountered for a given pivot key,\n"
+     "Invalid is raised.\n"
+     "Behavior of unexpected pivot keys is controlled by 
`unexpected_key_behavior`\n"
+     "in PivotWiderOptions."),
+    {"pivot_keys", "pivot_values"},
+    "PivotWiderOptions"};
+
+}  // namespace
+
+// Registers the "pivot_wider" scalar aggregate function in `registry`.
+void RegisterScalarAggregatePivot(FunctionRegistry* registry) {
+  // The default options are handed over by address, hence the static storage.
+  static auto default_pivot_options = PivotWiderOptions::Defaults();
+
+  auto pivot_func = std::make_shared<ScalarAggregateFunction>(
+      "pivot_wider", Arity::Binary(), pivot_doc, &default_pivot_options);
+
+  // One kernel per base binary key type; the values can be of any type.
+  for (const auto& key_type : BaseBinaryTypes()) {
+    AddAggKernel(KernelSignature::Make({key_type->id(), InputType::Any()},
+                                       OutputType(ResolveOutputType)),
+                 PivotInit, pivot_func.get());
+  }
+  DCHECK_OK(registry->AddFunction(std::move(pivot_func)));
+}
+
+}  // namespace arrow::compute::internal
diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc 
b/cpp/src/arrow/compute/kernels/aggregate_test.cc
index e6ad915fd5..36bb783d35 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc
@@ -4307,5 +4307,294 @@ TEST(TestTDigestKernel, ApproximateMedian) {
   }
 }
 
+//
+// Pivot
+//
+
+class TestPivotKernel : public ::testing::Test {
+ public:
+  // Calls "pivot_wider" on (keys, values) with the given options, then checks
+  // that the result validates, is a scalar, and equals `expected`.
+  void AssertPivot(const Datum& keys, const Datum& values, const Scalar& expected,
+                   const PivotWiderOptions& options) {
+    SCOPED_TRACE(options.ToString());
+    const std::vector<Datum> args = {keys, values};
+    ASSERT_OK_AND_ASSIGN(Datum result, CallFunction("pivot_wider", args, &options));
+    ValidateOutput(result);
+    ASSERT_TRUE(result.is_scalar());
+    AssertScalarsEqual(expected, *result.scalar(), /*verbose=*/true);
+  }
+};
+
+TEST_F(TestPivotKernel, Basics) {
+  // Two keys given in the reverse order of the output fields.
+  const auto value_type = float32();
+  const auto out_type =
+      struct_({field("height", value_type), field("width", value_type)});
+  const PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+  AssertPivot(ArrayFromJSON(utf8(), R"(["width", "height"])"),
+              ArrayFromJSON(value_type, "[10.5, 11.5]"),
+              *ScalarFromJSON(out_type, "[11.5, 10.5]"), options);
+}
+
+TEST_F(TestPivotKernel, AllKeyTypes) {
+  // Same pivot as in Basics, repeated for every base binary key type.
+  const auto value_type = float32();
+  const auto out_type =
+      struct_({field("height", value_type), field("width", value_type)});
+  const PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+  for (const auto& key_type : BaseBinaryTypes()) {
+    AssertPivot(ArrayFromJSON(key_type, R"(["width", "height"])"),
+                ArrayFromJSON(value_type, "[10.5, 11.5]"),
+                *ScalarFromJSON(out_type, "[11.5, 10.5]"), options);
+  }
+}
+
+TEST_F(TestPivotKernel, Numbers) {
+  // Pivot values of every numeric type.
+  const PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+  for (const auto& value_type : NumericTypes()) {
+    const auto out_type =
+        struct_({field("height", value_type), field("width", value_type)});
+    AssertPivot(ArrayFromJSON(utf8(), R"(["width", "height"])"),
+                ArrayFromJSON(value_type, "[10, 11]"),
+                *ScalarFromJSON(out_type, "[11, 10]"), options);
+  }
+}
+
+TEST_F(TestPivotKernel, Binary) {
+  // Pivot values of every base binary type.
+  const PivotWiderOptions options(/*key_names=*/{"abc", "def"});
+
+  for (const auto& value_type : BaseBinaryTypes()) {
+    const auto out_type = struct_({field("abc", value_type), field("def", value_type)});
+    AssertPivot(ArrayFromJSON(utf8(), R"(["abc", "def"])"),
+                ArrayFromJSON(value_type, R"(["foo", "bar"])"),
+                *ScalarFromJSON(out_type, R"(["foo", "bar"])"), options);
+  }
+}
+
+TEST_F(TestPivotKernel, NullType) {
+  // Pivot values of the null type: every output field is null.
+  const auto value_type = null();
+  const auto out_type = struct_({field("abc", value_type), field("def", value_type)});
+  const PivotWiderOptions options(/*key_names=*/{"abc", "def"});
+
+  AssertPivot(ArrayFromJSON(utf8(), R"(["abc", "def"])"),
+              ArrayFromJSON(value_type, "[null, null]"),
+              *ScalarFromJSON(out_type, R"([null, null])"), options);
+}
+
+TEST_F(TestPivotKernel, NullValues) {
+  // Each key appears twice, once with a null value and once with a non-null
+  // value; only the non-null values end up in the output.
+  const auto value_type = float32();
+  const auto out_type =
+      struct_({field("height", value_type), field("width", value_type)});
+  const PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+  AssertPivot(ArrayFromJSON(utf8(), R"(["width", "height", "height", "width"])"),
+              ArrayFromJSON(value_type, "[null, 10.5, null, 11.5]"),
+              *ScalarFromJSON(out_type, "[10.5, 11.5]"), options);
+}
+
+TEST_F(TestPivotKernel, ChunkedInput) {
+  // Keys and values are chunked arrays whose chunk boundaries differ.
+  const auto value_type = float32();
+  const auto out_type =
+      struct_({field("height", value_type), field("width", value_type)});
+  const PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+  const auto pivot_keys = ChunkedArrayFromJSON(
+      utf8(), {R"(["width"])", R"(["height", "height", "width"])"});
+  const auto pivot_values =
+      ChunkedArrayFromJSON(value_type, {"[null, 10.5]", "[null, 11.5]"});
+  AssertPivot(pivot_keys, pivot_values, *ScalarFromJSON(out_type, "[10.5, 11.5]"),
+              options);
+}
+
+TEST_F(TestPivotKernel, AllInputKinds) {
+  // Exercises every combination of scalar / array / chunked array inputs,
+  // for both the pivot keys and the pivot values.
+  auto key_type = utf8();
+  auto value_type = float32();
+
+  DatumVector key_args = {
+      ScalarFromJSON(key_type, R"("width")"),
+      ArrayFromJSON(key_type, R"(["width"])"),
+      ChunkedArrayFromJSON(key_type, {R"(["width"])"}),
+  };
+  DatumVector value_args = {
+      ScalarFromJSON(value_type, "11.5"),
+      ArrayFromJSON(value_type, "[11.5]"),
+      ChunkedArrayFromJSON(value_type, {"[11.5]"}),
+  };
+  auto expected = ScalarFromJSON(
+      struct_({field("height", value_type), field("width", value_type)}), "[null, 11.5]");
+
+  for (const Datum& keys : key_args) {
+    ARROW_SCOPED_TRACE("keys = ", keys.ToString());
+    for (const Datum& values : value_args) {
+      // Trace the values under test (not the keys a second time), so that a
+      // failure message identifies the exact input combination.
+      ARROW_SCOPED_TRACE("values = ", values.ToString());
+      AssertPivot(keys, values, *expected,
+                  PivotWiderOptions(/*key_names=*/{"height", "width"}));
+    }
+  }
+}
+
+TEST_F(TestPivotKernel, ScalarKey) {
+  // A scalar key paired with an array of values, only one of them non-null.
+  const auto value_type = float32();
+  const auto out_type =
+      struct_({field("height", value_type), field("width", value_type)});
+  const PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+  AssertPivot(ScalarFromJSON(utf8(), R"("width")"),
+              ArrayFromJSON(value_type, "[null, 11.5, null]"),
+              *ScalarFromJSON(out_type, "[null, 11.5]"), options);
+}
+
+TEST_F(TestPivotKernel, ScalarValue) {
+  // An array of distinct keys paired with a scalar value: every key gets it.
+  const auto value_type = float32();
+  const auto out_type =
+      struct_({field("height", value_type), field("width", value_type)});
+  const PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+  AssertPivot(ArrayFromJSON(utf8(), R"(["width", "height"])"),
+              ScalarFromJSON(value_type, "11.5"),
+              *ScalarFromJSON(out_type, "[11.5, 11.5]"), options);
+}
+
+TEST_F(TestPivotKernel, EmptyInput) {
+  // Without any input row, every output field is null.
+  const auto key_type = utf8();
+  const auto value_type = float32();
+  const auto out_type =
+      struct_({field("height", value_type), field("width", value_type)});
+  const auto all_null = ScalarFromJSON(out_type, "[null, null]");
+  const PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+  // Empty arrays
+  AssertPivot(ArrayFromJSON(key_type, "[]"), ArrayFromJSON(value_type, "[]"),
+              *all_null, options);
+  // Chunked arrays without any chunk
+  AssertPivot(ChunkedArrayFromJSON(key_type, {}), ChunkedArrayFromJSON(value_type, {}),
+              *all_null, options);
+}
+
+TEST_F(TestPivotKernel, MissingKey) {
+  // "depth" is expected by the options but never appears in the input:
+  // its output field is null.
+  const auto value_type = float32();
+  const auto out_type =
+      struct_({field("height", value_type), field("width", value_type),
+               field("depth", value_type)});
+  const PivotWiderOptions options(/*key_names=*/{"height", "width", "depth"});
+
+  AssertPivot(ArrayFromJSON(utf8(), R"(["width", "height"])"),
+              ArrayFromJSON(value_type, "[10.5, 11.5]"),
+              *ScalarFromJSON(out_type, "[11.5, 10.5, null]"), options);
+}
+
+TEST_F(TestPivotKernel, UnexpectedKey) {
+  const auto key_type = utf8();
+  const auto value_type = float32();
+  const auto out_type =
+      struct_({field("height", value_type), field("width", value_type)});
+
+  const PivotWiderOptions ignore_options(/*key_names=*/{"height", "width"});
+  const PivotWiderOptions raise_options(/*key_names=*/{"height", "width"},
+                                        PivotWiderOptions::kRaise);
+
+  // With the default behavior the unexpected "depth" key is ignored and the
+  // pivot yields `expected_json`; with kRaise the call fails with a KeyError.
+  auto check_unexpected = [&](const Datum& keys, const Datum& values,
+                              const std::string& expected_json) {
+    AssertPivot(keys, values, *ScalarFromJSON(out_type, expected_json), ignore_options);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        KeyError, ::testing::HasSubstr("Unexpected pivot key: depth"),
+        CallFunction("pivot_wider", {keys, values}, &raise_options));
+  };
+
+  // Array keys, array values
+  check_unexpected(ArrayFromJSON(key_type, R"(["width", "height", "depth"])"),
+                   ArrayFromJSON(value_type, "[10.5, 11.5, 12.5]"), "[11.5, 10.5]");
+
+  // Scalar key
+  const Datum scalar_key = ScalarFromJSON(key_type, R"("depth")");
+  check_unexpected(scalar_key, ArrayFromJSON(value_type, "[10.5]"), "[null, null]");
+  check_unexpected(scalar_key, ScalarFromJSON(value_type, "10.5"), "[null, null]");
+
+  // Scalar value
+  const Datum scalar_value = ScalarFromJSON(value_type, "10.5");
+  check_unexpected(ArrayFromJSON(key_type, R"(["depth"])"), scalar_value,
+                   "[null, null]");
+  check_unexpected(ScalarFromJSON(key_type, R"("depth")"), scalar_value,
+                   "[null, null]");
+}
+
+TEST_F(TestPivotKernel, NullKey) {
+  // A null pivot key is rejected with a KeyError.
+  const auto pivot_keys = ArrayFromJSON(utf8(), R"(["width", null])");
+  const auto pivot_values = ArrayFromJSON(float32(), "[10.5, 11.5]");
+  const PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      KeyError, ::testing::HasSubstr("pivot key name cannot be null"),
+      CallFunction("pivot_wider", {pivot_keys, pivot_values}, &options));
+}
+
+TEST_F(TestPivotKernel, DuplicateKeyNames) {
+  // Options listing the same key name twice are rejected, even on empty input.
+  const auto pivot_keys = ArrayFromJSON(utf8(), "[]");
+  const auto pivot_values = ArrayFromJSON(float32(), "[]");
+  const PivotWiderOptions options(/*key_names=*/{"height", "height", "width"});
+
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      KeyError, ::testing::HasSubstr("Duplicate key name 'height' in PivotWiderOptions"),
+      CallFunction("pivot_wider", {pivot_keys, pivot_values}, &options));
+}
+
+TEST_F(TestPivotKernel, DuplicateValues) {
+  const auto key_type = utf8();
+  const auto value_type = float32();
+  const PivotWiderOptions options(/*key_names=*/{"height", "width"});
+
+  // Expects the pivot to fail because a key receives two non-null values.
+  auto expect_duplicate_error = [&](const Datum& keys, const Datum& values) {
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid, ::testing::HasSubstr("Encountered more than one non-null value"),
+        CallFunction("pivot_wider", {keys, values}, &options));
+  };
+
+  // Duplicate values in the same chunk
+  expect_duplicate_error(ArrayFromJSON(key_type, R"(["width", "height", "height"])"),
+                         ArrayFromJSON(value_type, "[10.5, 11.5, 12.5]"));
+  // Duplicate values in different chunks
+  expect_duplicate_error(
+      ChunkedArrayFromJSON(key_type, {R"(["width", "height"])", R"(["height"])"}),
+      ChunkedArrayFromJSON(value_type, {"[10.5, 11.5]", "[12.5]"}));
+  // Duplicate values with scalar key
+  expect_duplicate_error(ScalarFromJSON(key_type, R"("width")"),
+                         ArrayFromJSON(value_type, "[10.5, 11.5]"));
+  // Duplicate values with scalar value
+  expect_duplicate_error(ArrayFromJSON(key_type, R"(["width", "height", "height"])"),
+                         ScalarFromJSON(value_type, "10.5"));
+}
+
 }  // namespace compute
 }  // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc 
b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index 21b7bd9bf6..b52f1c8286 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -26,6 +26,7 @@
 
 #include "arrow/array/builder_nested.h"
 #include "arrow/array/builder_primitive.h"
+#include "arrow/array/concatenate.h"
 #include "arrow/buffer_builder.h"
 #include "arrow/compute/api_aggregate.h"
 #include "arrow/compute/api_vector.h"
@@ -33,6 +34,7 @@
 #include "arrow/compute/kernels/aggregate_internal.h"
 #include "arrow/compute/kernels/aggregate_var_std_internal.h"
 #include "arrow/compute/kernels/common_internal.h"
+#include "arrow/compute/kernels/pivot_internal.h"
 #include "arrow/compute/kernels/util_internal.h"
 #include "arrow/compute/row/grouper.h"
 #include "arrow/compute/row/row_encoder_internal.h"
@@ -40,6 +42,7 @@
 #include "arrow/stl_allocator.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit_run_reader.h"
+#include "arrow/util/bit_util.h"
 #include "arrow/util/bitmap_ops.h"
 #include "arrow/util/bitmap_writer.h"
 #include "arrow/util/checked_cast.h"
@@ -47,6 +50,7 @@
 #include "arrow/util/int128_internal.h"
 #include "arrow/util/int_util_overflow.h"
 #include "arrow/util/ree_util.h"
+#include "arrow/util/span.h"
 #include "arrow/util/task_group.h"
 #include "arrow/util/tdigest.h"
 #include "arrow/util/thread_pool.h"
@@ -56,6 +60,7 @@ namespace arrow {
 
 using internal::checked_cast;
 using internal::FirstTimeBitmapWriter;
+using util::span;
 
 namespace compute {
 namespace internal {
@@ -3319,9 +3324,404 @@ struct GroupedListFactory {
   HashAggregateKernel kernel;
   InputType argument_type;
 };
-}  // namespace
 
-namespace {
+// ----------------------------------------------------------------------
+// Pivot implementation
+
+struct GroupedPivotAccumulator {
+  // Prepare the accumulator for `options->key_names.size()` output columns of
+  // type `value_type`, starting with zero groups.
+  Status Init(ExecContext* ctx, std::shared_ptr<DataType> value_type,
+              const PivotWiderOptions* options) {
+    // One (initially unallocated) output column per expected pivot key
+    num_keys_ = static_cast<int>(options->key_names.size());
+    columns_.resize(num_keys_);
+    num_groups_ = 0;
+    value_type_ = std::move(value_type);
+    // The scratch space for take indices is allocated from the context's pool
+    ctx_ = ctx;
+    scratch_buffer_ = BufferBuilder(ctx->memory_pool());
+    return Status::OK();
+  }
+
+  Status Consume(span<const uint32_t> groups, span<const PivotWiderKeyIndex> 
keys,
+                 const ArraySpan& values) {
+    // To dispatch the values into the right (group, key) coordinates,
+    // we first compute a vector of take indices for each output column.
+    //
+    // For each index #i, we set take_indices[keys[#i]][groups[#i]] = #i.
+    // Unpopulated take_indices entries are null.
+    //
+    // For example, assuming we get:
+    //   groups  |  keys
+    // ===================
+    //    1      |   0
+    //    3      |   1
+    //    1      |   1
+    //    0      |   1
+    //
+    // We are going to compute:
+    // - take_indices[key = 0] = [null, 0, null, null]
+    // - take_indices[key = 1] = [3, 2, null, 1]
+    //
+    // Then each output column is computed by taking the values with the
+    // respective take_indices for the column's keys.
+    //
+
+    DCHECK_EQ(groups.size(), keys.size());
+    DCHECK_EQ(groups.size(), static_cast<size_t>(values.length));
+
+    std::shared_ptr<DataType> take_index_type;
+    std::vector<std::shared_ptr<Buffer>> take_indices(num_keys_);
+    std::vector<std::shared_ptr<Buffer>> take_bitmaps(num_keys_);
+
+    // A generic lambda that computes the take indices with the desired 
integer width
+    auto compute_take_indices = [&](auto typed_index) {
+      ARROW_UNUSED(typed_index);
+      using TakeIndex = std::decay_t<decltype(typed_index)>;
+      take_index_type = CTypeTraits<TakeIndex>::type_singleton();
+
+      const int64_t take_indices_size =
+          bit_util::RoundUpToMultipleOf64(num_groups_ * sizeof(TakeIndex));
+      const int64_t take_bitmap_size =
+          bit_util::RoundUpToMultipleOf64(bit_util::BytesForBits(num_groups_));
+      const int64_t total_scratch_size =
+          num_keys_ * (take_indices_size + take_bitmap_size);
+      RETURN_NOT_OK(scratch_buffer_.Resize(total_scratch_size, 
/*shrink_to_fit=*/false));
+
+      // Slice the scratch space into individual buffers for each output 
column's
+      // take_indices array.
+      std::vector<TakeIndex*> take_indices_data(num_keys_);
+      std::vector<uint8_t*> take_bitmap_data(num_keys_);
+      int64_t offset = 0;
+      for (int i = 0; i < num_keys_; ++i) {
+        take_indices[i] = std::make_shared<MutableBuffer>(
+            scratch_buffer_.mutable_data() + offset, take_indices_size);
+        take_indices_data[i] = take_indices[i]->mutable_data_as<TakeIndex>();
+        offset += take_indices_size;
+        take_bitmaps[i] = std::make_shared<MutableBuffer>(
+            scratch_buffer_.mutable_data() + offset, take_bitmap_size);
+        take_bitmap_data[i] = take_bitmaps[i]->mutable_data();
+        memset(take_bitmap_data[i], 0, take_bitmap_size);  // every entry starts out null
+        offset += take_bitmap_size;
+      }
+      DCHECK_LE(offset, scratch_buffer_.capacity());
+
+      // Populate the take_indices for each output column
+      for (int64_t i = 0; i < values.length; ++i) {
+        const PivotWiderKeyIndex key = keys[i];
+        if (key != kNullPivotKey && !values.IsNull(i)) {  // skip null keys and null values
+          DCHECK_LT(static_cast<int>(key), num_keys_);
+          const uint32_t group = groups[i];
+          if (bit_util::GetBit(take_bitmap_data[key], group)) {
+            return DuplicateValue();
+          }
+          // For row #group in column #key, we are going to take the value at 
index #i
+          bit_util::SetBit(take_bitmap_data[key], group);
+          take_indices_data[key][group] = static_cast<TakeIndex>(i);
+        }
+      }
+      return Status::OK();
+    };
+
+    // Call compute_take_indices with the narrowest unsigned type fitting values.length
+    if (values.length <= 
static_cast<int64_t>(std::numeric_limits<uint8_t>::max())) {
+      RETURN_NOT_OK(compute_take_indices(uint8_t{}));
+    } else if (values.length <=
+               static_cast<int64_t>(std::numeric_limits<uint16_t>::max())) {
+      RETURN_NOT_OK(compute_take_indices(uint16_t{}));
+    } else if (values.length <=
+               static_cast<int64_t>(std::numeric_limits<uint32_t>::max())) {
+      RETURN_NOT_OK(compute_take_indices(uint32_t{}));
+    } else {
+      RETURN_NOT_OK(compute_take_indices(uint64_t{}));
+    }
+
+    // Use take_indices to compute the output columns for this batch
+    auto values_data = values.ToArrayData();
+    ArrayVector new_columns(num_keys_);
+    TakeOptions take_options(/*boundscheck=*/false);  // indices are all < values.length
+    for (int i = 0; i < num_keys_; ++i) {
+      auto indices_data =
+          ArrayData::Make(take_index_type, num_groups_,
+                          {std::move(take_bitmaps[i]), 
std::move(take_indices[i])});
+      // If indices_data is all nulls, we can just ignore this column.
+      if (indices_data->GetNullCount() != indices_data->length) {
+        ARROW_ASSIGN_OR_RAISE(Datum grouped_column,
+                              Take(values_data, indices_data, take_options, 
ctx_));
+        new_columns[i] = grouped_column.make_array();
+      }
+    }
+    // Merge them with the previous columns
+    return MergeColumns(std::move(new_columns));
+  }
+
+  Status Consume(span<const uint32_t> groups, const PivotWiderKeyIndex key,
+                 const ArraySpan& values) {
+    if (key == kNullPivotKey) {
+      // Nothing to update
+      return Status::OK();
+    }
+    DCHECK_LT(static_cast<int>(key), num_keys_);
+    DCHECK_EQ(groups.size(), static_cast<size_t>(values.length));
+
+    // The algorithm is simpler than in the array-taking version of Consume()
+    // above, since only the column #key needs to be updated.
+    std::shared_ptr<DataType> take_index_type;
+    std::shared_ptr<Buffer> take_indices;
+    std::shared_ptr<Buffer> take_bitmap;
+
+    // A generic lambda that computes the take indices with the desired 
integer width
+    auto compute_take_indices = [&](auto typed_index) {
+      ARROW_UNUSED(typed_index);
+      using TakeIndex = std::decay_t<decltype(typed_index)>;
+      take_index_type = CTypeTraits<TakeIndex>::type_singleton();
+
+      const int64_t take_indices_size =
+          bit_util::RoundUpToMultipleOf64(num_groups_ * sizeof(TakeIndex));
+      const int64_t take_bitmap_size =
+          bit_util::RoundUpToMultipleOf64(bit_util::BytesForBits(num_groups_));
+      const int64_t total_scratch_size = take_indices_size + take_bitmap_size;
+      RETURN_NOT_OK(scratch_buffer_.Resize(total_scratch_size, 
/*shrink_to_fit=*/false));
+
+      take_indices = 
std::make_shared<MutableBuffer>(scratch_buffer_.mutable_data(),
+                                                     take_indices_size);
+      take_bitmap = std::make_shared<MutableBuffer>(
+          scratch_buffer_.mutable_data() + take_indices_size, 
take_bitmap_size);
+      auto take_indices_data = take_indices->mutable_data_as<TakeIndex>();
+      auto take_bitmap_data = take_bitmap->mutable_data();
+      memset(take_bitmap_data, 0, take_bitmap_size);  // every entry starts out null
+
+      for (int64_t i = 0; i < values.length; ++i) {
+        const uint32_t group = groups[i];
+        if (!values.IsNull(i)) {
+          if (bit_util::GetBit(take_bitmap_data, group)) {
+            return DuplicateValue();
+          }
+          bit_util::SetBit(take_bitmap_data, group);
+          take_indices_data[group] = static_cast<TakeIndex>(i);
+        }
+      }
+      return Status::OK();
+    };
+
+    // Call compute_take_indices with the narrowest unsigned type fitting values.length
+    if (values.length <= 
static_cast<int64_t>(std::numeric_limits<uint8_t>::max())) {
+      RETURN_NOT_OK(compute_take_indices(uint8_t{}));
+    } else if (values.length <=
+               static_cast<int64_t>(std::numeric_limits<uint16_t>::max())) {
+      RETURN_NOT_OK(compute_take_indices(uint16_t{}));
+    } else if (values.length <=
+               static_cast<int64_t>(std::numeric_limits<uint32_t>::max())) {
+      RETURN_NOT_OK(compute_take_indices(uint32_t{}));
+    } else {
+      RETURN_NOT_OK(compute_take_indices(uint64_t{}));
+    }
+
+    // Use take_indices to update column #key
+    auto values_data = values.ToArrayData();
+    auto indices_data = ArrayData::Make(
+        take_index_type, num_groups_, {std::move(take_bitmap), 
std::move(take_indices)});
+    TakeOptions take_options(/*boundscheck=*/false);  // indices are all < values.length
+    ARROW_ASSIGN_OR_RAISE(Datum grouped_column,
+                          Take(values_data, indices_data, take_options, ctx_));
+    return MergeColumn(&columns_[key], grouped_column.make_array());
+  }
+
+  // Grow the accumulator so that it can hold `new_num_groups` groups.
+  //
+  // Returns NotImplemented if the group count does not fit in an int32, since
+  // Merge() reinterprets the group id mapping as int32 indices.
+  Status Resize(int64_t new_num_groups) {
+    if (new_num_groups > std::numeric_limits<int32_t>::max()) {
+      return Status::NotImplemented("Pivot with more than 2**31 - 1 groups");
+    }
+    return ResizeColumns(new_num_groups);
+  }
+
+  // Merge the columns accumulated by `other` into `*this`.
+  //
+  // `group_id_mapping` is used as scatter indices: row #i of each column of
+  // `other` ends up at row group_id_mapping[#i] before being merged.
+  // Returns Invalid (through MergeColumn) if both accumulators hold a non-null
+  // value for the same (group, key) cell.
+  Status Merge(GroupedPivotAccumulator&& other, const ArrayData& group_id_mapping) {
+    // To merge `other` into `*this`, we simply merge their respective columns.
+    // However, we must first transpose `other`'s rows using `group_id_mapping`.
+    // This is a logical "scatter" operation.
+    //
+    // Since `scatter(indices)` is implemented as `take(inverse_permutation(indices))`,
+    // we can save time by computing `inverse_permutation(indices)` once for all
+    // columns.
+
+    // Scatter/InversePermutation only accept signed indices. We checked
+    // in Resize() above that we were inside the limits for int32.
+    auto scatter_indices = group_id_mapping.Copy();
+    scatter_indices->type = int32();
+    // Select the narrowest signed index type based on this accumulator's group count
+    std::shared_ptr<DataType> take_indices_type;
+    if (num_groups_ - 1 <= std::numeric_limits<int8_t>::max()) {
+      take_indices_type = int8();
+    } else if (num_groups_ - 1 <= std::numeric_limits<int16_t>::max()) {
+      take_indices_type = int16();
+    } else {
+      // Guaranteed by Resize(); the check must be "<=", otherwise debug builds
+      // would abort for any group count between 2**15 + 1 and 2**31 - 1.
+      DCHECK_LE(num_groups_ - 1, std::numeric_limits<int32_t>::max());
+      take_indices_type = int32();
+    }
+    InversePermutationOptions options(/*max_index=*/num_groups_ - 1, take_indices_type);
+    ARROW_ASSIGN_OR_RAISE(auto take_indices,
+                          InversePermutation(scatter_indices, options, ctx_));
+    // Applied lazily by MergeColumn(), only to columns that actually hold values
+    auto scatter_column =
+        [&](const std::shared_ptr<Array>& column) -> Result<std::shared_ptr<Array>> {
+      TakeOptions take_options(/*boundscheck=*/false);
+      ARROW_ASSIGN_OR_RAISE(auto scattered,
+                            Take(column, take_indices, take_options, ctx_));
+      return scattered.make_array();
+    };
+    return MergeColumns(std::move(other.columns_), std::move(scatter_column));
+  }
+
+  Result<ArrayVector> Finalize() {
+    // Ensure that columns are allocated even if num_groups_ == 0
+    RETURN_NOT_OK(ResizeColumns(num_groups_));
+    return std::move(columns_);  // the accumulator gives up its columns
+  }
+
+ protected:
+  // Extend every column to `new_num_groups` rows by appending nulls.
+  //
+  // While num_groups_ is still 0 the columns are (re)assigned rather than
+  // extended, so that they get allocated even if `new_num_groups` is 0 too.
+  Status ResizeColumns(int64_t new_num_groups) {
+    const bool first_allocation = (num_groups_ == 0);
+    if (!first_allocation && new_num_groups == num_groups_) {
+      return Status::OK();
+    }
+    auto* pool = ctx_->memory_pool();
+    // A single all-null array serves as the suffix of every column
+    ARROW_ASSIGN_OR_RAISE(
+        auto null_suffix,
+        MakeArrayOfNull(value_type_, new_num_groups - num_groups_, pool));
+    for (auto& column : columns_) {
+      if (first_allocation) {
+        column = null_suffix;
+      } else {
+        DCHECK_NE(column, nullptr);
+        ARROW_ASSIGN_OR_RAISE(column,
+                              Concatenate({std::move(column), null_suffix}, pool));
+      }
+      DCHECK_EQ(column->length(), new_num_groups);
+    }
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  // Optional transform applied to an incoming column before it is merged
+  using ColumnTransform =
+      std::function<Result<std::shared_ptr<Array>>(const std::shared_ptr<Array>&)>;
+
+  // Merge each allocated entry of `other_columns` into the column of same index.
+  Status MergeColumns(ArrayVector&& other_columns,
+                      const ColumnTransform& transform = {}) {
+    DCHECK_EQ(columns_.size(), other_columns.size());
+    for (int key = 0; key < num_keys_; ++key) {
+      auto& incoming = other_columns[key];
+      if (!incoming) {
+        // Unallocated column: nothing to merge for this key
+        continue;
+      }
+      RETURN_NOT_OK(MergeColumn(&columns_[key], std::move(incoming), transform));
+    }
+    return Status::OK();
+  }
+
+  // Merge `other_column` (after applying `transform`, if any) into `*column`.
+  //
+  // Returns Invalid if both columns hold a non-null value in the same row.
+  Status MergeColumn(std::shared_ptr<Array>* column, std::shared_ptr<Array> other_column,
+                     const ColumnTransform& transform = {}) {
+    const bool other_is_all_null = other_column->null_count() == other_column->length();
+    if (other_is_all_null) {
+      // Merging would be a no-op, so also skip the cost of the transform step.
+      return Status::OK();
+    }
+    if (transform) {
+      ARROW_ASSIGN_OR_RAISE(other_column, transform(other_column));
+    }
+    DCHECK_EQ(num_groups_, other_column->length());
+    // An unallocated or entirely null destination is simply replaced
+    if (!*column || (*column)->null_count() == (*column)->length()) {
+      *column = other_column;
+      return Status::OK();
+    }
+    const int64_t non_nulls_in_inputs = (num_groups_ - (*column)->null_count()) +
+                                        (num_groups_ - other_column->null_count());
+    ARROW_ASSIGN_OR_RAISE(auto merged,
+                          CallFunction("coalesce", {*column, other_column}, ctx_));
+    // If coalescing dropped a non-null value, then some row had a value on both sides.
+    const int64_t non_nulls_in_output = num_groups_ - merged.null_count();
+    if (non_nulls_in_inputs != non_nulls_in_output) {
+      DCHECK_GT(non_nulls_in_inputs, non_nulls_in_output);
+      return DuplicateValue();
+    }
+    *column = merged.make_array();
+    return Status::OK();
+  }
+
+  Status DuplicateValue() {  // error for a (group, key) cell receiving several values
+    return Status::Invalid(
+        "Encountered more than one non-null value for the same grouped pivot 
key");
+  }
+
+  ExecContext* ctx_;  // set by Init(); provides the memory pool
+  std::shared_ptr<DataType> value_type_;  // type of every output column
+  int num_keys_;  // number of output columns
+  int64_t num_groups_;  // number of rows in each allocated output column
+  ArrayVector columns_;  // one entry per pivot key; null until first allocated
+  // A persistent scratch buffer to store the take indices in Consume
+  BufferBuilder scratch_buffer_;
+};
+
+// Grouped aggregator backing the "hash_pivot_wider" function.
+//
+// It consumes (pivot keys, pivot values, group ids) batches and produces a struct
+// array with one field per name in `PivotWiderOptions::key_names`, every field
+// having the type of the pivot values.
+struct GroupedPivotImpl : public GroupedAggregator {
+  // Compute the output struct type and set up the key mapper and accumulator.
+  //
+  // Returns Invalid if no options were passed, and propagates any error raised
+  // while creating the key mapper for the key type and options.
+  Status Init(ExecContext* ctx, const KernelInitArgs& args) override {
+    DCHECK_EQ(args.inputs.size(), 3);
+    // The output type is derived from the options' key names, so options are
+    // mandatory: report an error instead of dereferencing a null pointer.
+    if (args.options == nullptr) {
+      return Status::Invalid("hash_pivot_wider requires PivotWiderOptions to be given");
+    }
+    key_type_ = args.inputs[0].GetSharedPtr();
+    options_ = checked_cast<const PivotWiderOptions*>(args.options);
+    auto value_type = args.inputs[1].GetSharedPtr();
+    FieldVector fields;
+    fields.reserve(options_->key_names.size());
+    for (const auto& key_name : options_->key_names) {
+      fields.push_back(field(key_name, value_type));
+    }
+    out_type_ = struct_(std::move(fields));
+    out_struct_type_ = checked_cast<const StructType*>(out_type_.get());
+    ARROW_ASSIGN_OR_RAISE(key_mapper_, PivotWiderKeyMapper::Make(*key_type_, options_));
+    RETURN_NOT_OK(accumulator_.Init(ctx, value_type, options_));
+    return Status::OK();
+  }
+
+  // Record the new group count and grow the accumulator accordingly.
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return accumulator_.Resize(new_num_groups);
+  }
+
+  // Merge the state of another GroupedPivotImpl, whose group ids are translated
+  // using `group_id_mapping`.
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedPivotImpl*>(&raw_other);
+    return accumulator_.Merge(std::move(other->accumulator_), group_id_mapping);
+  }
+
+  // Consume a batch made of (pivot keys, pivot values, group ids).
+  //
+  // Pivot keys may be an array or a scalar; pivot values must be an array,
+  // otherwise NotImplemented is returned.
+  Status Consume(const ExecSpan& batch) override {
+    DCHECK_EQ(batch.values.size(), 3);
+    auto groups = batch[2].array.GetSpan<const uint32_t>(1, batch.length);
+    if (!batch[1].is_array()) {
+      return Status::NotImplemented("Consuming scalar pivot value");
+    }
+    if (batch[0].is_array()) {
+      ARROW_ASSIGN_OR_RAISE(span<const PivotWiderKeyIndex> keys,
+                            key_mapper_->MapKeys(batch[0].array));
+      return accumulator_.Consume(groups, keys, batch[1].array);
+    } else {
+      ARROW_ASSIGN_OR_RAISE(PivotWiderKeyIndex key,
+                            key_mapper_->MapKey(*batch[0].scalar));
+      return accumulator_.Consume(groups, key, batch[1].array);
+    }
+  }
+
+  // Assemble the accumulated columns into a struct array without top-level nulls.
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto columns, accumulator_.Finalize());
+    DCHECK_EQ(columns.size(), static_cast<size_t>(out_struct_type_->num_fields()));
+    return std::make_shared<StructArray>(out_type_, num_groups_, std::move(columns),
+                                         /*null_bitmap=*/nullptr,
+                                         /*null_count=*/0);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return out_type_; }
+
+  std::shared_ptr<DataType> key_type_;
+  std::shared_ptr<DataType> out_type_;
+  // Same object as `out_type_`, downcast once in Init()
+  const StructType* out_struct_type_ = nullptr;
+  // Not owned; points to the options passed in KernelInitArgs
+  const PivotWiderOptions* options_ = nullptr;
+  std::unique_ptr<PivotWiderKeyMapper> key_mapper_;
+  GroupedPivotAccumulator accumulator_;
+  int64_t num_groups_ = 0;
+};
+
+// ----------------------------------------------------------------------
+// Docstrings
+
 const FunctionDoc hash_count_doc{
     "Count the number of null / non-null values in each group",
     ("By default, only non-null values are counted.\n"
@@ -3456,6 +3856,20 @@ const FunctionDoc hash_one_doc{"Get one value from each 
group",
 const FunctionDoc hash_list_doc{"List all values in each group",
                                 ("Null values are also returned."),
                                 {"array", "group_id_array"}};
+
+const FunctionDoc hash_pivot_doc{  // docstring of the "hash_pivot_wider" function
+    "Pivot values according to a pivot key column",
+    ("Output is a struct array with as many fields as 
`PivotWiderOptions.key_names`.\n"
+     "All output struct fields have the same type as `pivot_values`.\n"
+     "Each pivot key decides in which output field the corresponding pivot 
value\n"
+     "is emitted. If a pivot key doesn't appear in a given group, null is 
emitted.\n"
+     "If more than one non-null value is encountered in the same group for a\n"
+     "given pivot key, Invalid is raised.\n"
+     "Behavior of unexpected pivot keys is controlled by 
`unexpected_key_behavior`\n"
+     "in PivotWiderOptions."),
+    {"pivot_keys", "pivot_values", "group_id_array"},
+    "PivotWiderOptions"};
+
 }  // namespace
 
 void RegisterHashAggregateBasic(FunctionRegistry* registry) {
@@ -3705,6 +4119,20 @@ void RegisterHashAggregateBasic(FunctionRegistry* 
registry) {
                                 GroupedListFactory::Make, func.get()));
     DCHECK_OK(registry->AddFunction(std::move(func)));
   }
+
+  {
+    auto func = std::make_shared<HashAggregateFunction>("hash_pivot_wider",
+                                                        Arity::Ternary(), 
hash_pivot_doc);
+    for (auto key_type : BaseBinaryTypes()) {
+      // Anything that scatter() (i.e. take()) accepts can be passed as values
+      auto sig = KernelSignature::Make(
+          {key_type->id(), InputType::Any(), InputType(Type::UINT32)},
+          OutputType(ResolveGroupOutputType));
+      DCHECK_OK(func->AddKernel(
+          MakeKernel(std::move(sig), HashAggregateInit<GroupedPivotImpl>)));
+    }
+    DCHECK_OK(registry->AddFunction(std::move(func)));
+  }
 }
 
 }  // namespace internal
diff --git a/cpp/src/arrow/compute/kernels/pivot_internal.cc 
b/cpp/src/arrow/compute/kernels/pivot_internal.cc
new file mode 100644
index 0000000000..7a65ddc212
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/pivot_internal.cc
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/kernels/pivot_internal.h"
+
+#include <cstdint>
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/scalar.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow::compute::internal {
+
+using ::arrow::util::span;
+
+// Type-independent part of the key mappers: owns the key name -> key index
+// lookup table and implements the handling of unexpected and null keys.
+struct BasePivotKeyMapper : public PivotWiderKeyMapper {
+  // Build the lookup table from `options->key_names`, in order.
+  //
+  // Returns NotImplemented if there are more key names than representable key
+  // indices, and KeyError if a key name appears twice.
+  Status Init(const PivotWiderOptions* options) override {
+    const auto& key_names = options->key_names;
+    const size_t max_num_keys = static_cast<size_t>(kMaxPivotKey) + 1;
+    if (key_names.size() > max_num_keys) {
+      return Status::NotImplemented("Pivoting to more than ", max_num_keys,
+                                    " columns: got ", key_names.size());
+    }
+    key_name_map_.reserve(key_names.size());
+    PivotWiderKeyIndex next_index = 0;
+    for (const auto& key_name : key_names) {
+      const bool is_new =
+          key_name_map_.try_emplace(std::string_view(key_name), next_index).second;
+      if (!is_new) {
+        return Status::KeyError("Duplicate key name '", key_name,
+                                "' in PivotWiderOptions");
+      }
+      ++next_index;
+    }
+    unexpected_key_behavior_ = options->unexpected_key_behavior;
+    return Status::OK();
+  }
+
+ protected:
+  // Outcome for a key absent from the lookup table: either the null key index
+  // or a KeyError, depending on the configured behavior.
+  Result<PivotWiderKeyIndex> KeyNotFound(std::string_view key_name) {
+    const bool ignore_unexpected =
+        unexpected_key_behavior_ == PivotWiderOptions::kIgnore;
+    if (!ignore_unexpected) {
+      DCHECK_EQ(unexpected_key_behavior_, PivotWiderOptions::kRaise);
+      return Status::KeyError("Unexpected pivot key: ", key_name);
+    }
+    return kNullPivotKey;
+  }
+
+  // Translate a key name into its key index.
+  Result<PivotWiderKeyIndex> LookupKey(std::string_view key_name) {
+    const auto found = key_name_map_.find(key_name);
+    if (ARROW_PREDICT_FALSE(found == key_name_map_.end())) {
+      return KeyNotFound(key_name);
+    }
+    return found->second;
+  }
+
+  // Error returned when a pivot key is null.
+  Status NullKeyName() { return Status::KeyError("pivot key name cannot be null"); }
+
+  // The strings backing the string_views should be kept alive by PivotWiderOptions.
+  std::unordered_map<std::string_view, PivotWiderKeyIndex> key_name_map_;
+  PivotWiderOptions::UnexpectedKeyBehavior unexpected_key_behavior_;
+  TypedBufferBuilder<PivotWiderKeyIndex> key_indices_buffer_;
+};
+
+template <typename KeyType>
+struct TypedPivotKeyMapper : public BasePivotKeyMapper {
+  Result<span<const PivotWiderKeyIndex>> MapKeys(const ArraySpan& array) 
override {
+    // XXX Should use a faster hashing facility than unordered_map, for example
+    // Grouper or SwissTable.
+    RETURN_NOT_OK(this->key_indices_buffer_.Reserve(array.length));
+    PivotWiderKeyIndex* key_indices = this->key_indices_buffer_.mutable_data();
+    int64_t i = 0;
+    RETURN_NOT_OK(VisitArrayValuesInline<KeyType>(
+        array,
+        [&](std::string_view key_name) {
+          ARROW_ASSIGN_OR_RAISE(key_indices[i], LookupKey(key_name));
+          ++i;
+          return Status::OK();
+        },
+        [&]() { return NullKeyName(); }));  // a null key aborts the whole mapping
+    return span(key_indices, array.length);  // views key_indices_buffer_
+  }
+
+  Result<PivotWiderKeyIndex> MapKey(const Scalar& scalar) override {
+    if (!scalar.is_valid) {
+      return NullKeyName();
+    }
+    const auto& binary_scalar = checked_cast<const BaseBinaryScalar&>(scalar);
+    return LookupKey(binary_scalar.view());
+  }
+};
+
+Result<std::unique_ptr<PivotWiderKeyMapper>> PivotWiderKeyMapper::Make(
+    const DataType& key_type, const PivotWiderOptions* options) {
+  std::unique_ptr<PivotWiderKeyMapper> instance;
+
+  auto visit_key_type =
+      [&](auto&& key_type) -> Result<std::unique_ptr<PivotWiderKeyMapper>> {
+    using T = std::decay_t<decltype(key_type)>;  // concrete DataType subclass
+    // Only binary-like keys are supported for now
+    if constexpr (is_base_binary_type<T>::value) {
+      instance = std::make_unique<TypedPivotKeyMapper<T>>();
+      RETURN_NOT_OK(instance->Init(options));
+      return std::move(instance);  // `instance` is captured by reference, hence the move
+    }
+    return Status::NotImplemented("Pivot key type: ", key_type);
+  };
+
+  return VisitType(key_type, visit_key_type);
+}
+
+}  // namespace arrow::compute::internal
diff --git a/cpp/src/arrow/compute/kernels/pivot_internal.h 
b/cpp/src/arrow/compute/kernels/pivot_internal.h
new file mode 100644
index 0000000000..faa808b7a2
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/pivot_internal.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/type_fwd.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/span.h"
+
+namespace arrow::compute::internal {
+
+using PivotWiderKeyIndex = uint8_t;  // position of a key in PivotWiderOptions::key_names
+
+constexpr PivotWiderKeyIndex kNullPivotKey =
+    std::numeric_limits<PivotWiderKeyIndex>::max();  // sentinel: key must be ignored
+constexpr PivotWiderKeyIndex kMaxPivotKey = kNullPivotKey - 1;  // largest valid key index
+
+struct PivotWiderKeyMapper {  // maps pivot key values to PivotWiderKeyIndex values
+  virtual ~PivotWiderKeyMapper() = default;
+
+  virtual Status Init(const PivotWiderOptions* options) = 0;  // called by Make()
+  virtual Result<::arrow::util::span<const PivotWiderKeyIndex>> MapKeys(
+      const ArraySpan&) = 0;  // one key index per array element
+  virtual Result<PivotWiderKeyIndex> MapKey(const Scalar&) = 0;  // single-key variant
+
+  static Result<std::unique_ptr<PivotWiderKeyMapper>> Make(
+      const DataType& key_type, const PivotWiderOptions* options);  // factory
+};
+
+}  // namespace arrow::compute::internal
diff --git a/cpp/src/arrow/compute/registry.cc 
b/cpp/src/arrow/compute/registry.cc
index ef9f3c7e1f..91df4155ba 100644
--- a/cpp/src/arrow/compute/registry.cc
+++ b/cpp/src/arrow/compute/registry.cc
@@ -327,6 +327,7 @@ static std::unique_ptr<FunctionRegistry> 
CreateBuiltInRegistry() {
   RegisterHashAggregateBasic(registry.get());
   RegisterScalarAggregateBasic(registry.get());
   RegisterScalarAggregateMode(registry.get());
+  RegisterScalarAggregatePivot(registry.get());
   RegisterScalarAggregateQuantile(registry.get());
   RegisterScalarAggregateTDigest(registry.get());
   RegisterScalarAggregateVariance(registry.get());
diff --git a/cpp/src/arrow/compute/registry_internal.h 
b/cpp/src/arrow/compute/registry_internal.h
index 8287e63050..d91e2693a3 100644
--- a/cpp/src/arrow/compute/registry_internal.h
+++ b/cpp/src/arrow/compute/registry_internal.h
@@ -63,6 +63,7 @@ void RegisterVectorOptions(FunctionRegistry* registry);
 void RegisterHashAggregateBasic(FunctionRegistry* registry);
 void RegisterScalarAggregateBasic(FunctionRegistry* registry);
 void RegisterScalarAggregateMode(FunctionRegistry* registry);
+void RegisterScalarAggregatePivot(FunctionRegistry* registry);
 void RegisterScalarAggregateQuantile(FunctionRegistry* registry);
 void RegisterScalarAggregateTDigest(FunctionRegistry* registry);
 void RegisterScalarAggregateVariance(FunctionRegistry* registry);
diff --git a/cpp/src/arrow/compute/type_fwd.h b/cpp/src/arrow/compute/type_fwd.h
index 89f32ceb0f..016d97a0db 100644
--- a/cpp/src/arrow/compute/type_fwd.h
+++ b/cpp/src/arrow/compute/type_fwd.h
@@ -40,6 +40,7 @@ class CastOptions;
 
 struct ExecBatch;
 class ExecContext;
+struct ExecValue;
 class KernelContext;
 
 struct Kernel;
diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst
index 2a2a01c331..750572713c 100644
--- a/docs/source/cpp/compute.rst
+++ b/docs/source/cpp/compute.rst
@@ -214,35 +214,37 @@ the input to a single output value.
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
 | count_distinct     | Unary   | Non-nested types | Scalar Int64           | 
:struct:`CountOptions`           | \(2)  |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| first              | Unary   | Numeric, Binary  | Scalar Input type      | 
:struct:`ScalarAggregateOptions` | \(11) |
+| first              | Unary   | Numeric, Binary  | Scalar Input type      | 
:struct:`ScalarAggregateOptions` | \(3)  |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| first_last         | Unary   | Numeric, Binary  | Scalar Struct          | 
:struct:`ScalarAggregateOptions` | \(11) |
+| first_last         | Unary   | Numeric, Binary  | Scalar Struct          | 
:struct:`ScalarAggregateOptions` | \(3)  |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| index              | Unary   | Any              | Scalar Int64           | 
:struct:`IndexOptions`           | \(3)  |
+| index              | Unary   | Any              | Scalar Int64           | 
:struct:`IndexOptions`           | \(4)  |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| last               | Unary   | Numeric, Binary  | Scalar Input type      | 
:struct:`ScalarAggregateOptions` | \(11) |
+| last               | Unary   | Numeric, Binary  | Scalar Input type      | 
:struct:`ScalarAggregateOptions` | \(3)  |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
 | max                | Unary   | Non-nested types | Scalar Input type      | 
:struct:`ScalarAggregateOptions` |       |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| mean               | Unary   | Numeric          | Scalar Decimal/Float64 | 
:struct:`ScalarAggregateOptions` | \(4)  |
+| mean               | Unary   | Numeric          | Scalar Decimal/Float64 | 
:struct:`ScalarAggregateOptions` | \(5)  |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
 | min                | Unary   | Non-nested types | Scalar Input type      | 
:struct:`ScalarAggregateOptions` |       |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| min_max            | Unary   | Non-nested types | Scalar Struct          | 
:struct:`ScalarAggregateOptions` | \(5)  |
+| min_max            | Unary   | Non-nested types | Scalar Struct          | 
:struct:`ScalarAggregateOptions` | \(6)  |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| mode               | Unary   | Numeric          | Struct                 | 
:struct:`ModeOptions`            | \(6)  |
+| mode               | Unary   | Numeric          | Struct                 | 
:struct:`ModeOptions`            | \(7)  |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| product            | Unary   | Numeric          | Scalar Numeric         | 
:struct:`ScalarAggregateOptions` | \(7)  |
+| pivot_wider        | Binary  | Binary, Any      | Scalar Struct          | 
:struct:`PivotWiderOptions`      | \(8)  |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| quantile           | Unary   | Numeric          | Scalar Numeric         | 
:struct:`QuantileOptions`        | \(8)  |
+| product            | Unary   | Numeric          | Scalar Numeric         | 
:struct:`ScalarAggregateOptions` | \(9)  |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| stddev             | Unary   | Numeric          | Scalar Float64         | 
:struct:`VarianceOptions`        | \(9)  |
+| quantile           | Unary   | Numeric          | Scalar Numeric         | 
:struct:`QuantileOptions`        | \(10) |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| sum                | Unary   | Numeric          | Scalar Numeric         | 
:struct:`ScalarAggregateOptions` | \(7)  |
+| stddev             | Unary   | Numeric          | Scalar Float64         | 
:struct:`VarianceOptions`        | \(11) |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| tdigest            | Unary   | Numeric          | Float64                | 
:struct:`TDigestOptions`         | \(10) |
+| sum                | Unary   | Numeric          | Scalar Numeric         | 
:struct:`ScalarAggregateOptions` | \(9)  |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
-| variance           | Unary   | Numeric          | Scalar Float64         | 
:struct:`VarianceOptions`        | \(9)  |
+| tdigest            | Unary   | Numeric          | Float64                | 
:struct:`TDigestOptions`         | \(12) |
++--------------------+---------+------------------+------------------------+----------------------------------+-------+
+| variance           | Unary   | Numeric          | Scalar Float64         | 
:struct:`VarianceOptions`        | \(11) |
 
+--------------------+---------+------------------+------------------------+----------------------------------+-------+
 
 * \(1) If null values are taken into account, by setting the
@@ -252,37 +254,41 @@ the input to a single output value.
 * \(2) CountMode controls whether only non-null values are counted (the
   default), only null values are counted, or all values are counted.
 
-* \(3) Returns -1 if the value is not found. The index of a null value
+* \(3) Result is based on the ordering of input data.
+
+* \(4) Returns -1 if the value is not found. The index of a null value
   is always -1, regardless of whether there are nulls in the input.
 
-* \(4) For decimal inputs, the resulting decimal will have the same
+* \(5) For decimal inputs, the resulting decimal will have the same
   precision and scale. The result is rounded away from zero.
 
-* \(5) Output is a ``{"min": input type, "max": input type}`` Struct.
+* \(6) Output is a ``{"min": input type, "max": input type}`` Struct.
 
   Of the interval types, only the month interval is supported, as the day-time
   and month-day-nano types are not sortable.
 
-* \(6) Output is an array of ``{"mode": input type, "count": Int64}`` Struct.
+* \(7) Output is an array of ``{"mode": input type, "count": Int64}`` Struct.
   It contains the *N* most common elements in the input, in descending
   order, where *N* is given in :member:`ModeOptions::n`.
   If two values have the same count, the smallest one comes first.
   Note that the output can have less than *N* elements if the input has
   less than *N* distinct values.
 
-* \(7) Output is Int64, UInt64, Float64, or Decimal128/256, depending on the
+* \(8) The first input contains the pivot key, while the second input contains
+  the values to be pivoted. The output is a Struct with one field for each key
+  in :member:`PivotWiderOptions::key_names`.
+
+* \(9) Output is Int64, UInt64, Float64, or Decimal128/256, depending on the
   input type.
 
-* \(8) Output is Float64 or input type, depending on QuantileOptions.
+* \(10) Output is Float64 or input type, depending on QuantileOptions.
 
-* \(9) Decimal arguments are cast to Float64 first.
+* \(11) Decimal arguments are cast to Float64 first.
 
-* \(10) tdigest/t-digest computes approximate quantiles, and so only needs a
+* \(12) tdigest/t-digest computes approximate quantiles, and so only needs a
   fixed amount of memory. See the `reference implementation
   <https://github.com/tdunning/t-digest>`_ for details.
 
-* \(11) Result is based on the ordering of input data
-
   Decimal arguments are cast to Float64 first.
 
 .. _grouped-aggregations-group-by:
@@ -350,11 +356,11 @@ equivalents above and reflects how they are implemented 
internally.
 
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
 | hash_distinct           | Unary   | Any                                | 
List of input type     | :struct:`CountOptions`           | \(2) \(3) |
 
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_first              | Unary   | Numeric, Binary                    | 
Input type             | :struct:`ScalarAggregateOptions` | \(10)     |
+| hash_first              | Unary   | Numeric, Binary                    | 
Input type             | :struct:`ScalarAggregateOptions` | \(11)     |
 
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_first_last         | Unary   | Numeric, Binary                    | 
Struct                 | :struct:`ScalarAggregateOptions` | \(10)     |
+| hash_first_last         | Unary   | Numeric, Binary                    | 
Struct                 | :struct:`ScalarAggregateOptions` | \(11)     |
 
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_last               | Unary   | Numeric, Binary                    | 
Input type             | :struct:`ScalarAggregateOptions` | \(10)     |
+| hash_last               | Unary   | Numeric, Binary                    | 
Input type             | :struct:`ScalarAggregateOptions` | \(11)     |
 
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
 | hash_list               | Unary   | Any                                | 
List of input type     |                                  | \(3)      |
 
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
@@ -368,15 +374,17 @@ equivalents above and reflects how they are implemented 
internally.
 
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
 | hash_one                | Unary   | Any                                | 
Input type             |                                  | \(6)      |
 
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_product            | Unary   | Numeric                            | 
Numeric                | :struct:`ScalarAggregateOptions` | \(7)      |
+| hash_pivot_wider        | Binary  | Binary, Any                        | 
Struct                 | :struct:`PivotWiderOptions`      | \(7)      |
++-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
+| hash_product            | Unary   | Numeric                            | 
Numeric                | :struct:`ScalarAggregateOptions` | \(8)      |
 
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_stddev             | Unary   | Numeric                            | 
Float64                | :struct:`VarianceOptions`        | \(8)      |
+| hash_stddev             | Unary   | Numeric                            | 
Float64                | :struct:`VarianceOptions`        | \(9)      |
 
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_sum                | Unary   | Numeric                            | 
Numeric                | :struct:`ScalarAggregateOptions` | \(7)      |
+| hash_sum                | Unary   | Numeric                            | 
Numeric                | :struct:`ScalarAggregateOptions` | \(8)      |
 
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_tdigest            | Unary   | Numeric                            | 
FixedSizeList[Float64] | :struct:`TDigestOptions`         | \(9)      |
+| hash_tdigest            | Unary   | Numeric                            | 
FixedSizeList[Float64] | :struct:`TDigestOptions`         | \(10)     |
 
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
-| hash_variance           | Unary   | Numeric                            | 
Float64                | :struct:`VarianceOptions`        | \(8)      |
+| hash_variance           | Unary   | Numeric                            | 
Float64                | :struct:`VarianceOptions`        | \(9)      |
 
+-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+
 
 * \(1) If null values are taken into account, by setting the
@@ -405,18 +413,21 @@ equivalents above and reflects how they are implemented 
internally.
   one non-null value for a certain group, that value is returned, and only if
   all the values are ``null`` for the group will the function return ``null``.
 
-* \(7) Output is Int64, UInt64, Float64, or Decimal128/256, depending on the
+* \(7) The first input contains the pivot key, while the second input contains
+  the values to be pivoted. The output is a Struct with one field for each key
+  in :member:`PivotWiderOptions::key_names`.
+
+* \(8) Output is Int64, UInt64, Float64, or Decimal128/256, depending on the
   input type.
 
-* \(8) Decimal arguments are cast to Float64 first.
+* \(9) Decimal arguments are cast to Float64 first.
 
-* \(9) T-digest computes approximate quantiles, and so only needs a
+* \(10) T-digest computes approximate quantiles, and so only needs a
   fixed amount of memory. See the `reference implementation
   <https://github.com/tdunning/t-digest>`_ for details.
 
-* \(10) Result is based on ordering of the input data.
+* \(11) Result is based on ordering of the input data.
 
-  Decimal arguments are cast to Float64 first.
 
 Element-wise ("scalar") functions
 ---------------------------------
diff --git a/docs/source/python/api/compute.rst 
b/docs/source/python/api/compute.rst
index 5423eebfba..0205457fec 100644
--- a/docs/source/python/api/compute.rst
+++ b/docs/source/python/api/compute.rst
@@ -38,6 +38,7 @@ Aggregations
    min
    min_max
    mode
+   pivot_wider
    product
    quantile
    stddev
@@ -557,6 +558,7 @@ Compute Options
    PadOptions
    PairwiseOptions
    PartitionNthOptions
+   PivotWiderOptions
    QuantileOptions
    ReplaceSliceOptions
    ReplaceSubstringOptions
diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx
index d23286dcdd..ee61077aa8 100644
--- a/python/pyarrow/_compute.pyx
+++ b/python/pyarrow/_compute.pyx
@@ -2396,6 +2396,50 @@ class RankQuantileOptions(_RankQuantileOptions):
         self._set_options(sort_keys, null_placement)
 
 
+cdef class _PivotWiderOptions(FunctionOptions):
+
+    def _set_options(self, key_names, unexpected_key_behavior):
+        cdef:
+            vector[c_string] c_key_names
+            PivotWiderUnexpectedKeyBehavior c_unexpected_key_behavior
+        if unexpected_key_behavior == "ignore":
+            c_unexpected_key_behavior = PivotWiderUnexpectedKeyBehavior_Ignore
+        elif unexpected_key_behavior == "raise":
+            c_unexpected_key_behavior = PivotWiderUnexpectedKeyBehavior_Raise
+        else:
+            raise ValueError(
+                f"Unsupported value for unexpected_key_behavior: "
+                f"expected 'ignore' or 'raise', got {unexpected_key_behavior!r}")
+
+        for k in key_names:
+            c_key_names.push_back(tobytes(k))
+
+        self.wrapped.reset(
+            new CPivotWiderOptions(move(c_key_names), c_unexpected_key_behavior)
+        )
+
+
+class PivotWiderOptions(_PivotWiderOptions):
+    """
+    Options for the `pivot_wider` function.
+
+    Parameters
+    ----------
+    key_names : sequence of str
+        The pivot key names expected in the pivot key column.
+        For each entry in `key_names`, a column with the same name is emitted
+        in the struct output.
+    unexpected_key_behavior : str, default "ignore"
+        The behavior when pivot keys not in `key_names` are encountered.
+        Accepted values are "ignore", "raise".
+        If "ignore", unexpected keys are silently ignored.
+        If "raise", unexpected keys raise a KeyError.
+    """
+
+    def __init__(self, key_names, *, unexpected_key_behavior="ignore"):
+        self._set_options(key_names, unexpected_key_behavior)
+
+
 cdef class Expression(_Weakrefable):
     """
     A logical expression to be evaluated against some input.
diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py
index 5348336235..e2c17ee61d 100644
--- a/python/pyarrow/compute.py
+++ b/python/pyarrow/compute.py
@@ -53,6 +53,7 @@ from pyarrow._compute import (  # noqa
     PadOptions,
     PairwiseOptions,
     PartitionNthOptions,
+    PivotWiderOptions,
     QuantileOptions,
     RandomOptions,
     RankOptions,
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index 556696e344..8e666b114b 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2823,6 +2823,16 @@ cdef extern from "arrow/compute/api.h" namespace 
"arrow::compute" nogil:
         vector[CSortKey] sort_keys
         CNullPlacement null_placement
 
+    cdef enum PivotWiderUnexpectedKeyBehavior \
+            "arrow::compute::PivotWiderOptions::UnexpectedKeyBehavior":
+        PivotWiderUnexpectedKeyBehavior_Ignore "arrow::compute::PivotWiderOptions::kIgnore"
+        PivotWiderUnexpectedKeyBehavior_Raise "arrow::compute::PivotWiderOptions::kRaise"
+
+    cdef cppclass CPivotWiderOptions \
+            "arrow::compute::PivotWiderOptions"(CFunctionOptions):
+        CPivotWiderOptions(vector[c_string] key_names,
+                           PivotWiderUnexpectedKeyBehavior)
+
     cdef enum DatumType" arrow::Datum::type":
         DatumType_NONE" arrow::Datum::NONE"
         DatumType_SCALAR" arrow::Datum::SCALAR"
diff --git a/python/pyarrow/tests/test_compute.py 
b/python/pyarrow/tests/test_compute.py
index ef02b476bd..99b3047d66 100644
--- a/python/pyarrow/tests/test_compute.py
+++ b/python/pyarrow/tests/test_compute.py
@@ -145,6 +145,7 @@ def test_option_class_equality(request):
         pc.ArraySortOptions(),
         pc.AssumeTimezoneOptions("UTC"),
         pc.CastOptions.safe(pa.int8()),
+        pc.CumulativeOptions(start=None, skip_nulls=False),
         pc.CountOptions(),
         pc.DayOfWeekOptions(count_from_zero=False, week_start=0),
         pc.DictionaryEncodeOptions(),
@@ -167,7 +168,7 @@ def test_option_class_equality(request):
         pc.PadOptions(5),
         pc.PairwiseOptions(period=1),
         pc.PartitionNthOptions(1, null_placement="at_start"),
-        pc.CumulativeOptions(start=None, skip_nulls=False),
+        pc.PivotWiderOptions(["height"], unexpected_key_behavior="raise"),
         pc.QuantileOptions(),
         pc.RandomOptions(),
         pc.RankOptions(sort_keys="ascending",
@@ -3785,3 +3786,29 @@ def test_pairwise_diff():
     with pytest.raises(pa.ArrowInvalid,
                        match="overflow"):
         pa.compute.pairwise_diff_checked(arr, period=-1)
+
+
+def test_pivot_wider():
+    key_names = ["width", "height"]
+
+    result = pc.pivot_wider(["height", "width", "depth"], [10, None, 11])
+    assert result.as_py() == {}
+
+    result = pc.pivot_wider(["height", "width", "depth"], [10, None, 11],
+                            key_names)
+    assert result.as_py() == {"width": None, "height": 10}
+    # check key order
+    assert list(result.as_py()) == ["width", "height"]
+
+    result = pc.pivot_wider(["height", "width", "depth"], [10, None, 11],
+                            key_names=key_names)
+    assert result.as_py() == {"width": None, "height": 10}
+
+    with pytest.raises(KeyError, match="Unexpected pivot key: depth"):
+        result = pc.pivot_wider(["height", "width", "depth"], [10, None, 11],
+                                key_names=key_names,
+                                unexpected_key_behavior="raise")
+
+    with pytest.raises(ValueError, match="Encountered more than one non-null value"):
+        result = pc.pivot_wider(["height", "width", "height"], [10, None, 11],
+                                key_names=key_names)

Reply via email to