rtpsw commented on code in PR #14352:
URL: https://github.com/apache/arrow/pull/14352#discussion_r1019636999


##########
cpp/src/arrow/compute/exec/aggregate.cc:
##########
@@ -101,136 +106,401 @@ Result<FieldVector> ResolveKernels(
   return fields;
 }
 
-Result<Datum> GroupBy(const std::vector<Datum>& arguments, const 
std::vector<Datum>& keys,
-                      const std::vector<Aggregate>& aggregates, bool 
use_threads,
-                      ExecContext* ctx) {
-  auto task_group =
-      use_threads
-          ? 
arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool())
-          : arrow::internal::TaskGroup::MakeSerial();
+namespace {
 
-  std::vector<const HashAggregateKernel*> kernels;
-  std::vector<std::vector<std::unique_ptr<KernelState>>> states;
-  FieldVector out_fields;
+template <typename T>
+inline std::string ToString(const std::vector<T>& v) {
+  std::stringstream s;
+  s << '[';
+  for (size_t i = 0; i < v.size(); i++) {
+    if (i != 0) s << ',';
+    s << v[i];
+  }
+  s << ']';
+  return s.str();
+}
 
-  using arrow::compute::detail::ExecSpanIterator;
-  ExecSpanIterator argument_iterator;
+int64_t FindLength(const std::vector<Datum>& arguments, const 
std::vector<Datum>& keys,
+                   const std::vector<Datum>& segment_keys) {
+  int64_t length = -1;
+  for (const auto& datums : {arguments, keys, segment_keys}) {
+    for (const auto& datum : datums) {
+      if (datum.is_scalar()) {
+        // do nothing
+      } else if (datum.is_array() || datum.is_chunked_array()) {
+        int64_t datum_length =
+            datum.is_array() ? datum.array()->length : 
datum.chunked_array()->length();
+        if (length == -1) {
+          length = datum_length;
+        } else if (length != datum_length) {
+          return -1;
+        }
+      } else {
+        ARROW_DCHECK(false);
+      }
+    }
+  }
+  return length;
+}
 
-  ExecBatch args_batch;
-  if (!arguments.empty()) {
-    ARROW_ASSIGN_OR_RAISE(args_batch, ExecBatch::Make(arguments));
+class GroupByProcess {
+ public:
+  struct BatchInfo {
+    ExecBatch args_batch;
+    std::vector<TypeHolder> argument_types;
+    ExecBatch keys_batch;
+    std::vector<TypeHolder> key_types;
+    ExecBatch segment_keys_batch;
+    std::vector<TypeHolder> segment_key_types;

Review Comment:
   Yes. The organization is as follows: `BatchInfo` holds the fields that change 
from batch to batch, `StateInfo` holds the fields that are determined by the 
stream of batches processed so far, and `GroupByProcess` holds the fields that 
are initialized once.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to