westonpace commented on code in PR #15083:
URL: https://github.com/apache/arrow/pull/15083#discussion_r1064656505


##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -163,6 +163,34 @@ Result<ExecBatch> ExecBatch::Make(std::vector<Datum> 
values) {
       continue;
     }
 
+    if (length != value.length()) {
+      // all the arrays should have the same length
+      return -1;
+    }

Review Comment:
   This seems more like an error case than `-1` which can't be distinguished 
from the empty values case.  Do you want to `DCHECK` and assert here instead?  
Or change the return type to `Result`?



##########
cpp/src/arrow/compute/exec/aggregate.cc:
##########
@@ -38,33 +38,39 @@ namespace internal {
 
 Result<std::vector<const HashAggregateKernel*>> GetKernels(
     ExecContext* ctx, const std::vector<Aggregate>& aggregates,
-    const std::vector<TypeHolder>& in_types) {
+    const std::vector<std::vector<TypeHolder>>& in_types) {
   if (aggregates.size() != in_types.size()) {
     return Status::Invalid(aggregates.size(), " aggregate functions were 
specified but ",
                            in_types.size(), " arguments were provided.");
   }
 
   std::vector<const HashAggregateKernel*> kernels(in_types.size());
 
+  std::vector<TypeHolder> aggregate_in_types;
   for (size_t i = 0; i < aggregates.size(); ++i) {
     ARROW_ASSIGN_OR_RAISE(auto function,
                           
ctx->func_registry()->GetFunction(aggregates[i].function));
+    // {in_types[i]..., uint32()}
+    aggregate_in_types.reserve(in_types[i].size() + 1);

Review Comment:
   ```suggestion
   ```
   I'm not sure what purpose this `reserve` has



##########
cpp/src/arrow/compute/exec/aggregate.cc:
##########
@@ -38,33 +38,39 @@ namespace internal {
 
 Result<std::vector<const HashAggregateKernel*>> GetKernels(
     ExecContext* ctx, const std::vector<Aggregate>& aggregates,
-    const std::vector<TypeHolder>& in_types) {
+    const std::vector<std::vector<TypeHolder>>& in_types) {
   if (aggregates.size() != in_types.size()) {
     return Status::Invalid(aggregates.size(), " aggregate functions were 
specified but ",
                            in_types.size(), " arguments were provided.");
   }
 
   std::vector<const HashAggregateKernel*> kernels(in_types.size());
 
+  std::vector<TypeHolder> aggregate_in_types;
   for (size_t i = 0; i < aggregates.size(); ++i) {
     ARROW_ASSIGN_OR_RAISE(auto function,
                           
ctx->func_registry()->GetFunction(aggregates[i].function));
+    // {in_types[i]..., uint32()}

Review Comment:
   This comment is a bit cryptic



##########
cpp/src/arrow/compute/kernels/hash_aggregate.cc:
##########
@@ -223,6 +235,54 @@ void VisitGroupedValuesNonNull(const ExecSpan& batch, 
ConsumeValue&& valid_func)
 // ----------------------------------------------------------------------
 // Count implementation
 
+// Nullary-count implementation -- COUNT(*).
+struct GroupedCountAllImpl : public GroupedAggregator {
+  Status Init(ExecContext* ctx, const KernelInitArgs& args) override {
+    counts_ = BufferBuilder(ctx->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    return counts_.Append(added_groups * sizeof(int64_t), 0);
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedCountAllImpl*>(&raw_other);
+
+    auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    auto other_counts = reinterpret_cast<const 
int64_t*>(other->counts_.mutable_data());
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, 
++g) {
+      counts[*g] += other_counts[other_g];
+    }
+    return Status::OK();
+  }
+
+  Status Consume(const ExecSpan& batch) override {
+    auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    auto g_begin = batch[0].array.GetValues<uint32_t>(1);
+
+    for (int64_t i = 0; i < batch.length; ++i, ++g_begin) {
+      counts[*g_begin] += 1;
+    }

Review Comment:
   ```suggestion
       auto g_begin = batch[0].array.GetValues<uint32_t>(1);
       for (auto g_itr = g_begin, end = g_itr + batch.length; g_itr != end; 
g_itr++) {
         counts[*g_itr] += 1;
       }
   ```
   I'm not sure if this version is more legible or not but the unused `i` was 
throwing me off a bit.



##########
python/pyarrow/table.pxi:
##########
@@ -5364,30 +5364,31 @@ list[tuple(str, str, FunctionOptions)]
         ----
         keys: [["a","b","c"]]
         """
-        columns = [a[0] for a in aggregations]
+        target_cols = [a[0] if isinstance(a[0], (list, tuple)) else [a[0]] for 
a in aggregations]

Review Comment:
   I don't know if we want to be pythonic and accept any iterable here or not.  
However, I suppose we're pretty strict elsewhere.  Can you update the example 
in the docstring with a call to `count(*)`?
   
   CC @amol- or @jorisvandenbossche for canonical advice



##########
cpp/src/arrow/compute/exec/test_util.cc:
##########
@@ -416,7 +416,19 @@ static inline void PrintToImpl(const std::string& 
factory_name,
       *os << "function=" << agg.function << "<";
       if (agg.options) PrintTo(*agg.options, os);
       *os << ">,";
-      *os << "target=" << agg.target.ToString() << ",";
+      *os << "target=";
+      if (agg.target.size() == 0) {
+        *os << "*";
+      } else if (agg.target.size() == 1) {
+        *os << agg.target[0].ToString();
+      } else {

Review Comment:
   Ah, I see.  What you have is better then.



##########
cpp/src/arrow/type.h:
##########
@@ -1692,9 +1692,7 @@ class ARROW_EXPORT FieldRef : public 
util::EqualityComparable<FieldRef> {
   FieldRef(int index) : impl_(FieldPath({index})) {}  // NOLINT 
runtime/explicit
 
   /// Construct a nested FieldRef.
-  FieldRef(std::vector<FieldRef> refs) {  // NOLINT runtime/explicit
-    Flatten(std::move(refs));
-  }
+  explicit FieldRef(std::vector<FieldRef> refs) { Flatten(std::move(refs)); }

Review Comment:
   This is a public API change but I think it is probably for the best.



##########
cpp/src/arrow/engine/substrait/extension_set.cc:
##########
@@ -883,21 +883,37 @@ ExtensionIdRegistry::SubstraitCallToArrow 
DecodeConcatMapping() {
 ExtensionIdRegistry::SubstraitAggregateToArrow DecodeBasicAggregate(
     const std::string& arrow_function_name) {
   return [arrow_function_name](const SubstraitCall& call) -> 
Result<compute::Aggregate> {
-    if (call.size() != 1) {
-      return Status::NotImplemented(
-          "Only unary aggregate functions are currently supported");
-    }
-    ARROW_ASSIGN_OR_RAISE(compute::Expression arg, call.GetValueArg(0));
-    const FieldRef* arg_ref = arg.field_ref();
-    if (!arg_ref) {
-      return Status::Invalid("Expected an aggregate call ", call.id().uri, "#",
-                             call.id().name, " to have a direct reference");
-    }
-    std::string fixed_arrow_func = arrow_function_name;
+    std::string fixed_arrow_func;
     if (call.is_hash()) {
-      fixed_arrow_func = "hash_" + arrow_function_name;
+      fixed_arrow_func = "hash_";
     }
-    return compute::Aggregate{std::move(fixed_arrow_func), nullptr, *arg_ref, 
""};
+
+    switch (call.size()) {
+      case 0: {
+        if (call.id().name == "count") {
+          fixed_arrow_func += "count_all";
+          return compute::Aggregate{std::move(fixed_arrow_func), ""};
+        }
+        return Status::Invalid("Expected aggregate call ", call.id().uri, "#",
+                               call.id().name, " to have at least one 
argument");

Review Comment:
   This implies that `count_all` is the only nullary function at the moment, 
right?  I'm not sure if that will always be true but it is true enough at the 
moment I suppose.



##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -46,14 +46,26 @@ namespace {
 
 void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
                         const std::vector<Aggregate>& aggs,
-                        const std::vector<int>& target_field_ids, int indent = 
0) {
+                        const std::vector<std::vector<int>>& target_fieldsets,
+                        int indent = 0) {
   *ss << "aggregates=[" << std::endl;
   for (size_t i = 0; i < aggs.size(); i++) {
     for (int j = 0; j < indent; ++j) *ss << "  ";
-    *ss << '\t' << aggs[i].function << '('
-        << input_schema.field(target_field_ids[i])->name();
+    *ss << '\t' << aggs[i].function << '(';
+    const auto& target = target_fieldsets[i];
+    if (target.size() == 0) {
+      *ss << "*";
+    } else {
+      *ss << input_schema.field(target[0])->name();
+      for (size_t k = 1; k < target.size(); k++) {
+        *ss << ", " << input_schema.field(target[k])->name();
+      }
+    }
     if (aggs[i].options) {
-      *ss << ", " << aggs[i].options->ToString();
+      auto* options_type = aggs[i].options->options_type();
+      if (options_type->num_properties() > 0) {

Review Comment:
   If you're trying to guard against null options why not just check if 
`aggs[i].options` is null?



##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -2489,6 +2489,8 @@ cdef extern from "arrow/compute/exec/aggregate.h" 
namespace \
     cdef cppclass CAggregate "arrow::compute::Aggregate":
         c_string function
         shared_ptr[CFunctionOptions] options
+        vector[CFieldRef] target

Review Comment:
   I'm confused how we got away with this field never existing.  Perhaps we 
never use `Aggregate` directly?



##########
cpp/src/arrow/compute/exec/aggregate.cc:
##########
@@ -75,13 +81,13 @@ Result<std::vector<std::unique_ptr<KernelState>>> 
InitKernels(
     }
 
     KernelContext kernel_ctx{ctx};
-    ARROW_ASSIGN_OR_RAISE(states[i],
-                          kernels[i]->init(&kernel_ctx, 
KernelInitArgs{kernels[i],
-                                                                       {
-                                                                           
in_types[i],
-                                                                           
uint32(),
-                                                                       },
-                                                                       
options}));
+    // {in_types[i]..., uint32()}
+    agg_in_types.reserve(in_types[i].size() + 1);

Review Comment:
   ```suggestion
   ```
   Same as above.  Cryptic comment and dubious reserve.



##########
cpp/src/arrow/compute/kernels/hash_aggregate.cc:
##########
@@ -108,19 +108,31 @@ Result<TypeHolder> ResolveGroupOutputType(KernelContext* 
ctx,
   return checked_cast<GroupedAggregator*>(ctx->state())->out_type();
 }
 
-HashAggregateKernel MakeKernel(InputType argument_type, KernelInit init) {
+HashAggregateKernel MakeKernel(std::shared_ptr<KernelSignature> signature,
+                               KernelInit init) {
   HashAggregateKernel kernel;
   kernel.init = std::move(init);
-  kernel.signature =
-      KernelSignature::Make({std::move(argument_type), 
InputType(Type::UINT32)},
-                            OutputType(ResolveGroupOutputType));
+  kernel.signature = std::move(signature);
   kernel.resize = HashAggregateResize;
   kernel.consume = HashAggregateConsume;
   kernel.merge = HashAggregateMerge;
   kernel.finalize = HashAggregateFinalize;
   return kernel;
 }
 
+HashAggregateKernel MakeKernel(InputType argument_type, KernelInit init) {
+  return MakeKernel(
+      KernelSignature::Make({std::move(argument_type), 
InputType(Type::UINT32)},
+                            OutputType(ResolveGroupOutputType)),
+      std::move(init));
+}
+
+HashAggregateKernel MakeUnaryKernel(KernelInit init) {
+  return MakeKernel(KernelSignature::Make({InputType(Type::UINT32)},
+                                          OutputType(ResolveGroupOutputType)),
+                    std::move(init));
+}
+

Review Comment:
   Hmm, `Unary` is a bit confusing here since one often doesn't think of the 
group id as an "argument".  In other words, your "unary" kernel is `COUNT(*)` 
which I would normally think of as nullary.



##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -1298,17 +1305,53 @@ TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
                 })
                 .AddToPlan(plan.get()));
 
-  ASSERT_THAT(
-      StartAndCollect(plan.get(), sink_gen),
-      Finishes(ResultWith(UnorderedElementsAreArray({
-          ExecBatchFromJSON(
-              {boolean(), boolean(), int64(), float64(), int64(), float64(), 
int64(),
-               float64(), float64()},
-              {ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, 
ArgShape::SCALAR,
-               ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, 
ArgShape::ARRAY,
-               ArgShape::SCALAR},
-              R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 
0.5833333333333334]])"),
-      }))));
+  auto exec_batch = ExecBatchFromJSON(
+      {boolean(), boolean(), int64(), int64(), float64(), int64(), float64(), 
int64(),
+       float64(), float64()},
+      {ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
+       ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
+       ArgShape::ARRAY, ArgShape::SCALAR},
+      R"([[false, true, 6, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 
0.5833333333333334]])");
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+              Finishes(ResultWith(UnorderedElementsAreArray({
+                  std::move(exec_batch),
+              }))));
+}
+
+TEST(ExecPlanExecution, ScalarSourceStandaloneNullaryScalarAggSink) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+
+  BatchesWithSchema scalar_data;
+  scalar_data.batches = {
+      ExecBatchFromJSON({int32(), boolean()}, {ArgShape::SCALAR, 
ArgShape::SCALAR},
+                        "[[5, null], [5, false], [5, false]]"),
+      ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [null, false], [7, 
true]]")};
+  scalar_data.schema = schema({
+      field("a", int32()),
+      field("b", boolean()),
+  });
+
+  auto sequence = Declaration::Sequence({
+      {"source", SourceNodeOptions{scalar_data.schema, 
scalar_data.gen(/*parallel=*/false,
+                                                                       
/*slow=*/false)}},
+      {"aggregate", AggregateNodeOptions{/*aggregates=*/{
+                        {"count_all", "count(*)"},
+                    }}},
+      {"sink", SinkNodeOptions{&sink_gen}},
+  });
+
+  // index can't be tested as it's order-dependent
+  // mode/quantile can't be tested as they're technically vector kernels

Review Comment:
   I'm not sure this comment is needed?



##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -163,6 +163,34 @@ Result<ExecBatch> ExecBatch::Make(std::vector<Datum> 
values) {
       continue;
     }
 
+    if (length != value.length()) {
+      // all the arrays should have the same length
+      return -1;
+    }
+  }
+
+  return length == -1 ? 1 : length;
+}
+
+Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values, int64_t length) {
+  if (length < 0) {

Review Comment:
   Can we call `InferLength` here?  It seems like some repetition to have it 
twice.



##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -1298,17 +1305,53 @@ TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
                 })
                 .AddToPlan(plan.get()));
 
-  ASSERT_THAT(
-      StartAndCollect(plan.get(), sink_gen),
-      Finishes(ResultWith(UnorderedElementsAreArray({
-          ExecBatchFromJSON(
-              {boolean(), boolean(), int64(), float64(), int64(), float64(), 
int64(),
-               float64(), float64()},
-              {ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, 
ArgShape::SCALAR,
-               ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, 
ArgShape::ARRAY,
-               ArgShape::SCALAR},
-              R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 
0.5833333333333334]])"),
-      }))));
+  auto exec_batch = ExecBatchFromJSON(
+      {boolean(), boolean(), int64(), int64(), float64(), int64(), float64(), 
int64(),
+       float64(), float64()},
+      {ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
+       ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
+       ArgShape::ARRAY, ArgShape::SCALAR},
+      R"([[false, true, 6, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 
0.5833333333333334]])");
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+              Finishes(ResultWith(UnorderedElementsAreArray({
+                  std::move(exec_batch),
+              }))));
+}
+
+TEST(ExecPlanExecution, ScalarSourceStandaloneNullaryScalarAggSink) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+
+  BatchesWithSchema scalar_data;
+  scalar_data.batches = {
+      ExecBatchFromJSON({int32(), boolean()}, {ArgShape::SCALAR, 
ArgShape::SCALAR},
+                        "[[5, null], [5, false], [5, false]]"),
+      ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [null, false], [7, 
true]]")};
+  scalar_data.schema = schema({
+      field("a", int32()),
+      field("b", boolean()),
+  });
+
+  auto sequence = Declaration::Sequence({
+      {"source", SourceNodeOptions{scalar_data.schema, 
scalar_data.gen(/*parallel=*/false,
+                                                                       
/*slow=*/false)}},
+      {"aggregate", AggregateNodeOptions{/*aggregates=*/{
+                        {"count_all", "count(*)"},
+                    }}},
+      {"sink", SinkNodeOptions{&sink_gen}},
+  });
+
+  // index can't be tested as it's order-dependent
+  // mode/quantile can't be tested as they're technically vector kernels
+  ASSERT_OK(sequence.AddToPlan(plan.get()));
+
+  auto exec_batch = ExecBatchFromJSON({int64()}, {ArgShape::SCALAR}, 
R"([[6]])");
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+              Finishes(ResultWith(UnorderedElementsAreArray({
+                  std::move(exec_batch),
+              }))));
 }

Review Comment:
   ```suggestion
   TEST(ExecPlanExecution, ScalarSourceStandaloneNullaryScalarAggSink) {
     BatchesWithSchema scalar_data;
     scalar_data.batches = {
         ExecBatchFromJSON({int32(), boolean()}, {ArgShape::SCALAR, 
ArgShape::SCALAR},
                           "[[5, null], [5, false], [5, false]]"),
         ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [null, false], 
[7, true]]")};
     scalar_data.schema = schema({
         field("a", int32()),
         field("b", boolean()),
     });
   
     Declaration plan = Declaration::Sequence(
         {{"source",
           SourceNodeOptions{scalar_data.schema, 
scalar_data.gen(/*parallel=*/false,
                                                                 
/*slow=*/false)}},
          {"aggregate", AggregateNodeOptions{/*aggregates=*/{
                            {"count_all", "count(*)"},
                        }}}});
     ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
                          DeclarationToExecBatches(std::move(plan)));
   
     auto expected = ExecBatchFromJSON({int64()}, {ArgShape::SCALAR}, 
R"([[6]])");
     AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, 
actual_batches.batches,
                                         {expected});
   }
   ```
   
   We are slowly trying to move away from `StartAndCollect` to 
`DeclarationToXyz` methods in tests.  The latter is a bit more compact and 
reflects the public API (`StartAndCollect` is an internal method).  You don't 
need to fix old tests but new tests should be written with the new style.



##########
cpp/src/arrow/engine/substrait/test_plan_builder.h:
##########
@@ -64,11 +64,12 @@ ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> 
CreateScanProjectSubstrait(
 /// \brief Create a scan->aggregate->sink plan for tests
 ///
 /// The plan will create an aggregate with one grouping set (defined by
-/// key_idxs) and one measure.  The measure will be a unary function
-/// defined by `function_id` and a direct reference to `arg_idx`.
+/// key_idxs) and one measure.  The measure will be a function
+/// defined by `function_id` and a direct references to `arg_idxs`.

Review Comment:
   ```suggestion
   /// defined by `function_id` and direct references to `arg_idxs`.
   ```



##########
cpp/src/arrow/engine/substrait/function_test.cc:
##########
@@ -669,7 +673,15 @@ TEST(FunctionMapping, AggregateCases) {
        {int8()},
        "[3]",
        "[2, 1]",
-       int64()}};
+       int64()},
+      {{kSubstraitAggregateGenericFunctionsUri, "count"},
+       {"[1, 2, 30]"},

Review Comment:
   ```suggestion
          {"[1, null, 30]"},
   ```
   



##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -147,9 +147,9 @@ ExecBatch ExecBatch::Slice(int64_t offset, int64_t length) 
const {
   return out;
 }
 
-Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values) {
+int64_t ExecBatch::InferLength(const std::vector<Datum>& values) {

Review Comment:
   How does this function compare to `InferBatchLength` further down in the 
file?  Could you expose that function instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to