rtpsw commented on code in PR #34311: URL: https://github.com/apache/arrow/pull/34311#discussion_r1126793417
########## cpp/src/arrow/compute/kernels/hash_aggregate_test.cc: ########## @@ -174,81 +242,117 @@ Result<Datum> RunGroupBy(const BatchesWithSchema& input, ARROW_ASSIGN_OR_RAISE(std::vector<ExecBatch> output_batches, start_and_collect.MoveResult()); - ArrayVector out_arrays(aggregates.size() + key_names.size()); const auto& output_schema = plan->nodes()[0]->output()->output_schema(); + if (!segmented) { + return MakeGroupByOutput(output_batches, output_schema, aggregates.size(), + key_names.size(), naive); + } + + std::vector<ArrayVector> out_arrays(aggregates.size() + key_names.size() + + segment_key_names.size()); for (size_t i = 0; i < out_arrays.size(); ++i) { std::vector<std::shared_ptr<Array>> arrays(output_batches.size()); for (size_t j = 0; j < output_batches.size(); ++j) { - arrays[j] = output_batches[j].values[i].make_array(); + auto& value = output_batches[j].values[i]; + if (value.is_scalar()) { + ARROW_ASSIGN_OR_RAISE( + arrays[j], MakeArrayFromScalar(*value.scalar(), output_batches[j].length)); + } else if (value.is_array()) { + arrays[j] = value.make_array(); + } else { + return Status::Invalid("GroupByUsingExecPlan unsupported value kind ", + ToString(value.kind())); + } } if (arrays.empty()) { + arrays.resize(1); ARROW_ASSIGN_OR_RAISE( - out_arrays[i], - MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(), - /*length=*/0)); - } else { - ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays)); + arrays[0], MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(), + /*length=*/0)); } + out_arrays[i] = {std::move(arrays)}; } - // The exec plan may reorder the output rows. The tests are all setup to expect ouptut - // in ascending order of keys. So we need to sort the result by the key columns. To do - // that we create a table using the key columns, calculate the sort indices from that - // table (sorting on all fields) and then use those indices to calculate our result. 
- std::vector<std::shared_ptr<Field>> key_fields; - std::vector<std::shared_ptr<Array>> key_columns; - std::vector<SortKey> sort_keys; - for (std::size_t i = 0; i < key_names.size(); i++) { - const std::shared_ptr<Array>& arr = out_arrays[i + aggregates.size()]; - if (arr->type_id() == Type::DICTIONARY) { - // Can't sort dictionary columns so need to decode - auto dict_arr = checked_pointer_cast<DictionaryArray>(arr); - ARROW_ASSIGN_OR_RAISE(auto decoded_arr, - Take(*dict_arr->dictionary(), *dict_arr->indices())); - key_columns.push_back(decoded_arr); - key_fields.push_back( - field("name_does_not_matter", dict_arr->dict_type()->value_type())); - } else { - key_columns.push_back(arr); - key_fields.push_back(field("name_does_not_matter", arr->type())); + if (segmented && segment_key_names.size() > 0) { + ArrayVector struct_arrays; + struct_arrays.reserve(output_batches.size()); + for (size_t j = 0; j < output_batches.size(); ++j) { + ArrayVector struct_fields; + struct_fields.reserve(out_arrays.size()); + for (auto out_array : out_arrays) { + struct_fields.push_back(out_array[j]); + } + ARROW_ASSIGN_OR_RAISE(auto struct_array, + StructArray::Make(struct_fields, output_schema->fields())); + struct_arrays.push_back(struct_array); } - sort_keys.emplace_back(static_cast<int>(i)); + return ChunkedArray::Make(struct_arrays); + } else { + ArrayVector struct_fields(out_arrays.size()); + for (size_t i = 0; i < out_arrays.size(); ++i) { + ARROW_ASSIGN_OR_RAISE(struct_fields[i], Concatenate(out_arrays[i])); + } + return StructArray::Make(std::move(struct_fields), output_schema->fields()); } - std::shared_ptr<Schema> key_schema = schema(std::move(key_fields)); - std::shared_ptr<Table> key_table = Table::Make(std::move(key_schema), key_columns); - SortOptions sort_options(std::move(sort_keys)); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> sort_indices, - SortIndices(key_table, sort_options)); +} - ARROW_ASSIGN_OR_RAISE( - std::shared_ptr<Array> struct_arr, - 
StructArray::Make(std::move(out_arrays), output_schema->fields())); +Result<Datum> RunGroupBy(const BatchesWithSchema& input, + const std::vector<std::string>& key_names, + const std::vector<std::string>& segment_key_names, + const std::vector<Aggregate>& aggregates, bool use_threads, + bool segmented = false, bool naive = false) { + if (segment_key_names.size() > 0) { + ARROW_ASSIGN_OR_RAISE(auto thread_pool, arrow::internal::ThreadPool::Make(1)); + ExecContext seq_ctx(default_memory_pool(), thread_pool.get()); + return RunGroupBy(input, key_names, segment_key_names, aggregates, &seq_ctx, + use_threads, segmented, naive); + } else { + return RunGroupBy(input, key_names, segment_key_names, aggregates, + threaded_exec_context(), use_threads, segmented, naive); + } +} - return Take(struct_arr, sort_indices); +Result<Datum> RunGroupBy(const BatchesWithSchema& input, + const std::vector<std::string>& key_names, + const std::vector<Aggregate>& aggregates, bool use_threads, + bool segmented = false, bool naive = false) { + return RunGroupBy(input, key_names, {}, aggregates, use_threads, segmented); } /// Simpler overload where you can give the columns as datums Result<Datum> RunGroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys, - const std::vector<Aggregate>& aggregates, - bool use_threads = false) { + const std::vector<Datum>& segment_keys, + const std::vector<Aggregate>& aggregates, bool use_threads, + bool segmented = false, bool naive = false) { Review Comment: > Sorry I am still confused - what is "naive group by"? `NaiveGroupBy` is [a tester function](https://github.com/apache/arrow/blob/d5b3b4737838774db658d3c488fcd3e72bc13f7e/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc#L75-L136) that computes the expected aggregation result (in a naive/simple/non-optimized way). > What do you mean by "segmented aggregation with empty segment-keys"? Isn't this what happens when segment-keys is empty? 
(The original hash aggregate) Let's take a step back. When `segment_keys` is non-empty, the `segmented` flag is always `true`; otherwise (when empty), it may still be set to `true`, and this is the case we're discussing here. In this case, the tester restructures the result of `RunGroupBy` (without changing its data) from `std::vector<ExecBatch> output_batches` to `std::vector<ArrayVector> out_arrays`. The restructured result has the layout typical of the non-empty `segment_keys` case (multiple arrays per column, one array per segment), but with only one array per column (because, technically, there is only one segment in this case). What all this boils down to is a test focusing on the structure, rather than the data, of the result. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org