westonpace commented on code in PR #15083:
URL: https://github.com/apache/arrow/pull/15083#discussion_r1064656505
##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -163,6 +163,34 @@ Result<ExecBatch> ExecBatch::Make(std::vector<Datum>
values) {
continue;
}
+ if (length != value.length()) {
+ // all the arrays should have the same length
+ return -1;
+ }
Review Comment:
This seems more like an error case. Returning `-1` makes it impossible to
distinguish from the empty-values case. Do you want to `DCHECK` (assert) here
instead? Or change the return type to `Result`?
##########
cpp/src/arrow/compute/exec/aggregate.cc:
##########
@@ -38,33 +38,39 @@ namespace internal {
Result<std::vector<const HashAggregateKernel*>> GetKernels(
ExecContext* ctx, const std::vector<Aggregate>& aggregates,
- const std::vector<TypeHolder>& in_types) {
+ const std::vector<std::vector<TypeHolder>>& in_types) {
if (aggregates.size() != in_types.size()) {
return Status::Invalid(aggregates.size(), " aggregate functions were
specified but ",
in_types.size(), " arguments were provided.");
}
std::vector<const HashAggregateKernel*> kernels(in_types.size());
+ std::vector<TypeHolder> aggregate_in_types;
for (size_t i = 0; i < aggregates.size(); ++i) {
ARROW_ASSIGN_OR_RAISE(auto function,
ctx->func_registry()->GetFunction(aggregates[i].function));
+ // {in_types[i]..., uint32()}
+ aggregate_in_types.reserve(in_types[i].size() + 1);
Review Comment:
```suggestion
```
I'm not sure what purpose this `reserve` serves.
##########
cpp/src/arrow/compute/exec/aggregate.cc:
##########
@@ -38,33 +38,39 @@ namespace internal {
Result<std::vector<const HashAggregateKernel*>> GetKernels(
ExecContext* ctx, const std::vector<Aggregate>& aggregates,
- const std::vector<TypeHolder>& in_types) {
+ const std::vector<std::vector<TypeHolder>>& in_types) {
if (aggregates.size() != in_types.size()) {
return Status::Invalid(aggregates.size(), " aggregate functions were
specified but ",
in_types.size(), " arguments were provided.");
}
std::vector<const HashAggregateKernel*> kernels(in_types.size());
+ std::vector<TypeHolder> aggregate_in_types;
for (size_t i = 0; i < aggregates.size(); ++i) {
ARROW_ASSIGN_OR_RAISE(auto function,
ctx->func_registry()->GetFunction(aggregates[i].function));
+ // {in_types[i]..., uint32()}
Review Comment:
This comment is a bit cryptic.
##########
cpp/src/arrow/compute/kernels/hash_aggregate.cc:
##########
@@ -223,6 +235,54 @@ void VisitGroupedValuesNonNull(const ExecSpan& batch,
ConsumeValue&& valid_func)
// ----------------------------------------------------------------------
// Count implementation
+// Nullary-count implementation -- COUNT(*).
+struct GroupedCountAllImpl : public GroupedAggregator {
+ Status Init(ExecContext* ctx, const KernelInitArgs& args) override {
+ counts_ = BufferBuilder(ctx->memory_pool());
+ return Status::OK();
+ }
+
+ Status Resize(int64_t new_num_groups) override {
+ auto added_groups = new_num_groups - num_groups_;
+ num_groups_ = new_num_groups;
+ return counts_.Append(added_groups * sizeof(int64_t), 0);
+ }
+
+ Status Merge(GroupedAggregator&& raw_other,
+ const ArrayData& group_id_mapping) override {
+ auto other = checked_cast<GroupedCountAllImpl*>(&raw_other);
+
+ auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+ auto other_counts = reinterpret_cast<const
int64_t*>(other->counts_.mutable_data());
+
+ auto g = group_id_mapping.GetValues<uint32_t>(1);
+ for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g,
++g) {
+ counts[*g] += other_counts[other_g];
+ }
+ return Status::OK();
+ }
+
+ Status Consume(const ExecSpan& batch) override {
+ auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+ auto g_begin = batch[0].array.GetValues<uint32_t>(1);
+
+ for (int64_t i = 0; i < batch.length; ++i, ++g_begin) {
+ counts[*g_begin] += 1;
+ }
Review Comment:
```suggestion
auto g_begin = batch[0].array.GetValues<uint32_t>(1);
for (auto g_itr = g_begin, end = g_itr + batch.length; g_itr != end;
g_itr++) {
counts[*g_itr] += 1;
}
```
I'm not sure whether this version is more legible or not, but the unused `i` was
throwing me off a bit.
##########
python/pyarrow/table.pxi:
##########
@@ -5364,30 +5364,31 @@ list[tuple(str, str, FunctionOptions)]
----
keys: [["a","b","c"]]
"""
- columns = [a[0] for a in aggregations]
+ target_cols = [a[0] if isinstance(a[0], (list, tuple)) else [a[0]] for
a in aggregations]
Review Comment:
I don't know if we want to be pythonic and accept any iterable here or not.
However, I suppose we're pretty strict elsewhere. Can you update the example
in the docstring with a call to `count(*)`?
CC @amol- or @jorisvandenbossche for canonical advice
##########
cpp/src/arrow/compute/exec/test_util.cc:
##########
@@ -416,7 +416,19 @@ static inline void PrintToImpl(const std::string&
factory_name,
*os << "function=" << agg.function << "<";
if (agg.options) PrintTo(*agg.options, os);
*os << ">,";
- *os << "target=" << agg.target.ToString() << ",";
+ *os << "target=";
+ if (agg.target.size() == 0) {
+ *os << "*";
+ } else if (agg.target.size() == 1) {
+ *os << agg.target[0].ToString();
+ } else {
Review Comment:
Ah, I see. What you have is better then.
##########
cpp/src/arrow/type.h:
##########
@@ -1692,9 +1692,7 @@ class ARROW_EXPORT FieldRef : public
util::EqualityComparable<FieldRef> {
FieldRef(int index) : impl_(FieldPath({index})) {} // NOLINT
runtime/explicit
/// Construct a nested FieldRef.
- FieldRef(std::vector<FieldRef> refs) { // NOLINT runtime/explicit
- Flatten(std::move(refs));
- }
+ explicit FieldRef(std::vector<FieldRef> refs) { Flatten(std::move(refs)); }
Review Comment:
This is a public API change but I think it is probably for the best.
##########
cpp/src/arrow/engine/substrait/extension_set.cc:
##########
@@ -883,21 +883,37 @@ ExtensionIdRegistry::SubstraitCallToArrow
DecodeConcatMapping() {
ExtensionIdRegistry::SubstraitAggregateToArrow DecodeBasicAggregate(
const std::string& arrow_function_name) {
return [arrow_function_name](const SubstraitCall& call) ->
Result<compute::Aggregate> {
- if (call.size() != 1) {
- return Status::NotImplemented(
- "Only unary aggregate functions are currently supported");
- }
- ARROW_ASSIGN_OR_RAISE(compute::Expression arg, call.GetValueArg(0));
- const FieldRef* arg_ref = arg.field_ref();
- if (!arg_ref) {
- return Status::Invalid("Expected an aggregate call ", call.id().uri, "#",
- call.id().name, " to have a direct reference");
- }
- std::string fixed_arrow_func = arrow_function_name;
+ std::string fixed_arrow_func;
if (call.is_hash()) {
- fixed_arrow_func = "hash_" + arrow_function_name;
+ fixed_arrow_func = "hash_";
}
- return compute::Aggregate{std::move(fixed_arrow_func), nullptr, *arg_ref,
""};
+
+ switch (call.size()) {
+ case 0: {
+ if (call.id().name == "count") {
+ fixed_arrow_func += "count_all";
+ return compute::Aggregate{std::move(fixed_arrow_func), ""};
+ }
+ return Status::Invalid("Expected aggregate call ", call.id().uri, "#",
+ call.id().name, " to have at least one
argument");
Review Comment:
This implies that `count_all` is the only nullary function at the moment,
right? I'm not sure that will always be true, but it is true enough for now,
I suppose.
##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -46,14 +46,26 @@ namespace {
void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
const std::vector<Aggregate>& aggs,
- const std::vector<int>& target_field_ids, int indent =
0) {
+ const std::vector<std::vector<int>>& target_fieldsets,
+ int indent = 0) {
*ss << "aggregates=[" << std::endl;
for (size_t i = 0; i < aggs.size(); i++) {
for (int j = 0; j < indent; ++j) *ss << " ";
- *ss << '\t' << aggs[i].function << '('
- << input_schema.field(target_field_ids[i])->name();
+ *ss << '\t' << aggs[i].function << '(';
+ const auto& target = target_fieldsets[i];
+ if (target.size() == 0) {
+ *ss << "*";
+ } else {
+ *ss << input_schema.field(target[0])->name();
+ for (size_t k = 1; k < target.size(); k++) {
+ *ss << ", " << input_schema.field(target[k])->name();
+ }
+ }
if (aggs[i].options) {
- *ss << ", " << aggs[i].options->ToString();
+ auto* options_type = aggs[i].options->options_type();
+ if (options_type->num_properties() > 0) {
Review Comment:
If you're trying to guard against null options why not just check if
`aggs[i].options` is null?
##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -2489,6 +2489,8 @@ cdef extern from "arrow/compute/exec/aggregate.h"
namespace \
cdef cppclass CAggregate "arrow::compute::Aggregate":
c_string function
shared_ptr[CFunctionOptions] options
+ vector[CFieldRef] target
Review Comment:
I'm confused how we got away with this field never existing. Perhaps we
never use `Aggregate` directly?
##########
cpp/src/arrow/compute/exec/aggregate.cc:
##########
@@ -75,13 +81,13 @@ Result<std::vector<std::unique_ptr<KernelState>>>
InitKernels(
}
KernelContext kernel_ctx{ctx};
- ARROW_ASSIGN_OR_RAISE(states[i],
- kernels[i]->init(&kernel_ctx,
KernelInitArgs{kernels[i],
- {
-
in_types[i],
-
uint32(),
- },
-
options}));
+ // {in_types[i]..., uint32()}
+ agg_in_types.reserve(in_types[i].size() + 1);
Review Comment:
```suggestion
```
Same as above. Cryptic comment and dubious reserve.
##########
cpp/src/arrow/compute/kernels/hash_aggregate.cc:
##########
@@ -108,19 +108,31 @@ Result<TypeHolder> ResolveGroupOutputType(KernelContext*
ctx,
return checked_cast<GroupedAggregator*>(ctx->state())->out_type();
}
-HashAggregateKernel MakeKernel(InputType argument_type, KernelInit init) {
+HashAggregateKernel MakeKernel(std::shared_ptr<KernelSignature> signature,
+ KernelInit init) {
HashAggregateKernel kernel;
kernel.init = std::move(init);
- kernel.signature =
- KernelSignature::Make({std::move(argument_type),
InputType(Type::UINT32)},
- OutputType(ResolveGroupOutputType));
+ kernel.signature = std::move(signature);
kernel.resize = HashAggregateResize;
kernel.consume = HashAggregateConsume;
kernel.merge = HashAggregateMerge;
kernel.finalize = HashAggregateFinalize;
return kernel;
}
+HashAggregateKernel MakeKernel(InputType argument_type, KernelInit init) {
+ return MakeKernel(
+ KernelSignature::Make({std::move(argument_type),
InputType(Type::UINT32)},
+ OutputType(ResolveGroupOutputType)),
+ std::move(init));
+}
+
+HashAggregateKernel MakeUnaryKernel(KernelInit init) {
+ return MakeKernel(KernelSignature::Make({InputType(Type::UINT32)},
+ OutputType(ResolveGroupOutputType)),
+ std::move(init));
+}
+
Review Comment:
Hmm, `Unary` is a bit confusing here, since one often doesn't think of the
group id as an "argument". In other words, your "unary" kernel is `COUNT(*)`,
which I would normally think of as nullary.
##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -1298,17 +1305,53 @@ TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
})
.AddToPlan(plan.get()));
- ASSERT_THAT(
- StartAndCollect(plan.get(), sink_gen),
- Finishes(ResultWith(UnorderedElementsAreArray({
- ExecBatchFromJSON(
- {boolean(), boolean(), int64(), float64(), int64(), float64(),
int64(),
- float64(), float64()},
- {ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
ArgShape::SCALAR,
- ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
ArgShape::ARRAY,
- ArgShape::SCALAR},
- R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0,
0.5833333333333334]])"),
- }))));
+ auto exec_batch = ExecBatchFromJSON(
+ {boolean(), boolean(), int64(), int64(), float64(), int64(), float64(),
int64(),
+ float64(), float64()},
+ {ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
+ ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
+ ArgShape::ARRAY, ArgShape::SCALAR},
+ R"([[false, true, 6, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0,
0.5833333333333334]])");
+
+ ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+ Finishes(ResultWith(UnorderedElementsAreArray({
+ std::move(exec_batch),
+ }))));
+}
+
+TEST(ExecPlanExecution, ScalarSourceStandaloneNullaryScalarAggSink) {
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+ AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+
+ BatchesWithSchema scalar_data;
+ scalar_data.batches = {
+ ExecBatchFromJSON({int32(), boolean()}, {ArgShape::SCALAR,
ArgShape::SCALAR},
+ "[[5, null], [5, false], [5, false]]"),
+ ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [null, false], [7,
true]]")};
+ scalar_data.schema = schema({
+ field("a", int32()),
+ field("b", boolean()),
+ });
+
+ auto sequence = Declaration::Sequence({
+ {"source", SourceNodeOptions{scalar_data.schema,
scalar_data.gen(/*parallel=*/false,
+
/*slow=*/false)}},
+ {"aggregate", AggregateNodeOptions{/*aggregates=*/{
+ {"count_all", "count(*)"},
+ }}},
+ {"sink", SinkNodeOptions{&sink_gen}},
+ });
+
+ // index can't be tested as it's order-dependent
+ // mode/quantile can't be tested as they're technically vector kernels
Review Comment:
I'm not sure this comment is needed; this test only uses `count_all`, not index, mode, or quantile.
##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -163,6 +163,34 @@ Result<ExecBatch> ExecBatch::Make(std::vector<Datum>
values) {
continue;
}
+ if (length != value.length()) {
+ // all the arrays should have the same length
+ return -1;
+ }
+ }
+
+ return length == -1 ? 1 : length;
+}
+
+Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values, int64_t length) {
+ if (length < 0) {
Review Comment:
Can we call `InferLength` here? It seems repetitive to have the same logic
twice.
##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -1298,17 +1305,53 @@ TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
})
.AddToPlan(plan.get()));
- ASSERT_THAT(
- StartAndCollect(plan.get(), sink_gen),
- Finishes(ResultWith(UnorderedElementsAreArray({
- ExecBatchFromJSON(
- {boolean(), boolean(), int64(), float64(), int64(), float64(),
int64(),
- float64(), float64()},
- {ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
ArgShape::SCALAR,
- ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
ArgShape::ARRAY,
- ArgShape::SCALAR},
- R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0,
0.5833333333333334]])"),
- }))));
+ auto exec_batch = ExecBatchFromJSON(
+ {boolean(), boolean(), int64(), int64(), float64(), int64(), float64(),
int64(),
+ float64(), float64()},
+ {ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
+ ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
+ ArgShape::ARRAY, ArgShape::SCALAR},
+ R"([[false, true, 6, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0,
0.5833333333333334]])");
+
+ ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+ Finishes(ResultWith(UnorderedElementsAreArray({
+ std::move(exec_batch),
+ }))));
+}
+
+TEST(ExecPlanExecution, ScalarSourceStandaloneNullaryScalarAggSink) {
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+ AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+
+ BatchesWithSchema scalar_data;
+ scalar_data.batches = {
+ ExecBatchFromJSON({int32(), boolean()}, {ArgShape::SCALAR,
ArgShape::SCALAR},
+ "[[5, null], [5, false], [5, false]]"),
+ ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [null, false], [7,
true]]")};
+ scalar_data.schema = schema({
+ field("a", int32()),
+ field("b", boolean()),
+ });
+
+ auto sequence = Declaration::Sequence({
+ {"source", SourceNodeOptions{scalar_data.schema,
scalar_data.gen(/*parallel=*/false,
+
/*slow=*/false)}},
+ {"aggregate", AggregateNodeOptions{/*aggregates=*/{
+ {"count_all", "count(*)"},
+ }}},
+ {"sink", SinkNodeOptions{&sink_gen}},
+ });
+
+ // index can't be tested as it's order-dependent
+ // mode/quantile can't be tested as they're technically vector kernels
+ ASSERT_OK(sequence.AddToPlan(plan.get()));
+
+ auto exec_batch = ExecBatchFromJSON({int64()}, {ArgShape::SCALAR},
R"([[6]])");
+
+ ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+ Finishes(ResultWith(UnorderedElementsAreArray({
+ std::move(exec_batch),
+ }))));
}
Review Comment:
```suggestion
TEST(ExecPlanExecution, ScalarSourceStandaloneNullaryScalarAggSink) {
BatchesWithSchema scalar_data;
scalar_data.batches = {
ExecBatchFromJSON({int32(), boolean()}, {ArgShape::SCALAR,
ArgShape::SCALAR},
"[[5, null], [5, false], [5, false]]"),
ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [null, false], [7, true]]")};
scalar_data.schema = schema({
field("a", int32()),
field("b", boolean()),
});
Declaration plan = Declaration::Sequence(
{{"source",
SourceNodeOptions{scalar_data.schema,
scalar_data.gen(/*parallel=*/false,
/*slow=*/false)}},
{"aggregate", AggregateNodeOptions{/*aggregates=*/{
{"count_all", "count(*)"},
}}}});
ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
DeclarationToExecBatches(std::move(plan)));
auto expected = ExecBatchFromJSON({int64()}, {ArgShape::SCALAR},
R"([[6]])");
AssertExecBatchesEqualIgnoringOrder(actual_batches.schema,
actual_batches.batches,
{expected});
}
```
We are slowly trying to move away from `StartAndCollect` to
`DeclarationToXyz` methods in tests. The latter is a bit more compact and
reflects the public API (`StartAndCollect` is an internal method). You don't
need to fix old tests, but new tests should be written in the new style.
##########
cpp/src/arrow/engine/substrait/test_plan_builder.h:
##########
@@ -64,11 +64,12 @@ ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>>
CreateScanProjectSubstrait(
/// \brief Create a scan->aggregate->sink plan for tests
///
/// The plan will create an aggregate with one grouping set (defined by
-/// key_idxs) and one measure. The measure will be a unary function
-/// defined by `function_id` and a direct reference to `arg_idx`.
+/// key_idxs) and one measure. The measure will be a function
+/// defined by `function_id` and a direct references to `arg_idxs`.
Review Comment:
```suggestion
/// defined by `function_id` and direct references to `arg_idxs`.
```
##########
cpp/src/arrow/engine/substrait/function_test.cc:
##########
@@ -669,7 +673,15 @@ TEST(FunctionMapping, AggregateCases) {
{int8()},
"[3]",
"[2, 1]",
- int64()}};
+ int64()},
+ {{kSubstraitAggregateGenericFunctionsUri, "count"},
+ {"[1, 2, 30]"},
Review Comment:
```suggestion
{"[1, null, 30]"},
```
##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -147,9 +147,9 @@ ExecBatch ExecBatch::Slice(int64_t offset, int64_t length)
const {
return out;
}
-Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values) {
+int64_t ExecBatch::InferLength(const std::vector<Datum>& values) {
Review Comment:
How does this function compare to `InferBatchLength` further down in the
file? Could you expose that function instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]