This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 1fb857f02a ARROW-16686: [C++] Use shared_ptr with FunctionOptions
(#13344)
1fb857f02a is described below
commit 1fb857f02a59cdebd98943f82dd53a56a58d248e
Author: Vibhatha Lakmal Abeykoon <[email protected]>
AuthorDate: Wed Jun 22 01:18:40 2022 +0530
ARROW-16686: [C++] Use shared_ptr with FunctionOptions (#13344)
Includes changes to remove raw pointer usage with `FunctionOptions` used in
`Aggregate`.
This also includes changes that remove `keep_alives` from the R aggregate
bindings.
Lead-authored-by: Vibhatha Abeykoon <[email protected]>
Co-authored-by: Vibhatha Lakmal Abeykoon <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: David Li <[email protected]>
---
c_glib/arrow-glib/compute.cpp | 9 +-
.../arrow/execution_plan_documentation_examples.cc | 6 +-
cpp/src/arrow/compute/api_aggregate.h | 2 +-
cpp/src/arrow/compute/exec/aggregate.cc | 2 +-
cpp/src/arrow/compute/exec/aggregate_node.cc | 54 +--
cpp/src/arrow/compute/exec/plan_test.cc | 31 +-
cpp/src/arrow/compute/exec/tpch_benchmark.cc | 11 +-
.../arrow/compute/kernels/hash_aggregate_test.cc | 407 +++++++++++----------
python/pyarrow/_compute.pyx | 4 +-
python/pyarrow/includes/libarrow.pxd | 2 +-
r/src/compute-exec.cpp | 4 +-
11 files changed, 277 insertions(+), 255 deletions(-)
diff --git a/c_glib/arrow-glib/compute.cpp b/c_glib/arrow-glib/compute.cpp
index 96346edfd6..ca5bc1a76f 100644
--- a/c_glib/arrow-glib/compute.cpp
+++ b/c_glib/arrow-glib/compute.cpp
@@ -1265,7 +1265,14 @@ garrow_aggregate_node_options_new(GList *aggregations,
function_options =
garrow_function_options_get_raw(aggregation_priv->options);
};
- arrow_aggregates.push_back({aggregation_priv->function, function_options});
+ if (function_options) {
+ arrow_aggregates.push_back({
+ aggregation_priv->function,
+ function_options->Copy(),
+ });
+ } else {
+ arrow_aggregates.push_back({aggregation_priv->function, nullptr});
+ };
if (!garrow_field_refs_add(arrow_targets,
aggregation_priv->input,
error,
diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc
b/cpp/examples/arrow/execution_plan_documentation_examples.cc
index 5f195c9b20..5f80119bbb 100644
--- a/cpp/examples/arrow/execution_plan_documentation_examples.cc
+++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc
@@ -507,7 +507,7 @@ arrow::Status
SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
/*names=*/{"sum(a)"}};
ARROW_ASSIGN_OR_RAISE(
cp::ExecNode * aggregate,
- cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));
+ cp::MakeExecNode("aggregate", plan.get(), {source},
std::move(aggregate_options)));
ARROW_RETURN_NOT_OK(
cp::MakeExecNode("sink", plan.get(), {aggregate},
cp::SinkNodeOptions{&sink_gen}));
@@ -539,9 +539,9 @@ arrow::Status
SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
cp::MakeExecNode("source", plan.get(), {},
source_node_options));
- cp::CountOptions options(cp::CountOptions::ONLY_VALID);
+ auto options =
std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
auto aggregate_options =
- cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &options}},
+ cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options}},
/*targets=*/{"a"},
/*names=*/{"count(a)"},
/*keys=*/{"b"}};
diff --git a/cpp/src/arrow/compute/api_aggregate.h
b/cpp/src/arrow/compute/api_aggregate.h
index 7977889bf7..becd5a7414 100644
--- a/cpp/src/arrow/compute/api_aggregate.h
+++ b/cpp/src/arrow/compute/api_aggregate.h
@@ -401,7 +401,7 @@ struct ARROW_EXPORT Aggregate {
std::string function;
/// options for the aggregation function
- const FunctionOptions* options;
+ std::shared_ptr<FunctionOptions> options;
};
} // namespace internal
diff --git a/cpp/src/arrow/compute/exec/aggregate.cc
b/cpp/src/arrow/compute/exec/aggregate.cc
index 1a81fdac3d..934cdd4d1f 100644
--- a/cpp/src/arrow/compute/exec/aggregate.cc
+++ b/cpp/src/arrow/compute/exec/aggregate.cc
@@ -55,7 +55,7 @@ Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
std::vector<std::unique_ptr<KernelState>> states(kernels.size());
for (size_t i = 0; i < aggregates.size(); ++i) {
- auto options = aggregates[i].options;
+ const FunctionOptions* options = aggregates[i].options.get();
if (options == nullptr) {
// use known default options for the named function if possible
diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc
b/cpp/src/arrow/compute/exec/aggregate_node.cc
index 712515bd58..c5c5d3efcf 100644
--- a/cpp/src/arrow/compute/exec/aggregate_node.cc
+++ b/cpp/src/arrow/compute/exec/aggregate_node.cc
@@ -43,18 +43,16 @@ namespace compute {
namespace {
-void AggregatesToString(
- std::stringstream* ss, const Schema& input_schema,
- const std::vector<internal::Aggregate>& aggs,
- const std::vector<int>& target_field_ids,
- const std::vector<std::unique_ptr<FunctionOptions>>& owned_options, int
indent = 0) {
+void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
+ const std::vector<internal::Aggregate>& aggs,
+ const std::vector<int>& target_field_ids, int indent =
0) {
*ss << "aggregates=[" << std::endl;
for (size_t i = 0; i < aggs.size(); i++) {
for (int j = 0; j < indent; ++j) *ss << " ";
*ss << '\t' << aggs[i].function << '('
<< input_schema.field(target_field_ids[i])->name();
- if (owned_options[i]) {
- *ss << ", " << owned_options[i]->ToString();
+ if (aggs[i].options) {
+ *ss << ", " << aggs[i].options->ToString();
}
*ss << ")," << std::endl;
}
@@ -69,16 +67,14 @@ class ScalarAggregateNode : public ExecNode {
std::vector<int> target_field_ids,
std::vector<internal::Aggregate> aggs,
std::vector<const ScalarAggregateKernel*> kernels,
- std::vector<std::vector<std::unique_ptr<KernelState>>>
states,
- std::vector<std::unique_ptr<FunctionOptions>>
owned_options)
+ std::vector<std::vector<std::unique_ptr<KernelState>>>
states)
: ExecNode(plan, std::move(inputs), {"target"},
/*output_schema=*/std::move(output_schema),
/*num_outputs=*/1),
target_field_ids_(std::move(target_field_ids)),
aggs_(std::move(aggs)),
kernels_(std::move(kernels)),
- states_(std::move(states)),
- owned_options_(std::move(owned_options)) {}
+ states_(std::move(states)) {}
static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
@@ -95,7 +91,6 @@ class ScalarAggregateNode : public ExecNode {
FieldVector fields(kernels.size());
const auto& field_names = aggregate_options.names;
std::vector<int> target_field_ids(kernels.size());
- std::vector<std::unique_ptr<FunctionOptions>>
owned_options(aggregates.size());
for (size_t i = 0; i < kernels.size(); ++i) {
ARROW_ASSIGN_OR_RAISE(auto match,
@@ -116,11 +111,7 @@ class ScalarAggregateNode : public ExecNode {
kernels[i] = static_cast<const ScalarAggregateKernel*>(kernel);
if (aggregates[i].options == nullptr) {
- aggregates[i].options = function->default_options();
- }
- if (aggregates[i].options) {
- owned_options[i] = aggregates[i].options->Copy();
- aggregates[i].options = owned_options[i].get();
+ aggregates[i].options = function->default_options()->Copy();
}
KernelContext kernel_ctx{exec_ctx};
@@ -130,7 +121,7 @@ class ScalarAggregateNode : public ExecNode {
{
in_type,
},
- aggregates[i].options},
+
aggregates[i].options.get()},
&states[i]));
// pick one to resolve the kernel signature
@@ -143,8 +134,7 @@ class ScalarAggregateNode : public ExecNode {
return plan->EmplaceNode<ScalarAggregateNode>(
plan, std::move(inputs), schema(std::move(fields)),
std::move(target_field_ids),
- std::move(aggregates), std::move(kernels), std::move(states),
- std::move(owned_options));
+ std::move(aggregates), std::move(kernels), std::move(states));
}
const char* kind_name() const override { return "ScalarAggregateNode"; }
@@ -242,7 +232,7 @@ class ScalarAggregateNode : public ExecNode {
std::string ToStringExtra(int indent = 0) const override {
std::stringstream ss;
const auto input_schema = inputs_[0]->output_schema();
- AggregatesToString(&ss, *input_schema, aggs_, target_field_ids_,
owned_options_);
+ AggregatesToString(&ss, *input_schema, aggs_, target_field_ids_);
return ss.str();
}
@@ -277,7 +267,6 @@ class ScalarAggregateNode : public ExecNode {
const std::vector<const ScalarAggregateKernel*> kernels_;
std::vector<std::vector<std::unique_ptr<KernelState>>> states_;
- const std::vector<std::unique_ptr<FunctionOptions>> owned_options_;
ThreadIndexer get_thread_index_;
AtomicCounter input_counter_;
@@ -288,16 +277,14 @@ class GroupByNode : public ExecNode {
GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema,
ExecContext* ctx,
std::vector<int> key_field_ids, std::vector<int>
agg_src_field_ids,
std::vector<internal::Aggregate> aggs,
- std::vector<const HashAggregateKernel*> agg_kernels,
- std::vector<std::unique_ptr<FunctionOptions>> owned_options)
+ std::vector<const HashAggregateKernel*> agg_kernels)
: ExecNode(input->plan(), {input}, {"groupby"}, std::move(output_schema),
/*num_outputs=*/1),
ctx_(ctx),
key_field_ids_(std::move(key_field_ids)),
agg_src_field_ids_(std::move(agg_src_field_ids)),
aggs_(std::move(aggs)),
- agg_kernels_(std::move(agg_kernels)),
- owned_options_(std::move(owned_options)) {}
+ agg_kernels_(std::move(agg_kernels)) {}
static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
@@ -363,17 +350,9 @@ class GroupByNode : public ExecNode {
output_fields[base + i] = input_schema->field(key_field_id);
}
- std::vector<std::unique_ptr<FunctionOptions>> owned_options;
- owned_options.reserve(aggs.size());
- for (auto& agg : aggs) {
- owned_options.push_back(agg.options ? agg.options->Copy() : nullptr);
- agg.options = owned_options.back().get();
- }
-
return input->plan()->EmplaceNode<GroupByNode>(
input, schema(std::move(output_fields)), ctx, std::move(key_field_ids),
- std::move(agg_src_field_ids), std::move(aggs), std::move(agg_kernels),
- std::move(owned_options));
+ std::move(agg_src_field_ids), std::move(aggs), std::move(agg_kernels));
}
const char* kind_name() const override { return "GroupByNode"; }
@@ -623,8 +602,7 @@ class GroupByNode : public ExecNode {
ss << '"' << input_schema->field(key_field_ids_[i])->name() << '"';
}
ss << "], ";
- AggregatesToString(&ss, *input_schema, aggs_, agg_src_field_ids_,
owned_options_,
- indent);
+ AggregatesToString(&ss, *input_schema, aggs_, agg_src_field_ids_, indent);
return ss.str();
}
@@ -684,8 +662,6 @@ class GroupByNode : public ExecNode {
const std::vector<int> agg_src_field_ids_;
const std::vector<internal::Aggregate> aggs_;
const std::vector<const HashAggregateKernel*> agg_kernels_;
- // ARROW-13638: must hold owned copy of function options
- const std::vector<std::unique_ptr<FunctionOptions>> owned_options_;
ThreadIndexer get_thread_index_;
AtomicCounter input_counter_, output_counter_;
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc
b/cpp/src/arrow/compute/exec/plan_test.cc
index ea9d8ebf49..2df3c5e915 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -375,7 +375,8 @@ TEST(ExecPlan, ToString) {
)");
ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());
- CountOptions options(CountOptions::ONLY_VALID);
+ std::shared_ptr<CountOptions> options =
+ std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
ASSERT_OK(
Declaration::Sequence(
{
@@ -390,7 +391,7 @@ TEST(ExecPlan, ToString) {
}}},
{"aggregate",
AggregateNodeOptions{
- /*aggregates=*/{{"hash_sum", nullptr}, {"hash_count",
&options}},
+ /*aggregates=*/{{"hash_sum", nullptr}, {"hash_count",
options}},
/*targets=*/{"multiply(i32, 2)", "multiply(i32, 2)"},
/*names=*/{"sum(multiply(i32, 2))", "count(multiply(i32,
2))"},
/*keys=*/{"bool"}}},
@@ -428,17 +429,17 @@
custom_sink_label:OrderBySinkNode{by={sort_keys=[FieldRef.Name(sum(multiply(i32,
rhs.label = "rhs";
union_node.inputs.emplace_back(lhs);
union_node.inputs.emplace_back(rhs);
- ASSERT_OK(
- Declaration::Sequence(
- {
- union_node,
- {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"count",
&options}},
- /*targets=*/{"i32"},
- /*names=*/{"count(i32)"},
- /*keys=*/{}}},
- {"sink", SinkNodeOptions{&sink_gen}},
- })
- .AddToPlan(plan.get()));
+ ASSERT_OK(Declaration::Sequence(
+ {
+ union_node,
+ {"aggregate",
+ AggregateNodeOptions{/*aggregates=*/{{"count",
std::move(options)}},
+ /*targets=*/{"i32"},
+ /*names=*/{"count(i32)"},
+ /*keys=*/{}}},
+ {"sink", SinkNodeOptions{&sink_gen}},
+ })
+ .AddToPlan(plan.get()));
EXPECT_EQ(plan->ToString(), R"a(ExecPlan with 5 nodes:
:SinkNode{}
:ScalarAggregateNode{aggregates=[
@@ -1155,7 +1156,7 @@ TEST(ExecPlanExecution, AggregationPreservesOptions) {
basic_data.gen(/*parallel=*/false,
/*slow=*/false)}},
{"aggregate",
- AggregateNodeOptions{/*aggregates=*/{{"tdigest",
options.get()}},
+ AggregateNodeOptions{/*aggregates=*/{{"tdigest",
options}},
/*targets=*/{"i32"},
/*names=*/{"tdigest(i32)"}}},
{"sink", SinkNodeOptions{&sink_gen}},
@@ -1182,7 +1183,7 @@ TEST(ExecPlanExecution, AggregationPreservesOptions) {
{"source", SourceNodeOptions{data.schema,
data.gen(/*parallel=*/false,
/*slow=*/false)}},
{"aggregate",
- AggregateNodeOptions{/*aggregates=*/{{"hash_count",
options.get()}},
+ AggregateNodeOptions{/*aggregates=*/{{"hash_count",
options}},
/*targets=*/{"i32"},
/*names=*/{"count(i32)"},
/*keys=*/{"str"}}},
diff --git a/cpp/src/arrow/compute/exec/tpch_benchmark.cc
b/cpp/src/arrow/compute/exec/tpch_benchmark.cc
index 98a3226571..82584f58e9 100644
--- a/cpp/src/arrow/compute/exec/tpch_benchmark.cc
+++ b/cpp/src/arrow/compute/exec/tpch_benchmark.cc
@@ -74,12 +74,13 @@ std::shared_ptr<ExecPlan>
Plan_Q1(AsyncGenerator<util::optional<ExecBatch>>* sin
"sum_charge", "avg_qty", "avg_price", "avg_disc"};
ProjectNodeOptions project_opts(std::move(projection_list),
std::move(project_names));
- ScalarAggregateOptions sum_opts = ScalarAggregateOptions::Defaults();
- CountOptions count_opts(CountOptions::CountMode::ALL);
+ auto sum_opts =
+
std::make_shared<ScalarAggregateOptions>(ScalarAggregateOptions::Defaults());
+ auto count_opts =
std::make_shared<CountOptions>(CountOptions::CountMode::ALL);
std::vector<arrow::compute::internal::Aggregate> aggs = {
- {"hash_sum", &sum_opts}, {"hash_sum", &sum_opts}, {"hash_sum",
&sum_opts},
- {"hash_sum", &sum_opts}, {"hash_mean", &sum_opts}, {"hash_mean",
&sum_opts},
- {"hash_mean", &sum_opts}, {"hash_count", &count_opts}};
+ {"hash_sum", sum_opts}, {"hash_sum", sum_opts}, {"hash_sum",
sum_opts},
+ {"hash_sum", sum_opts}, {"hash_mean", sum_opts}, {"hash_mean",
sum_opts},
+ {"hash_mean", sum_opts}, {"hash_count", count_opts}};
std::vector<FieldRef> to_aggregate = {"sum_qty", "sum_base_price",
"sum_disc_price",
"sum_charge", "avg_qty",
"avg_price",
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index 2ed10b9b2b..7b47845f23 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -101,8 +101,8 @@ Result<Datum> NaiveGroupBy(std::vector<Datum> arguments,
std::vector<Datum> keys
for (int64_t i_group = 0; i_group < grouper->num_groups(); ++i_group) {
auto slice = grouped_argument->value_slice(i_group);
if (slice->length() == 0) continue;
- ARROW_ASSIGN_OR_RAISE(
- Datum d, CallFunction(scalar_agg_function, {slice},
aggregates[i].options));
+ ARROW_ASSIGN_OR_RAISE(Datum d, CallFunction(scalar_agg_function, {slice},
+
aggregates[i].options.get()));
aggregated_scalars.push_back(d.scalar());
}
@@ -853,18 +853,18 @@ TEST(GroupBy, CountScalar) {
};
input.schema = schema({field("argument", int32()), field("key", int64())});
- CountOptions skip_nulls(CountOptions::ONLY_VALID);
- CountOptions keep_nulls(CountOptions::ONLY_NULL);
- CountOptions count_all(CountOptions::ALL);
+ auto skip_nulls = std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
+ auto keep_nulls = std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
+ auto count_all = std::make_shared<CountOptions>(CountOptions::ALL);
for (bool use_threads : {true, false}) {
SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
ASSERT_OK_AND_ASSIGN(
Datum actual,
GroupByUsingExecPlan(input, {"key"}, {"argument", "argument",
"argument"},
{
- {"hash_count", &skip_nulls},
- {"hash_count", &keep_nulls},
- {"hash_count", &count_all},
+ {"hash_count", skip_nulls},
+ {"hash_count", keep_nulls},
+ {"hash_count", count_all},
},
use_threads, default_exec_context()));
Datum expected = ArrayFromJSON(struct_({
@@ -1028,14 +1028,15 @@ TEST(GroupBy, MeanOnly) {
[null, 3]
])"});
- ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
+ auto min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/3);
ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
internal::GroupBy({table->GetColumnByName("argument"),
table->GetColumnByName("argument")},
{table->GetColumnByName("key")},
{
{"hash_mean", nullptr},
- {"hash_mean", &min_count},
+ {"hash_mean", min_count},
},
use_threads));
SortBy({"key_0"}, &aggregated_and_grouped);
@@ -1178,7 +1179,7 @@ TEST(GroupBy, VarianceAndStddev) {
/*verbose=*/true);
// Test ddof
- VarianceOptions variance_options(/*ddof=*/2);
+ auto variance_options = std::make_shared<VarianceOptions>(/*ddof=*/2);
ASSERT_OK_AND_ASSIGN(aggregated_and_grouped,
internal::GroupBy(
{
@@ -1189,8 +1190,8 @@ TEST(GroupBy, VarianceAndStddev) {
batch->GetColumnByName("key"),
},
{
- {"hash_variance", &variance_options},
- {"hash_stddev", &variance_options},
+ {"hash_variance", variance_options},
+ {"hash_stddev", variance_options},
}));
AssertDatumsApproxEqual(ArrayFromJSON(struct_({
@@ -1276,15 +1277,19 @@ TEST(GroupBy, TDigest) {
[null, 4]
])");
- TDigestOptions options1(std::vector<double>{0.5, 0.9, 0.99});
- TDigestOptions options2(std::vector<double>{0.5, 0.9, 0.99}, /*delta=*/50,
- /*buffer_size=*/1024);
- TDigestOptions keep_nulls(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500,
- /*skip_nulls=*/false, /*min_count=*/0);
- TDigestOptions min_count(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500,
- /*skip_nulls=*/true, /*min_count=*/3);
- TDigestOptions keep_nulls_min_count(/*q=*/0.5, /*delta=*/100,
/*buffer_size=*/500,
- /*skip_nulls=*/false, /*min_count=*/3);
+ auto options1 = std::make_shared<TDigestOptions>(std::vector<double>{0.5,
0.9, 0.99});
+ auto options2 =
+ std::make_shared<TDigestOptions>(std::vector<double>{0.5, 0.9, 0.99},
/*delta=*/50,
+ /*buffer_size=*/1024);
+ auto keep_nulls =
+ std::make_shared<TDigestOptions>(/*q=*/0.5, /*delta=*/100,
/*buffer_size=*/500,
+ /*skip_nulls=*/false, /*min_count=*/0);
+ auto min_count =
+ std::make_shared<TDigestOptions>(/*q=*/0.5, /*delta=*/100,
/*buffer_size=*/500,
+ /*skip_nulls=*/true, /*min_count=*/3);
+ auto keep_nulls_min_count =
+ std::make_shared<TDigestOptions>(/*q=*/0.5, /*delta=*/100,
/*buffer_size=*/500,
+ /*skip_nulls=*/false, /*min_count=*/3);
ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
internal::GroupBy(
{
@@ -1300,11 +1305,11 @@ TEST(GroupBy, TDigest) {
},
{
{"hash_tdigest", nullptr},
- {"hash_tdigest", &options1},
- {"hash_tdigest", &options2},
- {"hash_tdigest", &keep_nulls},
- {"hash_tdigest", &min_count},
- {"hash_tdigest", &keep_nulls_min_count},
+ {"hash_tdigest", options1},
+ {"hash_tdigest", options2},
+ {"hash_tdigest", keep_nulls},
+ {"hash_tdigest", min_count},
+ {"hash_tdigest", keep_nulls_min_count},
}));
AssertDatumsApproxEqual(
@@ -1390,12 +1395,12 @@ TEST(GroupBy, ApproximateMedian) {
[null, 4]
])");
- ScalarAggregateOptions options;
- ScalarAggregateOptions keep_nulls(
+ std::shared_ptr<ScalarAggregateOptions> options;
+ auto keep_nulls = std::make_shared<ScalarAggregateOptions>(
/*skip_nulls=*/false, /*min_count=*/0);
- ScalarAggregateOptions min_count(
+ auto min_count = std::make_shared<ScalarAggregateOptions>(
/*skip_nulls=*/true, /*min_count=*/3);
- ScalarAggregateOptions keep_nulls_min_count(
+ auto keep_nulls_min_count = std::make_shared<ScalarAggregateOptions>(
/*skip_nulls=*/false, /*min_count=*/3);
ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
internal::GroupBy(
@@ -1409,10 +1414,10 @@ TEST(GroupBy, ApproximateMedian) {
batch->GetColumnByName("key"),
},
{
- {"hash_approximate_median", &options},
- {"hash_approximate_median", &keep_nulls},
- {"hash_approximate_median", &min_count},
- {"hash_approximate_median",
&keep_nulls_min_count},
+ {"hash_approximate_median", options},
+ {"hash_approximate_median", keep_nulls},
+ {"hash_approximate_median", min_count},
+ {"hash_approximate_median",
keep_nulls_min_count},
}));
AssertDatumsApproxEqual(ArrayFromJSON(struct_({
@@ -1502,31 +1507,34 @@ TEST(GroupBy, VarianceOptions) {
input.schema = schema(
{field("argument", int32()), field("argument1", float32()), field("key",
int64())});
- VarianceOptions keep_nulls(/*ddof=*/0, /*skip_nulls=*/false,
/*min_count=*/0);
- VarianceOptions min_count(/*ddof=*/0, /*skip_nulls=*/true, /*min_count=*/3);
- VarianceOptions keep_nulls_min_count(/*ddof=*/0, /*skip_nulls=*/false,
/*min_count=*/3);
+ auto keep_nulls = std::make_shared<VarianceOptions>(/*ddof=*/0,
/*skip_nulls=*/false,
+ /*min_count=*/0);
+ auto min_count =
+ std::make_shared<VarianceOptions>(/*ddof=*/0, /*skip_nulls=*/true,
/*min_count=*/3);
+ auto keep_nulls_min_count = std::make_shared<VarianceOptions>(
+ /*ddof=*/0, /*skip_nulls=*/false, /*min_count=*/3);
for (bool use_threads : {false}) {
SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
- ASSERT_OK_AND_ASSIGN(
- Datum actual, GroupByUsingExecPlan(input, {"key"},
- {
- "argument",
- "argument",
- "argument",
- "argument",
- "argument",
- "argument",
- },
- {
- {"hash_stddev", &keep_nulls},
- {"hash_stddev", &min_count},
- {"hash_stddev",
&keep_nulls_min_count},
- {"hash_variance", &keep_nulls},
- {"hash_variance", &min_count},
- {"hash_variance",
&keep_nulls_min_count},
- },
- use_threads,
default_exec_context()));
+ ASSERT_OK_AND_ASSIGN(Datum actual,
+ GroupByUsingExecPlan(input, {"key"},
+ {
+ "argument",
+ "argument",
+ "argument",
+ "argument",
+ "argument",
+ "argument",
+ },
+ {
+ {"hash_stddev", keep_nulls},
+ {"hash_stddev", min_count},
+ {"hash_stddev",
keep_nulls_min_count},
+ {"hash_variance",
keep_nulls},
+ {"hash_variance", min_count},
+ {"hash_variance",
keep_nulls_min_count},
+ },
+ use_threads,
default_exec_context()));
Datum expected = ArrayFromJSON(struct_({
field("hash_stddev", float64()),
field("hash_stddev", float64()),
@@ -1545,25 +1553,25 @@ TEST(GroupBy, VarianceOptions) {
ValidateOutput(expected);
AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
- ASSERT_OK_AND_ASSIGN(
- actual, GroupByUsingExecPlan(input, {"key"},
- {
- "argument1",
- "argument1",
- "argument1",
- "argument1",
- "argument1",
- "argument1",
- },
- {
- {"hash_stddev", &keep_nulls},
- {"hash_stddev", &min_count},
- {"hash_stddev",
&keep_nulls_min_count},
- {"hash_variance", &keep_nulls},
- {"hash_variance", &min_count},
- {"hash_variance",
&keep_nulls_min_count},
- },
- use_threads, default_exec_context()));
+ ASSERT_OK_AND_ASSIGN(actual,
+ GroupByUsingExecPlan(input, {"key"},
+ {
+ "argument1",
+ "argument1",
+ "argument1",
+ "argument1",
+ "argument1",
+ "argument1",
+ },
+ {
+ {"hash_stddev", keep_nulls},
+ {"hash_stddev", min_count},
+ {"hash_stddev",
keep_nulls_min_count},
+ {"hash_variance",
keep_nulls},
+ {"hash_variance", min_count},
+ {"hash_variance",
keep_nulls_min_count},
+ },
+ use_threads,
default_exec_context()));
expected = ArrayFromJSON(struct_({
field("hash_stddev", float64()),
field("hash_stddev", float64()),
@@ -2032,10 +2040,14 @@ TEST(GroupBy, AnyAndAll) {
[null, 3]
])"});
- ScalarAggregateOptions no_min(/*skip_nulls=*/true, /*min_count=*/0);
- ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
- ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false, /*min_count=*/0);
- ScalarAggregateOptions keep_nulls_min_count(/*skip_nulls=*/false,
/*min_count=*/3);
+ auto no_min =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/0);
+ auto min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/3);
+ auto keep_nulls =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false,
/*min_count=*/0);
+ auto keep_nulls_min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false,
/*min_count=*/3);
ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
internal::GroupBy(
{
@@ -2050,14 +2062,14 @@ TEST(GroupBy, AnyAndAll) {
},
{table->GetColumnByName("key")},
{
- {"hash_any", &no_min},
- {"hash_any", &min_count},
- {"hash_any", &keep_nulls},
- {"hash_any", &keep_nulls_min_count},
- {"hash_all", &no_min},
- {"hash_all", &min_count},
- {"hash_all", &keep_nulls},
- {"hash_all", &keep_nulls_min_count},
+ {"hash_any", no_min},
+ {"hash_any", min_count},
+ {"hash_any", keep_nulls},
+ {"hash_any", keep_nulls_min_count},
+ {"hash_all", no_min},
+ {"hash_all", min_count},
+ {"hash_all", keep_nulls},
+ {"hash_all", keep_nulls_min_count},
},
use_threads));
SortBy({"key_0"}, &aggregated_and_grouped);
@@ -2103,7 +2115,8 @@ TEST(GroupBy, AnyAllScalar) {
};
input.schema = schema({field("argument", boolean()), field("key", int64())});
- ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false, /*min_count=*/0);
+ auto keep_nulls =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false,
/*min_count=*/0);
for (bool use_threads : {true, false}) {
SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
ASSERT_OK_AND_ASSIGN(
@@ -2113,8 +2126,8 @@ TEST(GroupBy, AnyAllScalar) {
{
{"hash_any", nullptr},
{"hash_all", nullptr},
- {"hash_any", &keep_nulls},
- {"hash_all", &keep_nulls},
+ {"hash_any", keep_nulls},
+ {"hash_all", keep_nulls},
},
use_threads, default_exec_context()));
Datum expected = ArrayFromJSON(struct_({
@@ -2134,9 +2147,9 @@ TEST(GroupBy, AnyAllScalar) {
}
TEST(GroupBy, CountDistinct) {
- CountOptions all(CountOptions::ALL);
- CountOptions only_valid(CountOptions::ONLY_VALID);
- CountOptions only_null(CountOptions::ONLY_NULL);
+ auto all = std::make_shared<CountOptions>(CountOptions::ALL);
+ auto only_valid = std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
+ auto only_null = std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
for (bool use_threads : {true, false}) {
SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
@@ -2182,9 +2195,9 @@ TEST(GroupBy, CountDistinct) {
table->GetColumnByName("key"),
},
{
- {"hash_count_distinct", &all},
- {"hash_count_distinct", &only_valid},
- {"hash_count_distinct", &only_null},
+ {"hash_count_distinct", all},
+ {"hash_count_distinct", only_valid},
+ {"hash_count_distinct", only_null},
},
use_threads));
SortBy({"key_0"}, &aggregated_and_grouped);
@@ -2248,9 +2261,9 @@ TEST(GroupBy, CountDistinct) {
table->GetColumnByName("key"),
},
{
- {"hash_count_distinct", &all},
- {"hash_count_distinct", &only_valid},
- {"hash_count_distinct", &only_null},
+ {"hash_count_distinct", all},
+ {"hash_count_distinct", only_valid},
+ {"hash_count_distinct", only_null},
},
use_threads));
ValidateOutput(aggregated_and_grouped);
@@ -2294,9 +2307,9 @@ TEST(GroupBy, CountDistinct) {
table->GetColumnByName("key"),
},
{
- {"hash_count_distinct", &all},
- {"hash_count_distinct", &only_valid},
- {"hash_count_distinct", &only_null},
+ {"hash_count_distinct", all},
+ {"hash_count_distinct", only_valid},
+ {"hash_count_distinct", only_null},
},
use_threads));
ValidateOutput(aggregated_and_grouped);
@@ -2318,9 +2331,9 @@ TEST(GroupBy, CountDistinct) {
}
TEST(GroupBy, Distinct) {
- CountOptions all(CountOptions::ALL);
- CountOptions only_valid(CountOptions::ONLY_VALID);
- CountOptions only_null(CountOptions::ONLY_NULL);
+ auto all = std::make_shared<CountOptions>(CountOptions::ALL);
+ auto only_valid = std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
+ auto only_null = std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
for (bool use_threads : {false}) {
SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
@@ -2366,9 +2379,9 @@ TEST(GroupBy, Distinct) {
table->GetColumnByName("key"),
},
{
- {"hash_distinct", &all},
- {"hash_distinct", &only_valid},
- {"hash_distinct", &only_null},
+ {"hash_distinct", all},
+ {"hash_distinct", only_valid},
+ {"hash_distinct", only_null},
},
use_threads));
ValidateOutput(aggregated_and_grouped);
@@ -2439,9 +2452,9 @@ TEST(GroupBy, Distinct) {
table->GetColumnByName("key"),
},
{
- {"hash_distinct", &all},
- {"hash_distinct", &only_valid},
- {"hash_distinct", &only_null},
+ {"hash_distinct", all},
+ {"hash_distinct", only_valid},
+ {"hash_distinct", only_null},
},
use_threads));
ValidateOutput(aggregated_and_grouped);
@@ -3204,10 +3217,11 @@ TEST(GroupBy, CountAndSum) {
[null, 3]
])");
- CountOptions count_options;
- CountOptions count_nulls(CountOptions::ONLY_NULL);
- CountOptions count_all(CountOptions::ALL);
- ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
+ std::shared_ptr<CountOptions> count_options;
+ auto count_nulls = std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
+ auto count_all = std::make_shared<CountOptions>(CountOptions::ALL);
+ auto min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/3);
ASSERT_OK_AND_ASSIGN(
Datum aggregated_and_grouped,
internal::GroupBy(
@@ -3224,11 +3238,11 @@ TEST(GroupBy, CountAndSum) {
batch->GetColumnByName("key"),
},
{
- {"hash_count", &count_options},
- {"hash_count", &count_nulls},
- {"hash_count", &count_all},
+ {"hash_count", count_options},
+ {"hash_count", count_nulls},
+ {"hash_count", count_all},
{"hash_sum", nullptr},
- {"hash_sum", &min_count},
+ {"hash_sum", min_count},
{"hash_sum", nullptr},
}));
@@ -3268,7 +3282,8 @@ TEST(GroupBy, Product) {
[null, 3]
])");
- ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
+ auto min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/3);
ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
internal::GroupBy(
{
@@ -3282,7 +3297,7 @@ TEST(GroupBy, Product) {
{
{"hash_product", nullptr},
{"hash_product", nullptr},
- {"hash_product", &min_count},
+ {"hash_product", min_count},
}));
AssertDatumsApproxEqual(ArrayFromJSON(struct_({
@@ -3342,8 +3357,9 @@ TEST(GroupBy, SumMeanProductKeepNulls) {
[null, 3]
])");
- ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false);
- ScalarAggregateOptions min_count(/*skip_nulls=*/false, /*min_count=*/3);
+ auto keep_nulls =
std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false);
+ auto min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false,
/*min_count=*/3);
ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
internal::GroupBy(
{
@@ -3358,12 +3374,12 @@ TEST(GroupBy, SumMeanProductKeepNulls) {
batch->GetColumnByName("key"),
},
{
- {"hash_sum", &keep_nulls},
- {"hash_sum", &min_count},
- {"hash_mean", &keep_nulls},
- {"hash_mean", &min_count},
- {"hash_product", &keep_nulls},
- {"hash_product", &min_count},
+ {"hash_sum", keep_nulls},
+ {"hash_sum", min_count},
+ {"hash_mean", keep_nulls},
+ {"hash_mean", min_count},
+ {"hash_product", keep_nulls},
+ {"hash_product", min_count},
}));
AssertDatumsApproxEqual(ArrayFromJSON(struct_({
@@ -3441,17 +3457,20 @@ TEST(GroupBy, ConcreteCaseWithValidateGroupBy) {
[null, "gama"]
])");
- ScalarAggregateOptions keepna{false, 1};
- CountOptions nulls(CountOptions::ONLY_NULL);
- CountOptions non_null(CountOptions::ONLY_VALID);
+ std::shared_ptr<ScalarAggregateOptions> keepna =
+ std::make_shared<ScalarAggregateOptions>(false, 1);
+ std::shared_ptr<CountOptions> nulls =
+ std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
+ std::shared_ptr<CountOptions> non_null =
+ std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
using internal::Aggregate;
for (auto agg : {
Aggregate{"hash_sum", nullptr},
- Aggregate{"hash_count", &non_null},
- Aggregate{"hash_count", &nulls},
+ Aggregate{"hash_count", non_null},
+ Aggregate{"hash_count", nulls},
Aggregate{"hash_min_max", nullptr},
- Aggregate{"hash_min_max", &keepna},
+ Aggregate{"hash_min_max", keepna},
}) {
SCOPED_TRACE(agg.function);
ValidateGroupBy({agg}, {batch->GetColumnByName("argument")},
@@ -3468,12 +3487,15 @@ TEST(GroupBy, CountNull) {
[3.0, "gama"]
])");
- CountOptions keepna{CountOptions::ONLY_NULL},
skipna{CountOptions::ONLY_VALID};
+ std::shared_ptr<CountOptions> keepna =
+ std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
+ std::shared_ptr<CountOptions> skipna =
+ std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
using internal::Aggregate;
for (auto agg : {
- Aggregate{"hash_count", &keepna},
- Aggregate{"hash_count", &skipna},
+ Aggregate{"hash_count", keepna},
+ Aggregate{"hash_count", skipna},
}) {
SCOPED_TRACE(agg.function);
ValidateGroupBy({agg}, {batch->GetColumnByName("argument")},
@@ -3482,7 +3504,8 @@ TEST(GroupBy, CountNull) {
}
TEST(GroupBy, RandomArraySum) {
- ScalarAggregateOptions options(/*skip_nulls=*/true, /*min_count=*/0);
+ std::shared_ptr<ScalarAggregateOptions> options =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/0);
for (int64_t length : {1 << 10, 1 << 12, 1 << 15}) {
for (auto null_probability : {0.0, 0.01, 0.5, 1.0}) {
auto batch = random::GenerateBatch(
@@ -3496,7 +3519,7 @@ TEST(GroupBy, RandomArraySum) {
ValidateGroupBy(
{
- {"hash_sum", &options},
+ {"hash_sum", options},
},
{batch->GetColumnByName("argument")},
{batch->GetColumnByName("key")});
}
@@ -3639,9 +3662,9 @@ TEST(GroupBy, CountWithNullType) {
[null, 3]
])"});
- CountOptions all(CountOptions::ALL);
- CountOptions only_valid(CountOptions::ONLY_VALID);
- CountOptions only_null(CountOptions::ONLY_NULL);
+ auto all = std::make_shared<CountOptions>(CountOptions::ALL);
+ auto only_valid = std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
+ auto only_null = std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
for (bool use_exec_plan : {false, true}) {
for (bool use_threads : {true, false}) {
@@ -3655,9 +3678,9 @@ TEST(GroupBy, CountWithNullType) {
},
{table->GetColumnByName("key")},
{
- {"hash_count", &all},
- {"hash_count", &only_valid},
- {"hash_count", &only_null},
+ {"hash_count", all},
+ {"hash_count", only_valid},
+ {"hash_count", only_null},
},
use_threads, use_exec_plan));
SortBy({"key_0"}, &aggregated_and_grouped);
@@ -3684,9 +3707,9 @@ TEST(GroupBy, CountWithNullTypeEmptyTable) {
auto table = TableFromJSON(schema({field("argument", null()), field("key",
int64())}),
{R"([])"});
- CountOptions all(CountOptions::ALL);
- CountOptions only_valid(CountOptions::ONLY_VALID);
- CountOptions only_null(CountOptions::ONLY_NULL);
+ auto all = std::make_shared<CountOptions>(CountOptions::ALL);
+ auto only_valid = std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
+ auto only_null = std::make_shared<CountOptions>(CountOptions::ONLY_NULL);
for (bool use_exec_plan : {false, true}) {
for (bool use_threads : {true, false}) {
@@ -3700,9 +3723,9 @@ TEST(GroupBy, CountWithNullTypeEmptyTable) {
},
{table->GetColumnByName("key")},
{
- {"hash_count", &all},
- {"hash_count", &only_valid},
- {"hash_count", &only_null},
+ {"hash_count", all},
+ {"hash_count", only_valid},
+ {"hash_count", only_null},
},
use_threads, use_exec_plan));
auto struct_arr = aggregated_and_grouped.array_as<StructArray>();
@@ -3854,10 +3877,14 @@ TEST(GroupBy, SumNullType) {
[null, 3]
])"});
- ScalarAggregateOptions no_min(/*skip_nulls=*/true, /*min_count=*/0);
- ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
- ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false, /*min_count=*/0);
- ScalarAggregateOptions keep_nulls_min_count(/*skip_nulls=*/false,
/*min_count=*/3);
+ auto no_min =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/0);
+ auto min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/3);
+ auto keep_nulls =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false,
/*min_count=*/0);
+ auto keep_nulls_min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false,
/*min_count=*/3);
for (bool use_exec_plan : {false, true}) {
for (bool use_threads : {true, false}) {
@@ -3872,10 +3899,10 @@ TEST(GroupBy, SumNullType) {
},
{table->GetColumnByName("key")},
{
- {"hash_sum", &no_min},
- {"hash_sum", &keep_nulls},
- {"hash_sum", &min_count},
- {"hash_sum", &keep_nulls_min_count},
+ {"hash_sum", no_min},
+ {"hash_sum", keep_nulls},
+ {"hash_sum", min_count},
+ {"hash_sum", keep_nulls_min_count},
},
use_threads, use_exec_plan));
SortBy({"key_0"}, &aggregated_and_grouped);
@@ -3918,10 +3945,14 @@ TEST(GroupBy, ProductNullType) {
[null, 3]
])"});
- ScalarAggregateOptions no_min(/*skip_nulls=*/true, /*min_count=*/0);
- ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
- ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false, /*min_count=*/0);
- ScalarAggregateOptions keep_nulls_min_count(/*skip_nulls=*/false,
/*min_count=*/3);
+ auto no_min =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/0);
+ auto min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/3);
+ auto keep_nulls =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false,
/*min_count=*/0);
+ auto keep_nulls_min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false,
/*min_count=*/3);
for (bool use_exec_plan : {false, true}) {
for (bool use_threads : {true, false}) {
@@ -3936,10 +3967,10 @@ TEST(GroupBy, ProductNullType) {
},
{table->GetColumnByName("key")},
{
- {"hash_product", &no_min},
- {"hash_product", &keep_nulls},
- {"hash_product", &min_count},
- {"hash_product", &keep_nulls_min_count},
+ {"hash_product", no_min},
+ {"hash_product", keep_nulls},
+ {"hash_product", min_count},
+ {"hash_product", keep_nulls_min_count},
},
use_threads, use_exec_plan));
SortBy({"key_0"}, &aggregated_and_grouped);
@@ -3982,10 +4013,14 @@ TEST(GroupBy, MeanNullType) {
[null, 3]
])"});
- ScalarAggregateOptions no_min(/*skip_nulls=*/true, /*min_count=*/0);
- ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
- ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false, /*min_count=*/0);
- ScalarAggregateOptions keep_nulls_min_count(/*skip_nulls=*/false,
/*min_count=*/3);
+ auto no_min =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/0);
+ auto min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/3);
+ auto keep_nulls =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false,
/*min_count=*/0);
+ auto keep_nulls_min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false,
/*min_count=*/3);
for (bool use_exec_plan : {false, true}) {
for (bool use_threads : {true, false}) {
@@ -4000,10 +4035,10 @@ TEST(GroupBy, MeanNullType) {
},
{table->GetColumnByName("key")},
{
- {"hash_mean", &no_min},
- {"hash_mean", &keep_nulls},
- {"hash_mean", &min_count},
- {"hash_mean", &keep_nulls_min_count},
+ {"hash_mean", no_min},
+ {"hash_mean", keep_nulls},
+ {"hash_mean", min_count},
+ {"hash_mean", keep_nulls_min_count},
},
use_threads, use_exec_plan));
SortBy({"key_0"}, &aggregated_and_grouped);
@@ -4031,10 +4066,14 @@ TEST(GroupBy, NullTypeEmptyTable) {
auto table = TableFromJSON(schema({field("argument", null()), field("key",
int64())}),
{R"([])"});
- ScalarAggregateOptions no_min(/*skip_nulls=*/true, /*min_count=*/0);
- ScalarAggregateOptions min_count(/*skip_nulls=*/true, /*min_count=*/3);
- ScalarAggregateOptions keep_nulls(/*skip_nulls=*/false, /*min_count=*/0);
- ScalarAggregateOptions keep_nulls_min_count(/*skip_nulls=*/false,
/*min_count=*/3);
+ auto no_min =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/0);
+ auto min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true,
/*min_count=*/3);
+ auto keep_nulls =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false,
/*min_count=*/0);
+ auto keep_nulls_min_count =
+ std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/false,
/*min_count=*/3);
for (bool use_exec_plan : {false, true}) {
for (bool use_threads : {true, false}) {
@@ -4048,9 +4087,9 @@ TEST(GroupBy, NullTypeEmptyTable) {
},
{table->GetColumnByName("key")},
{
- {"hash_sum", &no_min},
- {"hash_product", &min_count},
- {"hash_mean", &keep_nulls},
+ {"hash_sum", no_min},
+ {"hash_product", min_count},
+ {"hash_mean", keep_nulls},
},
use_threads, use_exec_plan));
auto struct_arr = aggregated_and_grouped.array_as<StructArray>();
diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx
index 936fe2f0cd..04b57859ad 100644
--- a/python/pyarrow/_compute.pyx
+++ b/python/pyarrow/_compute.pyx
@@ -2136,9 +2136,9 @@ def _group_by(args, keys, aggregations):
for aggr_func_name, aggr_opts in aggregations:
c_aggr.function = tobytes(aggr_func_name)
if aggr_opts is not None:
- c_aggr.options = (<FunctionOptions?> aggr_opts).get_options()
+ c_aggr.options = (<FunctionOptions?>aggr_opts).wrapped
else:
- c_aggr.options = NULL
+ c_aggr.options = <shared_ptr[CFunctionOptions]>nullptr
c_aggregations.push_back(c_aggr)
with nogil:
diff --git a/python/pyarrow/includes/libarrow.pxd
b/python/pyarrow/includes/libarrow.pxd
index 8597874ea1..302ac99c36 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2410,7 +2410,7 @@ cdef extern from "arrow/compute/exec/aggregate.h"
namespace \
"arrow::compute::internal" nogil:
cdef cppclass CAggregate "arrow::compute::internal::Aggregate":
c_string function
- const CFunctionOptions* options
+ shared_ptr[CFunctionOptions] options
CResult[CDatum] GroupBy(const vector[CDatum]& arguments,
const vector[CDatum]& keys,
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index d9eaf50cd7..089d1e71eb 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -231,15 +231,13 @@ std::shared_ptr<compute::ExecNode> ExecNode_Aggregate(
std::vector<std::string> target_names, std::vector<std::string>
out_field_names,
std::vector<std::string> key_names) {
std::vector<arrow::compute::internal::Aggregate> aggregates;
- std::vector<std::shared_ptr<arrow::compute::FunctionOptions>> keep_alives;
for (cpp11::list name_opts : options) {
auto name = cpp11::as_cpp<std::string>(name_opts[0]);
auto opts = make_compute_options(name, name_opts[1]);
aggregates.push_back(
- arrow::compute::internal::Aggregate{std::move(name), opts.get()});
- keep_alives.push_back(std::move(opts));
+ arrow::compute::internal::Aggregate{std::move(name), std::move(opts)});
}
std::vector<arrow::FieldRef> targets, keys;