rtpsw commented on code in PR #34311:
URL: https://github.com/apache/arrow/pull/34311#discussion_r1126965621


##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -174,81 +242,117 @@ Result<Datum> RunGroupBy(const BatchesWithSchema& input,
   ARROW_ASSIGN_OR_RAISE(std::vector<ExecBatch> output_batches,
                         start_and_collect.MoveResult());
 
-  ArrayVector out_arrays(aggregates.size() + key_names.size());
   const auto& output_schema = plan->nodes()[0]->output()->output_schema();
+  if (!segmented) {
+    return MakeGroupByOutput(output_batches, output_schema, aggregates.size(),
+                             key_names.size(), naive);
+  }
+
+  std::vector<ArrayVector> out_arrays(aggregates.size() + key_names.size() +
+                                      segment_key_names.size());
   for (size_t i = 0; i < out_arrays.size(); ++i) {
     std::vector<std::shared_ptr<Array>> arrays(output_batches.size());
     for (size_t j = 0; j < output_batches.size(); ++j) {
-      arrays[j] = output_batches[j].values[i].make_array();
+      auto& value = output_batches[j].values[i];
+      if (value.is_scalar()) {
+        ARROW_ASSIGN_OR_RAISE(
+            arrays[j], MakeArrayFromScalar(*value.scalar(), 
output_batches[j].length));
+      } else if (value.is_array()) {
+        arrays[j] = value.make_array();
+      } else {
+        return Status::Invalid("GroupByUsingExecPlan unsupported value kind ",
+                               ToString(value.kind()));
+      }
     }
     if (arrays.empty()) {
+      arrays.resize(1);
       ARROW_ASSIGN_OR_RAISE(
-          out_arrays[i],
-          MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
-                          /*length=*/0));
-    } else {
-      ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays));
+          arrays[0], 
MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
+                                     /*length=*/0));
     }
+    out_arrays[i] = {std::move(arrays)};
   }
 
-  // The exec plan may reorder the output rows.  The tests are all setup to 
expect ouptut
-  // in ascending order of keys.  So we need to sort the result by the key 
columns.  To do
-  // that we create a table using the key columns, calculate the sort indices 
from that
-  // table (sorting on all fields) and then use those indices to calculate our 
result.
-  std::vector<std::shared_ptr<Field>> key_fields;
-  std::vector<std::shared_ptr<Array>> key_columns;
-  std::vector<SortKey> sort_keys;
-  for (std::size_t i = 0; i < key_names.size(); i++) {
-    const std::shared_ptr<Array>& arr = out_arrays[i + aggregates.size()];
-    if (arr->type_id() == Type::DICTIONARY) {
-      // Can't sort dictionary columns so need to decode
-      auto dict_arr = checked_pointer_cast<DictionaryArray>(arr);
-      ARROW_ASSIGN_OR_RAISE(auto decoded_arr,
-                            Take(*dict_arr->dictionary(), 
*dict_arr->indices()));
-      key_columns.push_back(decoded_arr);
-      key_fields.push_back(
-          field("name_does_not_matter", dict_arr->dict_type()->value_type()));
-    } else {
-      key_columns.push_back(arr);
-      key_fields.push_back(field("name_does_not_matter", arr->type()));
+  if (segmented && segment_key_names.size() > 0) {
+    ArrayVector struct_arrays;
+    struct_arrays.reserve(output_batches.size());
+    for (size_t j = 0; j < output_batches.size(); ++j) {
+      ArrayVector struct_fields;
+      struct_fields.reserve(out_arrays.size());
+      for (auto out_array : out_arrays) {
+        struct_fields.push_back(out_array[j]);
+      }
+      ARROW_ASSIGN_OR_RAISE(auto struct_array,
+                            StructArray::Make(struct_fields, 
output_schema->fields()));
+      struct_arrays.push_back(struct_array);
     }
-    sort_keys.emplace_back(static_cast<int>(i));
+    return ChunkedArray::Make(struct_arrays);
+  } else {
+    ArrayVector struct_fields(out_arrays.size());
+    for (size_t i = 0; i < out_arrays.size(); ++i) {
+      ARROW_ASSIGN_OR_RAISE(struct_fields[i], Concatenate(out_arrays[i]));
+    }
+    return StructArray::Make(std::move(struct_fields), 
output_schema->fields());
   }
-  std::shared_ptr<Schema> key_schema = schema(std::move(key_fields));
-  std::shared_ptr<Table> key_table = Table::Make(std::move(key_schema), 
key_columns);
-  SortOptions sort_options(std::move(sort_keys));
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> sort_indices,
-                        SortIndices(key_table, sort_options));
+}
 
-  ARROW_ASSIGN_OR_RAISE(
-      std::shared_ptr<Array> struct_arr,
-      StructArray::Make(std::move(out_arrays), output_schema->fields()));
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+                         const std::vector<std::string>& key_names,
+                         const std::vector<std::string>& segment_key_names,
+                         const std::vector<Aggregate>& aggregates, bool 
use_threads,
+                         bool segmented = false, bool naive = false) {
+  if (segment_key_names.size() > 0) {
+    ARROW_ASSIGN_OR_RAISE(auto thread_pool, 
arrow::internal::ThreadPool::Make(1));
+    ExecContext seq_ctx(default_memory_pool(), thread_pool.get());
+    return RunGroupBy(input, key_names, segment_key_names, aggregates, 
&seq_ctx,
+                      use_threads, segmented, naive);
+  } else {
+    return RunGroupBy(input, key_names, segment_key_names, aggregates,
+                      threaded_exec_context(), use_threads, segmented, naive);
+  }
+}
 
-  return Take(struct_arr, sort_indices);
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+                         const std::vector<std::string>& key_names,
+                         const std::vector<Aggregate>& aggregates, bool 
use_threads,
+                         bool segmented = false, bool naive = false) {
+  return RunGroupBy(input, key_names, {}, aggregates, use_threads, segmented);
 }
 
 /// Simpler overload where you can give the columns as datums
 Result<Datum> RunGroupBy(const std::vector<Datum>& arguments,
                          const std::vector<Datum>& keys,
-                         const std::vector<Aggregate>& aggregates,
-                         bool use_threads = false) {
+                         const std::vector<Datum>& segment_keys,
+                         const std::vector<Aggregate>& aggregates, bool 
use_threads,
+                         bool segmented = false, bool naive = false) {

Review Comment:
   > I am still quite confused - what is this testing with naive=true vs 
naive=false?
   
   IIRC, we discussed this elsewhere in this PR. The `naive` flag means that 
the output is expected to be  that of like `NaiveGroupBy`, which in particular 
doesn't require sorting.
   
   > Are we test a tester function here?
   
   No. The reason for the `naive` flag is that the expected output of some 
test-cases (most, if not all, are pre-PR) is naive and of some others it is 
not. The common `RunGroupBy` function deals with both kinds of expected output. 
I'll add documentation for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to