This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fb857f02a ARROW-16686: [C++] Use shared_ptr with FunctionOptions 
(#13344)
1fb857f02a is described below

commit 1fb857f02a59cdebd98943f82dd53a56a58d248e
Author: Vibhatha Lakmal Abeykoon <[email protected]>
AuthorDate: Wed Jun 22 01:18:40 2022 +0530

    ARROW-16686: [C++] Use shared_ptr with FunctionOptions (#13344)
    
    Includes changes to remove raw pointer usage with `FunctionOptions` used in 
`Aggregate`.
    
    This also includes changes that remove `keep_alives` from the R aggregate 
bindings.
    
    Lead-authored-by: Vibhatha Abeykoon <[email protected]>
    Co-authored-by: Vibhatha Lakmal Abeykoon <[email protected]>
    Co-authored-by: Sutou Kouhei <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 c_glib/arrow-glib/compute.cpp                      |   9 +-
 .../arrow/execution_plan_documentation_examples.cc |   6 +-
 cpp/src/arrow/compute/api_aggregate.h              |   2 +-
 cpp/src/arrow/compute/exec/aggregate.cc            |   2 +-
 cpp/src/arrow/compute/exec/aggregate_node.cc       |  54 +--
 cpp/src/arrow/compute/exec/plan_test.cc            |  31 +-
 cpp/src/arrow/compute/exec/tpch_benchmark.cc       |  11 +-
 .../arrow/compute/kernels/hash_aggregate_test.cc   | 407 +++++++++++----------
 python/pyarrow/_compute.pyx                        |   4 +-
 python/pyarrow/includes/libarrow.pxd               |   2 +-
 r/src/compute-exec.cpp                             |   4 +-
 11 files changed, 277 insertions(+), 255 deletions(-)

diff --git a/c_glib/arrow-glib/compute.cpp b/c_glib/arrow-glib/compute.cpp
index 96346edfd6..ca5bc1a76f 100644
--- a/c_glib/arrow-glib/compute.cpp
+++ b/c_glib/arrow-glib/compute.cpp
@@ -1265,7 +1265,14 @@ garrow_aggregate_node_options_new(GList *aggregations,
       function_options =
         garrow_function_options_get_raw(aggregation_priv->options);
     };
-    arrow_aggregates.push_back({aggregation_priv->function, function_options});
+    if (function_options) {
+      arrow_aggregates.push_back({
+        aggregation_priv->function,
+        function_options->Copy(),
+      });
+    } else {
+      arrow_aggregates.push_back({aggregation_priv->function, nullptr});
+    };
     if (!garrow_field_refs_add(arrow_targets,
                                aggregation_priv->input,
                                error,
diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc 
b/cpp/examples/arrow/execution_plan_documentation_examples.cc
index 5f195c9b20..5f80119bbb 100644
--- a/cpp/examples/arrow/execution_plan_documentation_examples.cc
+++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc
@@ -507,7 +507,7 @@ arrow::Status 
SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
                                                     /*names=*/{"sum(a)"}};
   ARROW_ASSIGN_OR_RAISE(
       cp::ExecNode * aggregate,
-      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));
+      cp::MakeExecNode("aggregate", plan.get(), {source}, 
std::move(aggregate_options)));
 
   ARROW_RETURN_NOT_OK(
       cp::MakeExecNode("sink", plan.get(), {aggregate}, 
cp::SinkNodeOptions{&sink_gen}));
@@ -539,9 +539,9 @@ arrow::Status 
SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
 
   ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                         cp::MakeExecNode("source", plan.get(), {}, 
source_node_options));
-  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
+  auto options = 
std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
   auto aggregate_options =
-      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &options}},
+      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options}},
                                /*targets=*/{"a"},
                                /*names=*/{"count(a)"},
                                /*keys=*/{"b"}};
diff --git a/cpp/src/arrow/compute/api_aggregate.h 
b/cpp/src/arrow/compute/api_aggregate.h
index 7977889bf7..becd5a7414 100644
--- a/cpp/src/arrow/compute/api_aggregate.h
+++ b/cpp/src/arrow/compute/api_aggregate.h
@@ -401,7 +401,7 @@ struct ARROW_EXPORT Aggregate {
   std::string function;
 
   /// options for the aggregation function
-  const FunctionOptions* options;
+  std::shared_ptr<FunctionOptions> options;
 };
 
 }  // namespace internal
diff --git a/cpp/src/arrow/compute/exec/aggregate.cc 
b/cpp/src/arrow/compute/exec/aggregate.cc
index 1a81fdac3d..934cdd4d1f 100644
--- a/cpp/src/arrow/compute/exec/aggregate.cc
+++ b/cpp/src/arrow/compute/exec/aggregate.cc
@@ -55,7 +55,7 @@ Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
   std::vector<std::unique_ptr<KernelState>> states(kernels.size());
 
   for (size_t i = 0; i < aggregates.size(); ++i) {
-    auto options = aggregates[i].options;
+    const FunctionOptions* options = aggregates[i].options.get();
 
     if (options == nullptr) {
       // use known default options for the named function if possible
diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc 
b/cpp/src/arrow/compute/exec/aggregate_node.cc
index 712515bd58..c5c5d3efcf 100644
--- a/cpp/src/arrow/compute/exec/aggregate_node.cc
+++ b/cpp/src/arrow/compute/exec/aggregate_node.cc
@@ -43,18 +43,16 @@ namespace compute {
 
 namespace {
 
-void AggregatesToString(
-    std::stringstream* ss, const Schema& input_schema,
-    const std::vector<internal::Aggregate>& aggs,
-    const std::vector<int>& target_field_ids,
-    const std::vector<std::unique_ptr<FunctionOptions>>& owned_options, int 
indent = 0) {
+void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
+                        const std::vector<internal::Aggregate>& aggs,
+                        const std::vector<int>& target_field_ids, int indent = 
0) {
   *ss << "aggregates=[" << std::endl;
   for (size_t i = 0; i < aggs.size(); i++) {
     for (int j = 0; j < indent; ++j) *ss << "  ";
     *ss << '\t' << aggs[i].function << '('
         << input_schema.field(target_field_ids[i])->name();
-    if (owned_options[i]) {
-      *ss << ", " << owned_options[i]->ToString();
+    if (aggs[i].options) {
+      *ss << ", " << aggs[i].options->ToString();
     }
     *ss << ")," << std::endl;
   }
@@ -69,16 +67,14 @@ class ScalarAggregateNode : public ExecNode {
                       std::vector<int> target_field_ids,
                       std::vector<internal::Aggregate> aggs,
                       std::vector<const ScalarAggregateKernel*> kernels,
-                      std::vector<std::vector<std::unique_ptr<KernelState>>> 
states,
-                      std::vector<std::unique_ptr<FunctionOptions>> 
owned_options)
+                      std::vector<std::vector<std::unique_ptr<KernelState>>> 
states)
       : ExecNode(plan, std::move(inputs), {"target"},
                  /*output_schema=*/std::move(output_schema),
                  /*num_outputs=*/1),
         target_field_ids_(std::move(target_field_ids)),
         aggs_(std::move(aggs)),
         kernels_(std::move(kernels)),
-        states_(std::move(states)),
-        owned_options_(std::move(owned_options)) {}
+        states_(std::move(states)) {}
 
   static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
                                 const ExecNodeOptions& options) {
@@ -95,7 +91,6 @@ class ScalarAggregateNode : public ExecNode {
     FieldVector fields(kernels.size());
     const auto& field_names = aggregate_options.names;
     std::vector<int> target_field_ids(kernels.size());
-    std::vector<std::unique_ptr<FunctionOptions>> 
owned_options(aggregates.size());
 
     for (size_t i = 0; i < kernels.size(); ++i) {
       ARROW_ASSIGN_OR_RAISE(auto match,
@@ -116,11 +111,7 @@ class ScalarAggregateNode : public ExecNode {
       kernels[i] = static_cast<const ScalarAggregateKernel*>(kernel);
 
       if (aggregates[i].options == nullptr) {
-        aggregates[i].options = function->default_options();
-      }
-      if (aggregates[i].options) {
-        owned_options[i] = aggregates[i].options->Copy();
-        aggregates[i].options = owned_options[i].get();
+        aggregates[i].options = function->default_options()->Copy();
       }
 
       KernelContext kernel_ctx{exec_ctx};
@@ -130,7 +121,7 @@ class ScalarAggregateNode : public ExecNode {
                                                    {
                                                        in_type,
                                                    },
-                                                   aggregates[i].options},
+                                                   
aggregates[i].options.get()},
                                     &states[i]));
 
       // pick one to resolve the kernel signature
@@ -143,8 +134,7 @@ class ScalarAggregateNode : public ExecNode {
 
     return plan->EmplaceNode<ScalarAggregateNode>(
         plan, std::move(inputs), schema(std::move(fields)), 
std::move(target_field_ids),
-        std::move(aggregates), std::move(kernels), std::move(states),
-        std::move(owned_options));
+        std::move(aggregates), std::move(kernels), std::move(states));
   }
 
   const char* kind_name() const override { return "ScalarAggregateNode"; }
@@ -242,7 +232,7 @@ class ScalarAggregateNode : public ExecNode {
   std::string ToStringExtra(int indent = 0) const override {
     std::stringstream ss;
     const auto input_schema = inputs_[0]->output_schema();
-    AggregatesToString(&ss, *input_schema, aggs_, target_field_ids_, 
owned_options_);
+    AggregatesToString(&ss, *input_schema, aggs_, target_field_ids_);
     return ss.str();
   }
 
@@ -277,7 +267,6 @@ class ScalarAggregateNode : public ExecNode {
   const std::vector<const ScalarAggregateKernel*> kernels_;
 
   std::vector<std::vector<std::unique_ptr<KernelState>>> states_;
-  const std::vector<std::unique_ptr<FunctionOptions>> owned_options_;
 
   ThreadIndexer get_thread_index_;
   AtomicCounter input_counter_;
@@ -288,16 +277,14 @@ class GroupByNode : public ExecNode {
   GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema, 
ExecContext* ctx,
               std::vector<int> key_field_ids, std::vector<int> 
agg_src_field_ids,
               std::vector<internal::Aggregate> aggs,
-              std::vector<const HashAggregateKernel*> agg_kernels,
-              std::vector<std::unique_ptr<FunctionOptions>> owned_options)
+              std::vector<const HashAggregateKernel*> agg_kernels)
       : ExecNode(input->plan(), {input}, {"groupby"}, std::move(output_schema),
                  /*num_outputs=*/1),
         ctx_(ctx),
         key_field_ids_(std::move(key_field_ids)),
         agg_src_field_ids_(std::move(agg_src_field_ids)),
         aggs_(std::move(aggs)),
-        agg_kernels_(std::move(agg_kernels)),
-        owned_options_(std::move(owned_options)) {}
+        agg_kernels_(std::move(agg_kernels)) {}
 
   static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
                                 const ExecNodeOptions& options) {
@@ -363,17 +350,9 @@ class GroupByNode : public ExecNode {
       output_fields[base + i] = input_schema->field(key_field_id);
     }
 
-    std::vector<std::unique_ptr<FunctionOptions>> owned_options;
-    owned_options.reserve(aggs.size());
-    for (auto& agg : aggs) {
-      owned_options.push_back(agg.options ? agg.options->Copy() : nullptr);
-      agg.options = owned_options.back().get();
-    }
-
     return input->plan()->EmplaceNode<GroupByNode>(
         input, schema(std::move(output_fields)), ctx, std::move(key_field_ids),
-        std::move(agg_src_field_ids), std::move(aggs), std::move(agg_kernels),
-        std::move(owned_options));
+        std::move(agg_src_field_ids), std::move(aggs), std::move(agg_kernels));
   }
 
   const char* kind_name() const override { return "GroupByNode"; }
@@ -623,8 +602,7 @@ class GroupByNode : public ExecNode {
       ss << '"' << input_schema->field(key_field_ids_[i])->name() << '"';
     }
     ss << "], ";
-    AggregatesToString(&ss, *input_schema, aggs_, agg_src_field_ids_, 
owned_options_,
-                       indent);
+    AggregatesToString(&ss, *input_schema, aggs_, agg_src_field_ids_, indent);
     return ss.str();
   }
 
@@ -684,8 +662,6 @@ class GroupByNode : public ExecNode {
   const std::vector<int> agg_src_field_ids_;
   const std::vector<internal::Aggregate> aggs_;
   const std::vector<const HashAggregateKernel*> agg_kernels_;
-  // ARROW-13638: must hold owned copy of function options
-  const std::vector<std::unique_ptr<FunctionOptions>> owned_options_;
 
   ThreadIndexer get_thread_index_;
   AtomicCounter input_counter_, output_counter_;
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc 
b/cpp/src/arrow/compute/exec/plan_test.cc
index ea9d8ebf49..2df3c5e915 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -375,7 +375,8 @@ TEST(ExecPlan, ToString) {
 )");
 
   ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());
-  CountOptions options(CountOptions::ONLY_VALID);
+  std::shared_ptr<CountOptions> options =
+      std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
   ASSERT_OK(
       Declaration::Sequence(
           {
@@ -390,7 +391,7 @@ TEST(ExecPlan, ToString) {
                           }}},
               {"aggregate",
                AggregateNodeOptions{
-                   /*aggregates=*/{{"hash_sum", nullptr}, {"hash_count", 
&options}},
+                   /*aggregates=*/{{"hash_sum", nullptr}, {"hash_count", 
options}},
                    /*targets=*/{"multiply(i32, 2)", "multiply(i32, 2)"},
                    /*names=*/{"sum(multiply(i32, 2))", "count(multiply(i32, 
2))"},
                    /*keys=*/{"bool"}}},
@@ -428,17 +429,17 @@ 
custom_sink_label:OrderBySinkNode{by={sort_keys=[FieldRef.Name(sum(multiply(i32,
   rhs.label = "rhs";
   union_node.inputs.emplace_back(lhs);
   union_node.inputs.emplace_back(rhs);
-  ASSERT_OK(
-      Declaration::Sequence(
-          {
-              union_node,
-              {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"count", 
&options}},
-                                                 /*targets=*/{"i32"},
-                                                 /*names=*/{"count(i32)"},
-                                                 /*keys=*/{}}},
-              {"sink", SinkNodeOptions{&sink_gen}},
-          })
-          .AddToPlan(plan.get()));
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    union_node,
+                    {"aggregate",
+                     AggregateNodeOptions{/*aggregates=*/{{"count", 
std::move(options)}},
+                                          /*targets=*/{"i32"},
+                                          /*names=*/{"count(i32)"},
+                                          /*keys=*/{}}},
+                    {"sink", SinkNodeOptions{&sink_gen}},
+                })
+                .AddToPlan(plan.get()));
   EXPECT_EQ(plan->ToString(), R"a(ExecPlan with 5 nodes:
 :SinkNode{}
   :ScalarAggregateNode{aggregates=[
@@ -1155,7 +1156,7 @@ TEST(ExecPlanExecution, AggregationPreservesOptions) {
                                                      
basic_data.gen(/*parallel=*/false,
                                                                     
/*slow=*/false)}},
                         {"aggregate",
-                         AggregateNodeOptions{/*aggregates=*/{{"tdigest", 
options.get()}},
+                         AggregateNodeOptions{/*aggregates=*/{{"tdigest", 
options}},
                                               /*targets=*/{"i32"},
                                               /*names=*/{"tdigest(i32)"}}},
                         {"sink", SinkNodeOptions{&sink_gen}},
@@ -1182,7 +1183,7 @@ TEST(ExecPlanExecution, AggregationPreservesOptions) {
                   {"source", SourceNodeOptions{data.schema, 
data.gen(/*parallel=*/false,
                                                                      
/*slow=*/false)}},
                   {"aggregate",
-                   AggregateNodeOptions{/*aggregates=*/{{"hash_count", 
options.get()}},
+                   AggregateNodeOptions{/*aggregates=*/{{"hash_count", 
options}},
                                         /*targets=*/{"i32"},
                                         /*names=*/{"count(i32)"},
                                         /*keys=*/{"str"}}},
diff --git a/cpp/src/arrow/compute/exec/tpch_benchmark.cc 
b/cpp/src/arrow/compute/exec/tpch_benchmark.cc
index 98a3226571..82584f58e9 100644
--- a/cpp/src/arrow/compute/exec/tpch_benchmark.cc
+++ b/cpp/src/arrow/compute/exec/tpch_benchmark.cc
@@ -74,12 +74,13 @@ std::shared_ptr<ExecPlan> 
Plan_Q1(AsyncGenerator<util::optional<ExecBatch>>* sin
       "sum_charge",   "avg_qty",      "avg_price", "avg_disc"};
   ProjectNodeOptions project_opts(std::move(projection_list), 
std::move(project_names));
 
-  ScalarAggregateOptions sum_opts = ScalarAggregateOptions::Defaults();
-  CountOptions count_opts(CountOptions::CountMode::ALL);
+  auto sum_opts =
+      
std::make_shared<ScalarAggregateOptions>(ScalarAggregateOptions::Defaults());
+  auto count_opts = 
std::make_shared<CountOptions>(CountOptions::CountMode::ALL);
   std::vector<arrow::compute::internal::Aggregate> aggs = {
-      {"hash_sum", &sum_opts},  {"hash_sum", &sum_opts},    {"hash_sum", 
&sum_opts},
-      {"hash_sum", &sum_opts},  {"hash_mean", &sum_opts},   {"hash_mean", 
&sum_opts},
-      {"hash_mean", &sum_opts}, {"hash_count", &count_opts}};
+      {"hash_sum", sum_opts},  {"hash_sum", sum_opts},    {"hash_sum", 
sum_opts},
+      {"hash_sum", sum_opts},  {"hash_mean", sum_opts},   {"hash_mean", 
sum_opts},
+      {"hash_mean", sum_opts}, {"hash_count", count_opts}};
 
   std::vector<FieldRef> to_aggregate = {"sum_qty",    "sum_base_price", 
"sum_disc_price",
                                         "sum_charge", "avg_qty",        
"avg_price",
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc 
b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index 2ed10b9b2b..7b47845f23 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -101,8 +101,8 @@ Result<Datum> NaiveGroupBy(std::vector<Datum> arguments, 
std::vector<Datum> keys
     for (int64_t i_group = 0; i_group < grouper->num_groups(); ++i_group) {
       auto slice = grouped_argument->value_slice(i_group);
       if (slice->length() == 0) continue;
-      ARROW_ASSIGN_OR_RAISE(
-          Datum d, CallFunction(scalar_agg_function, {slice}, 
aggregates[i].options));
+      ARROW_ASSIGN_OR_RAISE(Datum d, CallFunction(scalar_agg_function, {slice},
+                                                  
aggregates[i].options.get()));
       aggregated_scalars.push_back(d.scalar());
     }
 
@@ -853,18 +853,18 @@ TEST(GroupBy, CountScalar) {
   };
   input.schema = schema({field("argument", int32()), field("key", int64())});
 
-  CountOptions skip_nulls(CountOptions::ONLY_VALID);
-  CountOptions keep_nulls(CountOptions::ONLY_NULL);
-  CountOptions count_all(CountOptions::ALL);
+  auto skip_nulls = std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
+  auto keep_nulls = std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
+  auto count_all = std::make_shared<CountOptions>(CountOptions::ALL);
   for (bool use_threads : {true, false}) {
     SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
     ASSERT_OK_AND_ASSIGN(
         Datum actual,
         GroupByUsingExecPlan(input, {"key"}, {"argument", "argument", 
"argument"},
                              {
-                                 {"hash_count", &skip_nulls},
-                                 {"hash_count", &keep_nulls},
-                                 {"hash_count", &count_all},
+                                 {"hash_count", skip_nulls},
+                                 {"hash_count", keep_nulls},
+                                 {"hash_count", count_all},
                              },
                              use_threads, default_exec_context()));
     Datum expected = ArrayFromJSON(struct_({
@@ -1028,14 +1028,15 @@ TEST(GroupBy, MeanOnly) {
     [null,  3]
                         ])"});
 
-    ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
+    auto min_count =
+        std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/3);
     ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
                          internal::GroupBy({table->GetColumnByName("argument"),
                                             
table->GetColumnByName("argument")},
                                            {table->GetColumnByName("key")},
                                            {
                                                {"hash_mean", nullptr},
-                                               {"hash_mean", &min_count},
+                                               {"hash_mean", min_count},
                                            },
                                            use_threads));
     SortBy({"key_0"}, &aggregated_and_grouped);
@@ -1178,7 +1179,7 @@ TEST(GroupBy, VarianceAndStddev) {
                           /*verbose=*/true);
 
   // Test ddof
-  VarianceOptions variance_options(/*ddof=*/2);
+  auto variance_options = std::make_shared<VarianceOptions>(/*ddof=*/2);
   ASSERT_OK_AND_ASSIGN(aggregated_and_grouped,
                        internal::GroupBy(
                            {
@@ -1189,8 +1190,8 @@ TEST(GroupBy, VarianceAndStddev) {
                                batch->GetColumnByName("key"),
                            },
                            {
-                               {"hash_variance", &variance_options},
-                               {"hash_stddev", &variance_options},
+                               {"hash_variance", variance_options},
+                               {"hash_stddev", variance_options},
                            }));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
@@ -1276,15 +1277,19 @@ TEST(GroupBy, TDigest) {
     [null, 4]
   ])");
 
-  TDigestOptions options1(std::vector<double>{0.5, 0.9, 0.99});
-  TDigestOptions options2(std::vector<double>{0.5, 0.9, 0.99}, /*delta=*/50,
-                          /*buffer_size=*/1024);
-  TDigestOptions keep_nulls(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500,
-                            /*skip_nulls=*/false, /*min_count=*/0);
-  TDigestOptions min_count(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500,
-                           /*skip_nulls=*/true, /*min_count=*/3);
-  TDigestOptions keep_nulls_min_count(/*q=*/0.5, /*delta=*/100, 
/*buffer_size=*/500,
-                                      /*skip_nulls=*/false, /*min_count=*/3);
+  auto options1 = std::make_shared<TDigestOptions>(std::vector<double>{0.5, 
0.9, 0.99});
+  auto options2 =
+      std::make_shared<TDigestOptions>(std::vector<double>{0.5, 0.9, 0.99}, 
/*delta=*/50,
+                                       /*buffer_size=*/1024);
+  auto keep_nulls =
+      std::make_shared<TDigestOptions>(/*q=*/0.5, /*delta=*/100, 
/*buffer_size=*/500,
+                                       /*skip_nulls=*/false, /*min_count=*/0);
+  auto min_count =
+      std::make_shared<TDigestOptions>(/*q=*/0.5, /*delta=*/100, 
/*buffer_size=*/500,
+                                       /*skip_nulls=*/true, /*min_count=*/3);
+  auto keep_nulls_min_count =
+      std::make_shared<TDigestOptions>(/*q=*/0.5, /*delta=*/100, 
/*buffer_size=*/500,
+                                       /*skip_nulls=*/false, /*min_count=*/3);
   ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
                        internal::GroupBy(
                            {
@@ -1300,11 +1305,11 @@ TEST(GroupBy, TDigest) {
                            },
                            {
                                {"hash_tdigest", nullptr},
-                               {"hash_tdigest", &options1},
-                               {"hash_tdigest", &options2},
-                               {"hash_tdigest", &keep_nulls},
-                               {"hash_tdigest", &min_count},
-                               {"hash_tdigest", &keep_nulls_min_count},
+                               {"hash_tdigest", options1},
+                               {"hash_tdigest", options2},
+                               {"hash_tdigest", keep_nulls},
+                               {"hash_tdigest", min_count},
+                               {"hash_tdigest", keep_nulls_min_count},
                            }));
 
   AssertDatumsApproxEqual(
@@ -1390,12 +1395,12 @@ TEST(GroupBy, ApproximateMedian) {
     [null, 4]
   ])");
 
-    ScalarAggregateOptions options;
-    ScalarAggregateOptions keep_nulls(
+    std::shared_ptr<ScalarAggregateOptions> options;
+    auto keep_nulls = std::make_shared<ScalarAggregateOptions>(
         /*skip_nulls=*/false, /*min_count=*/0);
-    ScalarAggregateOptions min_count(
+    auto min_count = std::make_shared<ScalarAggregateOptions>(
         /*skip_nulls=*/true, /*min_count=*/3);
-    ScalarAggregateOptions keep_nulls_min_count(
+    auto keep_nulls_min_count = std::make_shared<ScalarAggregateOptions>(
         /*skip_nulls=*/false, /*min_count=*/3);
     ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
                          internal::GroupBy(
@@ -1409,10 +1414,10 @@ TEST(GroupBy, ApproximateMedian) {
                                  batch->GetColumnByName("key"),
                              },
                              {
-                                 {"hash_approximate_median", &options},
-                                 {"hash_approximate_median", &keep_nulls},
-                                 {"hash_approximate_median", &min_count},
-                                 {"hash_approximate_median", 
&keep_nulls_min_count},
+                                 {"hash_approximate_median", options},
+                                 {"hash_approximate_median", keep_nulls},
+                                 {"hash_approximate_median", min_count},
+                                 {"hash_approximate_median", 
keep_nulls_min_count},
                              }));
 
     AssertDatumsApproxEqual(ArrayFromJSON(struct_({
@@ -1502,31 +1507,34 @@ TEST(GroupBy, VarianceOptions) {
   input.schema = schema(
       {field("argument", int32()), field("argument1", float32()), field("key", 
int64())});
 
-  VarianceOptions keep_nulls(/*ddof=*/0, /*skip_nulls=*/false, 
/*min_count=*/0);
-  VarianceOptions min_count(/*ddof=*/0, /*skip_nulls=*/true, /*min_count=*/3);
-  VarianceOptions keep_nulls_min_count(/*ddof=*/0, /*skip_nulls=*/false, 
/*min_count=*/3);
+  auto keep_nulls = std::make_shared<VarianceOptions>(/*ddof=*/0, 
/*skip_nulls=*/false,
+                                                      /*min_count=*/0);
+  auto min_count =
+      std::make_shared<VarianceOptions>(/*ddof=*/0, /*skip_nulls=*/true, 
/*min_count=*/3);
+  auto keep_nulls_min_count = std::make_shared<VarianceOptions>(
+      /*ddof=*/0, /*skip_nulls=*/false, /*min_count=*/3);
 
   for (bool use_threads : {false}) {
     SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
-    ASSERT_OK_AND_ASSIGN(
-        Datum actual, GroupByUsingExecPlan(input, {"key"},
-                                           {
-                                               "argument",
-                                               "argument",
-                                               "argument",
-                                               "argument",
-                                               "argument",
-                                               "argument",
-                                           },
-                                           {
-                                               {"hash_stddev", &keep_nulls},
-                                               {"hash_stddev", &min_count},
-                                               {"hash_stddev", 
&keep_nulls_min_count},
-                                               {"hash_variance", &keep_nulls},
-                                               {"hash_variance", &min_count},
-                                               {"hash_variance", 
&keep_nulls_min_count},
-                                           },
-                                           use_threads, 
default_exec_context()));
+    ASSERT_OK_AND_ASSIGN(Datum actual,
+                         GroupByUsingExecPlan(input, {"key"},
+                                              {
+                                                  "argument",
+                                                  "argument",
+                                                  "argument",
+                                                  "argument",
+                                                  "argument",
+                                                  "argument",
+                                              },
+                                              {
+                                                  {"hash_stddev", keep_nulls},
+                                                  {"hash_stddev", min_count},
+                                                  {"hash_stddev", 
keep_nulls_min_count},
+                                                  {"hash_variance", 
keep_nulls},
+                                                  {"hash_variance", min_count},
+                                                  {"hash_variance", 
keep_nulls_min_count},
+                                              },
+                                              use_threads, 
default_exec_context()));
     Datum expected = ArrayFromJSON(struct_({
                                        field("hash_stddev", float64()),
                                        field("hash_stddev", float64()),
@@ -1545,25 +1553,25 @@ TEST(GroupBy, VarianceOptions) {
     ValidateOutput(expected);
     AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
 
-    ASSERT_OK_AND_ASSIGN(
-        actual, GroupByUsingExecPlan(input, {"key"},
-                                     {
-                                         "argument1",
-                                         "argument1",
-                                         "argument1",
-                                         "argument1",
-                                         "argument1",
-                                         "argument1",
-                                     },
-                                     {
-                                         {"hash_stddev", &keep_nulls},
-                                         {"hash_stddev", &min_count},
-                                         {"hash_stddev", 
&keep_nulls_min_count},
-                                         {"hash_variance", &keep_nulls},
-                                         {"hash_variance", &min_count},
-                                         {"hash_variance", 
&keep_nulls_min_count},
-                                     },
-                                     use_threads, default_exec_context()));
+    ASSERT_OK_AND_ASSIGN(actual,
+                         GroupByUsingExecPlan(input, {"key"},
+                                              {
+                                                  "argument1",
+                                                  "argument1",
+                                                  "argument1",
+                                                  "argument1",
+                                                  "argument1",
+                                                  "argument1",
+                                              },
+                                              {
+                                                  {"hash_stddev", keep_nulls},
+                                                  {"hash_stddev", min_count},
+                                                  {"hash_stddev", 
keep_nulls_min_count},
+                                                  {"hash_variance", 
keep_nulls},
+                                                  {"hash_variance", min_count},
+                                                  {"hash_variance", 
keep_nulls_min_count},
+                                              },
+                                              use_threads, 
default_exec_context()));
     expected = ArrayFromJSON(struct_({
                                  field("hash_stddev", float64()),
                                  field("hash_stddev", float64()),
@@ -2032,10 +2040,14 @@ TEST(GroupBy, AnyAndAll) {
     [null,  3]
                         ])"});
 
-    ScalarAggregateOptions no_min(/*skip_nulls=*/true, /*min_count=*/0);
-    ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
-    ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false, /*min_count=*/0);
-    ScalarAggregateOptions keep_nulls_min_count(/*skip_nulls=*/false, 
/*min_count=*/3);
+    auto no_min =
+        std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/0);
+    auto min_count =
+        std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/3);
+    auto keep_nulls =
+        std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false, 
/*min_count=*/0);
+    auto keep_nulls_min_count =
+        std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false, 
/*min_count=*/3);
     ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
                          internal::GroupBy(
                              {
@@ -2050,14 +2062,14 @@ TEST(GroupBy, AnyAndAll) {
                              },
                              {table->GetColumnByName("key")},
                              {
-                                 {"hash_any", &no_min},
-                                 {"hash_any", &min_count},
-                                 {"hash_any", &keep_nulls},
-                                 {"hash_any", &keep_nulls_min_count},
-                                 {"hash_all", &no_min},
-                                 {"hash_all", &min_count},
-                                 {"hash_all", &keep_nulls},
-                                 {"hash_all", &keep_nulls_min_count},
+                                 {"hash_any", no_min},
+                                 {"hash_any", min_count},
+                                 {"hash_any", keep_nulls},
+                                 {"hash_any", keep_nulls_min_count},
+                                 {"hash_all", no_min},
+                                 {"hash_all", min_count},
+                                 {"hash_all", keep_nulls},
+                                 {"hash_all", keep_nulls_min_count},
                              },
                              use_threads));
     SortBy({"key_0"}, &aggregated_and_grouped);
@@ -2103,7 +2115,8 @@ TEST(GroupBy, AnyAllScalar) {
   };
   input.schema = schema({field("argument", boolean()), field("key", int64())});
 
-  ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false, /*min_count=*/0);
+  auto keep_nulls =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false, 
/*min_count=*/0);
   for (bool use_threads : {true, false}) {
     SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
     ASSERT_OK_AND_ASSIGN(
@@ -2113,8 +2126,8 @@ TEST(GroupBy, AnyAllScalar) {
                              {
                                  {"hash_any", nullptr},
                                  {"hash_all", nullptr},
-                                 {"hash_any", &keep_nulls},
-                                 {"hash_all", &keep_nulls},
+                                 {"hash_any", keep_nulls},
+                                 {"hash_all", keep_nulls},
                              },
                              use_threads, default_exec_context()));
     Datum expected = ArrayFromJSON(struct_({
@@ -2134,9 +2147,9 @@ TEST(GroupBy, AnyAllScalar) {
 }
 
 TEST(GroupBy, CountDistinct) {
-  CountOptions all(CountOptions::ALL);
-  CountOptions only_valid(CountOptions::ONLY_VALID);
-  CountOptions only_null(CountOptions::ONLY_NULL);
+  auto all = std::make_shared<CountOptions>(CountOptions::ALL);
+  auto only_valid = std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
+  auto only_null = std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
   for (bool use_threads : {true, false}) {
     SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
 
@@ -2182,9 +2195,9 @@ TEST(GroupBy, CountDistinct) {
                                  table->GetColumnByName("key"),
                              },
                              {
-                                 {"hash_count_distinct", &all},
-                                 {"hash_count_distinct", &only_valid},
-                                 {"hash_count_distinct", &only_null},
+                                 {"hash_count_distinct", all},
+                                 {"hash_count_distinct", only_valid},
+                                 {"hash_count_distinct", only_null},
                              },
                              use_threads));
     SortBy({"key_0"}, &aggregated_and_grouped);
@@ -2248,9 +2261,9 @@ TEST(GroupBy, CountDistinct) {
                                  table->GetColumnByName("key"),
                              },
                              {
-                                 {"hash_count_distinct", &all},
-                                 {"hash_count_distinct", &only_valid},
-                                 {"hash_count_distinct", &only_null},
+                                 {"hash_count_distinct", all},
+                                 {"hash_count_distinct", only_valid},
+                                 {"hash_count_distinct", only_null},
                              },
                              use_threads));
     ValidateOutput(aggregated_and_grouped);
@@ -2294,9 +2307,9 @@ TEST(GroupBy, CountDistinct) {
                                  table->GetColumnByName("key"),
                              },
                              {
-                                 {"hash_count_distinct", &all},
-                                 {"hash_count_distinct", &only_valid},
-                                 {"hash_count_distinct", &only_null},
+                                 {"hash_count_distinct", all},
+                                 {"hash_count_distinct", only_valid},
+                                 {"hash_count_distinct", only_null},
                              },
                              use_threads));
     ValidateOutput(aggregated_and_grouped);
@@ -2318,9 +2331,9 @@ TEST(GroupBy, CountDistinct) {
 }
 
 TEST(GroupBy, Distinct) {
-  CountOptions all(CountOptions::ALL);
-  CountOptions only_valid(CountOptions::ONLY_VALID);
-  CountOptions only_null(CountOptions::ONLY_NULL);
+  auto all = std::make_shared<CountOptions>(CountOptions::ALL);
+  auto only_valid = std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
+  auto only_null = std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
   for (bool use_threads : {false}) {
     SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
 
@@ -2366,9 +2379,9 @@ TEST(GroupBy, Distinct) {
                                  table->GetColumnByName("key"),
                              },
                              {
-                                 {"hash_distinct", &all},
-                                 {"hash_distinct", &only_valid},
-                                 {"hash_distinct", &only_null},
+                                 {"hash_distinct", all},
+                                 {"hash_distinct", only_valid},
+                                 {"hash_distinct", only_null},
                              },
                              use_threads));
     ValidateOutput(aggregated_and_grouped);
@@ -2439,9 +2452,9 @@ TEST(GroupBy, Distinct) {
                                  table->GetColumnByName("key"),
                              },
                              {
-                                 {"hash_distinct", &all},
-                                 {"hash_distinct", &only_valid},
-                                 {"hash_distinct", &only_null},
+                                 {"hash_distinct", all},
+                                 {"hash_distinct", only_valid},
+                                 {"hash_distinct", only_null},
                              },
                              use_threads));
     ValidateOutput(aggregated_and_grouped);
@@ -3204,10 +3217,11 @@ TEST(GroupBy, CountAndSum) {
     [null,  3]
   ])");
 
-  CountOptions count_options;
-  CountOptions count_nulls(CountOptions::ONLY_NULL);
-  CountOptions count_all(CountOptions::ALL);
-  ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
+  std::shared_ptr<CountOptions> count_options;
+  auto count_nulls = std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
+  auto count_all = std::make_shared<CountOptions>(CountOptions::ALL);
+  auto min_count =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/3);
   ASSERT_OK_AND_ASSIGN(
       Datum aggregated_and_grouped,
       internal::GroupBy(
@@ -3224,11 +3238,11 @@ TEST(GroupBy, CountAndSum) {
               batch->GetColumnByName("key"),
           },
           {
-              {"hash_count", &count_options},
-              {"hash_count", &count_nulls},
-              {"hash_count", &count_all},
+              {"hash_count", count_options},
+              {"hash_count", count_nulls},
+              {"hash_count", count_all},
               {"hash_sum", nullptr},
-              {"hash_sum", &min_count},
+              {"hash_sum", min_count},
               {"hash_sum", nullptr},
           }));
 
@@ -3268,7 +3282,8 @@ TEST(GroupBy, Product) {
     [null,  3]
   ])");
 
-  ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
+  auto min_count =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/3);
   ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
                        internal::GroupBy(
                            {
@@ -3282,7 +3297,7 @@ TEST(GroupBy, Product) {
                            {
                                {"hash_product", nullptr},
                                {"hash_product", nullptr},
-                               {"hash_product", &min_count},
+                               {"hash_product", min_count},
                            }));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
@@ -3342,8 +3357,9 @@ TEST(GroupBy, SumMeanProductKeepNulls) {
     [null,  3]
   ])");
 
-  ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false);
-  ScalarAggregateOptions min_count(/*skip_nulls=*/false, /*min_count=*/3);
+  auto keep_nulls = 
std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false);
+  auto min_count =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false, 
/*min_count=*/3);
   ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
                        internal::GroupBy(
                            {
@@ -3358,12 +3374,12 @@ TEST(GroupBy, SumMeanProductKeepNulls) {
                                batch->GetColumnByName("key"),
                            },
                            {
-                               {"hash_sum", &keep_nulls},
-                               {"hash_sum", &min_count},
-                               {"hash_mean", &keep_nulls},
-                               {"hash_mean", &min_count},
-                               {"hash_product", &keep_nulls},
-                               {"hash_product", &min_count},
+                               {"hash_sum", keep_nulls},
+                               {"hash_sum", min_count},
+                               {"hash_mean", keep_nulls},
+                               {"hash_mean", min_count},
+                               {"hash_product", keep_nulls},
+                               {"hash_product", min_count},
                            }));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
@@ -3441,17 +3457,20 @@ TEST(GroupBy, ConcreteCaseWithValidateGroupBy) {
     [null,  "gama"]
   ])");
 
-  ScalarAggregateOptions keepna{false, 1};
-  CountOptions nulls(CountOptions::ONLY_NULL);
-  CountOptions non_null(CountOptions::ONLY_VALID);
+  std::shared_ptr<ScalarAggregateOptions> keepna =
+      std::make_shared<ScalarAggregateOptions>(false, 1);
+  std::shared_ptr<CountOptions> nulls =
+      std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
+  std::shared_ptr<CountOptions> non_null =
+      std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
 
   using internal::Aggregate;
   for (auto agg : {
            Aggregate{"hash_sum", nullptr},
-           Aggregate{"hash_count", &non_null},
-           Aggregate{"hash_count", &nulls},
+           Aggregate{"hash_count", non_null},
+           Aggregate{"hash_count", nulls},
            Aggregate{"hash_min_max", nullptr},
-           Aggregate{"hash_min_max", &keepna},
+           Aggregate{"hash_min_max", keepna},
        }) {
     SCOPED_TRACE(agg.function);
     ValidateGroupBy({agg}, {batch->GetColumnByName("argument")},
@@ -3468,12 +3487,15 @@ TEST(GroupBy, CountNull) {
     [3.0, "gama"]
   ])");
 
-  CountOptions keepna{CountOptions::ONLY_NULL}, 
skipna{CountOptions::ONLY_VALID};
+  std::shared_ptr<CountOptions> keepna =
+      std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
+  std::shared_ptr<CountOptions> skipna =
+      std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
 
   using internal::Aggregate;
   for (auto agg : {
-           Aggregate{"hash_count", &keepna},
-           Aggregate{"hash_count", &skipna},
+           Aggregate{"hash_count", keepna},
+           Aggregate{"hash_count", skipna},
        }) {
     SCOPED_TRACE(agg.function);
     ValidateGroupBy({agg}, {batch->GetColumnByName("argument")},
@@ -3482,7 +3504,8 @@ TEST(GroupBy, CountNull) {
 }
 
 TEST(GroupBy, RandomArraySum) {
-  ScalarAggregateOptions options(/*skip_nulls=*/true, /*min_count=*/0);
+  std::shared_ptr<ScalarAggregateOptions> options =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/0);
   for (int64_t length : {1 << 10, 1 << 12, 1 << 15}) {
     for (auto null_probability : {0.0, 0.01, 0.5, 1.0}) {
       auto batch = random::GenerateBatch(
@@ -3496,7 +3519,7 @@ TEST(GroupBy, RandomArraySum) {
 
       ValidateGroupBy(
           {
-              {"hash_sum", &options},
+              {"hash_sum", options},
           },
           {batch->GetColumnByName("argument")}, 
{batch->GetColumnByName("key")});
     }
@@ -3639,9 +3662,9 @@ TEST(GroupBy, CountWithNullType) {
     [null, 3]
                         ])"});
 
-  CountOptions all(CountOptions::ALL);
-  CountOptions only_valid(CountOptions::ONLY_VALID);
-  CountOptions only_null(CountOptions::ONLY_NULL);
+  auto all = std::make_shared<CountOptions>(CountOptions::ALL);
+  auto only_valid = std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
+  auto only_null = std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
 
   for (bool use_exec_plan : {false, true}) {
     for (bool use_threads : {true, false}) {
@@ -3655,9 +3678,9 @@ TEST(GroupBy, CountWithNullType) {
                                },
                                {table->GetColumnByName("key")},
                                {
-                                   {"hash_count", &all},
-                                   {"hash_count", &only_valid},
-                                   {"hash_count", &only_null},
+                                   {"hash_count", all},
+                                   {"hash_count", only_valid},
+                                   {"hash_count", only_null},
                                },
                                use_threads, use_exec_plan));
       SortBy({"key_0"}, &aggregated_and_grouped);
@@ -3684,9 +3707,9 @@ TEST(GroupBy, CountWithNullTypeEmptyTable) {
   auto table = TableFromJSON(schema({field("argument", null()), field("key", 
int64())}),
                              {R"([])"});
 
-  CountOptions all(CountOptions::ALL);
-  CountOptions only_valid(CountOptions::ONLY_VALID);
-  CountOptions only_null(CountOptions::ONLY_NULL);
+  auto all = std::make_shared<CountOptions>(CountOptions::ALL);
+  auto only_valid = std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
+  auto only_null = std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
 
   for (bool use_exec_plan : {false, true}) {
     for (bool use_threads : {true, false}) {
@@ -3700,9 +3723,9 @@ TEST(GroupBy, CountWithNullTypeEmptyTable) {
                                },
                                {table->GetColumnByName("key")},
                                {
-                                   {"hash_count", &all},
-                                   {"hash_count", &only_valid},
-                                   {"hash_count", &only_null},
+                                   {"hash_count", all},
+                                   {"hash_count", only_valid},
+                                   {"hash_count", only_null},
                                },
                                use_threads, use_exec_plan));
       auto struct_arr = aggregated_and_grouped.array_as<StructArray>();
@@ -3854,10 +3877,14 @@ TEST(GroupBy, SumNullType) {
     [null, 3]
                         ])"});
 
-  ScalarAggregateOptions no_min(/*skip_nulls=*/true, /*min_count=*/0);
-  ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
-  ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false, /*min_count=*/0);
-  ScalarAggregateOptions keep_nulls_min_count(/*skip_nulls=*/false, 
/*min_count=*/3);
+  auto no_min =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/0);
+  auto min_count =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/3);
+  auto keep_nulls =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false, 
/*min_count=*/0);
+  auto keep_nulls_min_count =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false, 
/*min_count=*/3);
 
   for (bool use_exec_plan : {false, true}) {
     for (bool use_threads : {true, false}) {
@@ -3872,10 +3899,10 @@ TEST(GroupBy, SumNullType) {
                                },
                                {table->GetColumnByName("key")},
                                {
-                                   {"hash_sum", &no_min},
-                                   {"hash_sum", &keep_nulls},
-                                   {"hash_sum", &min_count},
-                                   {"hash_sum", &keep_nulls_min_count},
+                                   {"hash_sum", no_min},
+                                   {"hash_sum", keep_nulls},
+                                   {"hash_sum", min_count},
+                                   {"hash_sum", keep_nulls_min_count},
                                },
                                use_threads, use_exec_plan));
       SortBy({"key_0"}, &aggregated_and_grouped);
@@ -3918,10 +3945,14 @@ TEST(GroupBy, ProductNullType) {
     [null, 3]
                         ])"});
 
-  ScalarAggregateOptions no_min(/*skip_nulls=*/true, /*min_count=*/0);
-  ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
-  ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false, /*min_count=*/0);
-  ScalarAggregateOptions keep_nulls_min_count(/*skip_nulls=*/false, 
/*min_count=*/3);
+  auto no_min =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/0);
+  auto min_count =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/3);
+  auto keep_nulls =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false, 
/*min_count=*/0);
+  auto keep_nulls_min_count =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false, 
/*min_count=*/3);
 
   for (bool use_exec_plan : {false, true}) {
     for (bool use_threads : {true, false}) {
@@ -3936,10 +3967,10 @@ TEST(GroupBy, ProductNullType) {
                                },
                                {table->GetColumnByName("key")},
                                {
-                                   {"hash_product", &no_min},
-                                   {"hash_product", &keep_nulls},
-                                   {"hash_product", &min_count},
-                                   {"hash_product", &keep_nulls_min_count},
+                                   {"hash_product", no_min},
+                                   {"hash_product", keep_nulls},
+                                   {"hash_product", min_count},
+                                   {"hash_product", keep_nulls_min_count},
                                },
                                use_threads, use_exec_plan));
       SortBy({"key_0"}, &aggregated_and_grouped);
@@ -3982,10 +4013,14 @@ TEST(GroupBy, MeanNullType) {
     [null, 3]
                         ])"});
 
-  ScalarAggregateOptions no_min(/*skip_nulls=*/true, /*min_count=*/0);
-  ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
-  ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false, /*min_count=*/0);
-  ScalarAggregateOptions keep_nulls_min_count(/*skip_nulls=*/false, 
/*min_count=*/3);
+  auto no_min =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/0);
+  auto min_count =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/3);
+  auto keep_nulls =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false, 
/*min_count=*/0);
+  auto keep_nulls_min_count =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false, 
/*min_count=*/3);
 
   for (bool use_exec_plan : {false, true}) {
     for (bool use_threads : {true, false}) {
@@ -4000,10 +4035,10 @@ TEST(GroupBy, MeanNullType) {
                                },
                                {table->GetColumnByName("key")},
                                {
-                                   {"hash_mean", &no_min},
-                                   {"hash_mean", &keep_nulls},
-                                   {"hash_mean", &min_count},
-                                   {"hash_mean", &keep_nulls_min_count},
+                                   {"hash_mean", no_min},
+                                   {"hash_mean", keep_nulls},
+                                   {"hash_mean", min_count},
+                                   {"hash_mean", keep_nulls_min_count},
                                },
                                use_threads, use_exec_plan));
       SortBy({"key_0"}, &aggregated_and_grouped);
@@ -4031,10 +4066,14 @@ TEST(GroupBy, NullTypeEmptyTable) {
   auto table = TableFromJSON(schema({field("argument", null()), field("key", 
int64())}),
                              {R"([])"});
 
-  ScalarAggregateOptions no_min(/*skip_nulls=*/true, /*min_count=*/0);
-  ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
-  ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false, /*min_count=*/0);
-  ScalarAggregateOptions keep_nulls_min_count(/*skip_nulls=*/false, 
/*min_count=*/3);
+  auto no_min =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/0);
+  auto min_count =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, 
/*min_count=*/3);
+  auto keep_nulls =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false, 
/*min_count=*/0);
+  auto keep_nulls_min_count =
+      std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false, 
/*min_count=*/3);
 
   for (bool use_exec_plan : {false, true}) {
     for (bool use_threads : {true, false}) {
@@ -4048,9 +4087,9 @@ TEST(GroupBy, NullTypeEmptyTable) {
                                },
                                {table->GetColumnByName("key")},
                                {
-                                   {"hash_sum", &no_min},
-                                   {"hash_product", &min_count},
-                                   {"hash_mean", &keep_nulls},
+                                   {"hash_sum", no_min},
+                                   {"hash_product", min_count},
+                                   {"hash_mean", keep_nulls},
                                },
                                use_threads, use_exec_plan));
       auto struct_arr = aggregated_and_grouped.array_as<StructArray>();
diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx
index 936fe2f0cd..04b57859ad 100644
--- a/python/pyarrow/_compute.pyx
+++ b/python/pyarrow/_compute.pyx
@@ -2136,9 +2136,9 @@ def _group_by(args, keys, aggregations):
     for aggr_func_name, aggr_opts in aggregations:
         c_aggr.function = tobytes(aggr_func_name)
         if aggr_opts is not None:
-            c_aggr.options = (<FunctionOptions?> aggr_opts).get_options()
+            c_aggr.options = (<FunctionOptions?>aggr_opts).wrapped
         else:
-            c_aggr.options = NULL
+            c_aggr.options = <shared_ptr[CFunctionOptions]>nullptr
         c_aggregations.push_back(c_aggr)
 
     with nogil:
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index 8597874ea1..302ac99c36 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2410,7 +2410,7 @@ cdef extern from "arrow/compute/exec/aggregate.h" 
namespace \
         "arrow::compute::internal" nogil:
     cdef cppclass CAggregate "arrow::compute::internal::Aggregate":
         c_string function
-        const CFunctionOptions* options
+        shared_ptr[CFunctionOptions] options
 
     CResult[CDatum] GroupBy(const vector[CDatum]& arguments,
                             const vector[CDatum]& keys,
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index d9eaf50cd7..089d1e71eb 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -231,15 +231,13 @@ std::shared_ptr<compute::ExecNode> ExecNode_Aggregate(
     std::vector<std::string> target_names, std::vector<std::string> 
out_field_names,
     std::vector<std::string> key_names) {
   std::vector<arrow::compute::internal::Aggregate> aggregates;
-  std::vector<std::shared_ptr<arrow::compute::FunctionOptions>> keep_alives;
 
   for (cpp11::list name_opts : options) {
     auto name = cpp11::as_cpp<std::string>(name_opts[0]);
     auto opts = make_compute_options(name, name_opts[1]);
 
     aggregates.push_back(
-        arrow::compute::internal::Aggregate{std::move(name), opts.get()});
-    keep_alives.push_back(std::move(opts));
+        arrow::compute::internal::Aggregate{std::move(name), std::move(opts)});
   }
 
   std::vector<arrow::FieldRef> targets, keys;

Reply via email to