rtpsw commented on code in PR #34311:
URL: https://github.com/apache/arrow/pull/34311#discussion_r1117884526


##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -199,21 +199,32 @@ class ARROW_EXPORT ProjectNodeOptions : public 
ExecNodeOptions {
   std::vector<std::string> names;
 };
 
-/// \brief Make a node which aggregates input batches, optionally grouped by 
keys.
+/// \brief Make a node which aggregates input batches, optionally grouped by 
keys and
+/// optionally segmented by segment-keys. Both keys and segment-keys determine 
the group.
+/// However segment-keys are also used for determining grouping segments, 
which should be
+/// large, and allow streaming a partial aggregation result after processing 
each segment.
+/// One common use-case for segment-keys is ordered aggregation, in which the 
segment-key
+/// attribute specifies a column with non-decreasing values or a 
lexigographically-ordered
+/// set of such columns.
 ///
 /// If the keys attribute is a non-empty vector, then each aggregate in 
`aggregates` is
 /// expected to be a HashAggregate function. If the keys attribute is an empty 
vector,
 /// then each aggregate is assumed to be a ScalarAggregate function.
 class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
  public:
   explicit AggregateNodeOptions(std::vector<Aggregate> aggregates,
-                                std::vector<FieldRef> keys = {})
-      : aggregates(std::move(aggregates)), keys(std::move(keys)) {}
+                                std::vector<FieldRef> keys = {},
+                                std::vector<FieldRef> segment_keys = {})
+      : aggregates(std::move(aggregates)),
+        keys(std::move(keys)),
+        segment_keys(std::move(segment_keys)) {}
 
   // aggregations which will be applied to the targetted fields
   std::vector<Aggregate> aggregates;
   // keys by which aggregations will be grouped
   std::vector<FieldRef> keys;
+  // keys by which aggregations will be segmented
+  std::vector<FieldRef> segment_keys;

Review Comment:
   > So if I want to group by A, B, C and I know my data is sorted by A then I 
can do segments=A and keys=B C?
   
   Yes, that's what you'd do.
   
   I added the check to disallow.



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -174,81 +242,117 @@ Result<Datum> RunGroupBy(const BatchesWithSchema& input,
   ARROW_ASSIGN_OR_RAISE(std::vector<ExecBatch> output_batches,
                         start_and_collect.MoveResult());
 
-  ArrayVector out_arrays(aggregates.size() + key_names.size());
   const auto& output_schema = plan->nodes()[0]->output()->output_schema();
+  if (!segmented) {
+    return MakeGroupByOutput(output_batches, output_schema, aggregates.size(),
+                             key_names.size(), naive);
+  }
+
+  std::vector<ArrayVector> out_arrays(aggregates.size() + key_names.size() +
+                                      segment_key_names.size());
   for (size_t i = 0; i < out_arrays.size(); ++i) {
     std::vector<std::shared_ptr<Array>> arrays(output_batches.size());
     for (size_t j = 0; j < output_batches.size(); ++j) {
-      arrays[j] = output_batches[j].values[i].make_array();
+      auto& value = output_batches[j].values[i];
+      if (value.is_scalar()) {
+        ARROW_ASSIGN_OR_RAISE(
+            arrays[j], MakeArrayFromScalar(*value.scalar(), 
output_batches[j].length));
+      } else if (value.is_array()) {
+        arrays[j] = value.make_array();
+      } else {
+        return Status::Invalid("GroupByUsingExecPlan unsupported value kind ",
+                               ToString(value.kind()));
+      }
     }
     if (arrays.empty()) {
+      arrays.resize(1);
       ARROW_ASSIGN_OR_RAISE(
-          out_arrays[i],
-          MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
-                          /*length=*/0));
-    } else {
-      ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays));
+          arrays[0], 
MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
+                                     /*length=*/0));
     }
+    out_arrays[i] = {std::move(arrays)};
   }
 
-  // The exec plan may reorder the output rows.  The tests are all setup to 
expect ouptut
-  // in ascending order of keys.  So we need to sort the result by the key 
columns.  To do
-  // that we create a table using the key columns, calculate the sort indices 
from that
-  // table (sorting on all fields) and then use those indices to calculate our 
result.
-  std::vector<std::shared_ptr<Field>> key_fields;
-  std::vector<std::shared_ptr<Array>> key_columns;
-  std::vector<SortKey> sort_keys;
-  for (std::size_t i = 0; i < key_names.size(); i++) {
-    const std::shared_ptr<Array>& arr = out_arrays[i + aggregates.size()];
-    if (arr->type_id() == Type::DICTIONARY) {
-      // Can't sort dictionary columns so need to decode
-      auto dict_arr = checked_pointer_cast<DictionaryArray>(arr);
-      ARROW_ASSIGN_OR_RAISE(auto decoded_arr,
-                            Take(*dict_arr->dictionary(), 
*dict_arr->indices()));
-      key_columns.push_back(decoded_arr);
-      key_fields.push_back(
-          field("name_does_not_matter", dict_arr->dict_type()->value_type()));
-    } else {
-      key_columns.push_back(arr);
-      key_fields.push_back(field("name_does_not_matter", arr->type()));
+  if (segmented && segment_key_names.size() > 0) {
+    ArrayVector struct_arrays;
+    struct_arrays.reserve(output_batches.size());
+    for (size_t j = 0; j < output_batches.size(); ++j) {
+      ArrayVector struct_fields;
+      struct_fields.reserve(out_arrays.size());
+      for (auto out_array : out_arrays) {
+        struct_fields.push_back(out_array[j]);
+      }
+      ARROW_ASSIGN_OR_RAISE(auto struct_array,
+                            StructArray::Make(struct_fields, 
output_schema->fields()));
+      struct_arrays.push_back(struct_array);
     }
-    sort_keys.emplace_back(static_cast<int>(i));
+    return ChunkedArray::Make(struct_arrays);
+  } else {
+    ArrayVector struct_fields(out_arrays.size());
+    for (size_t i = 0; i < out_arrays.size(); ++i) {
+      ARROW_ASSIGN_OR_RAISE(struct_fields[i], Concatenate(out_arrays[i]));
+    }
+    return StructArray::Make(std::move(struct_fields), 
output_schema->fields());
   }
-  std::shared_ptr<Schema> key_schema = schema(std::move(key_fields));
-  std::shared_ptr<Table> key_table = Table::Make(std::move(key_schema), 
key_columns);
-  SortOptions sort_options(std::move(sort_keys));
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> sort_indices,
-                        SortIndices(key_table, sort_options));
+}
 
-  ARROW_ASSIGN_OR_RAISE(
-      std::shared_ptr<Array> struct_arr,
-      StructArray::Make(std::move(out_arrays), output_schema->fields()));
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+                         const std::vector<std::string>& key_names,
+                         const std::vector<std::string>& segment_key_names,
+                         const std::vector<Aggregate>& aggregates, bool 
use_threads,
+                         bool segmented = false, bool naive = false) {
+  if (segment_key_names.size() > 0) {
+    ARROW_ASSIGN_OR_RAISE(auto thread_pool, 
arrow::internal::ThreadPool::Make(1));
+    ExecContext seq_ctx(default_memory_pool(), thread_pool.get());
+    return RunGroupBy(input, key_names, segment_key_names, aggregates, 
&seq_ctx,
+                      use_threads, segmented, naive);
+  } else {
+    return RunGroupBy(input, key_names, segment_key_names, aggregates,
+                      threaded_exec_context(), use_threads, segmented, naive);
+  }
+}
 
-  return Take(struct_arr, sort_indices);
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+                         const std::vector<std::string>& key_names,
+                         const std::vector<Aggregate>& aggregates, bool 
use_threads,
+                         bool segmented = false, bool naive = false) {
+  return RunGroupBy(input, key_names, {}, aggregates, use_threads, segmented);
 }
 
 /// Simpler overload where you can give the columns as datums
 Result<Datum> RunGroupBy(const std::vector<Datum>& arguments,
                          const std::vector<Datum>& keys,
-                         const std::vector<Aggregate>& aggregates,
-                         bool use_threads = false) {
+                         const std::vector<Datum>& segment_keys,
+                         const std::vector<Aggregate>& aggregates, bool 
use_threads,
+                         bool segmented = false, bool naive = false) {
   using arrow::compute::detail::ExecSpanIterator;
 
-  FieldVector scan_fields(arguments.size() + keys.size());
+  FieldVector scan_fields(arguments.size() + keys.size() + 
segment_keys.size());
   std::vector<std::string> key_names(keys.size());
+  std::vector<std::string> segment_key_names(segment_keys.size());
   for (size_t i = 0; i < arguments.size(); ++i) {
     auto name = std::string("agg_") + ToChars(i);
     scan_fields[i] = field(name, arguments[i].type());
   }
+  size_t base = arguments.size();
   for (size_t i = 0; i < keys.size(); ++i) {
     auto name = std::string("key_") + ToChars(i);
-    scan_fields[arguments.size() + i] = field(name, keys[i].type());
+    scan_fields[base + i] = field(name, keys[i].type());
     key_names[i] = std::move(name);
   }
+  base += keys.size();
+  size_t j = segmented ? keys.size() : keys.size();

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to