westonpace commented on code in PR #34311:
URL: https://github.com/apache/arrow/pull/34311#discussion_r1128831353
##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -174,81 +255,117 @@ Result<Datum> RunGroupBy(const BatchesWithSchema& input,
ARROW_ASSIGN_OR_RAISE(std::vector<ExecBatch> output_batches,
start_and_collect.MoveResult());
- ArrayVector out_arrays(aggregates.size() + key_names.size());
const auto& output_schema = plan->nodes()[0]->output()->output_schema();
+ if (!segmented) {
+ return MakeGroupByOutput(output_batches, output_schema, aggregates.size(),
+ key_names.size(), naive);
+ }
+
+ std::vector<ArrayVector> out_arrays(aggregates.size() + key_names.size() +
+ segment_key_names.size());
for (size_t i = 0; i < out_arrays.size(); ++i) {
std::vector<std::shared_ptr<Array>> arrays(output_batches.size());
for (size_t j = 0; j < output_batches.size(); ++j) {
- arrays[j] = output_batches[j].values[i].make_array();
+ auto& value = output_batches[j].values[i];
+ if (value.is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(
+ arrays[j], MakeArrayFromScalar(*value.scalar(),
output_batches[j].length));
+ } else if (value.is_array()) {
+ arrays[j] = value.make_array();
+ } else {
+ return Status::Invalid("GroupByUsingExecPlan unsupported value kind ",
+ ToString(value.kind()));
+ }
}
if (arrays.empty()) {
+ arrays.resize(1);
ARROW_ASSIGN_OR_RAISE(
- out_arrays[i],
- MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
- /*length=*/0));
- } else {
- ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays));
+ arrays[0],
MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
+ /*length=*/0));
}
+ out_arrays[i] = {std::move(arrays)};
}
- // The exec plan may reorder the output rows. The tests are all set up to
expect output
- // in ascending order of keys. So we need to sort the result by the key
columns. To do
- // that we create a table using the key columns, calculate the sort indices
from that
- // table (sorting on all fields) and then use those indices to calculate our
result.
- std::vector<std::shared_ptr<Field>> key_fields;
- std::vector<std::shared_ptr<Array>> key_columns;
- std::vector<SortKey> sort_keys;
- for (std::size_t i = 0; i < key_names.size(); i++) {
- const std::shared_ptr<Array>& arr = out_arrays[i + aggregates.size()];
- if (arr->type_id() == Type::DICTIONARY) {
- // Can't sort dictionary columns so need to decode
- auto dict_arr = checked_pointer_cast<DictionaryArray>(arr);
- ARROW_ASSIGN_OR_RAISE(auto decoded_arr,
- Take(*dict_arr->dictionary(),
*dict_arr->indices()));
- key_columns.push_back(decoded_arr);
- key_fields.push_back(
- field("name_does_not_matter", dict_arr->dict_type()->value_type()));
- } else {
- key_columns.push_back(arr);
- key_fields.push_back(field("name_does_not_matter", arr->type()));
+ if (segmented && segment_key_names.size() > 0) {
+ ArrayVector struct_arrays;
+ struct_arrays.reserve(output_batches.size());
+ for (size_t j = 0; j < output_batches.size(); ++j) {
+ ArrayVector struct_fields;
+ struct_fields.reserve(out_arrays.size());
+ for (auto out_array : out_arrays) {
+ struct_fields.push_back(out_array[j]);
+ }
+ ARROW_ASSIGN_OR_RAISE(auto struct_array,
+ StructArray::Make(struct_fields,
output_schema->fields()));
+ struct_arrays.push_back(struct_array);
}
- sort_keys.emplace_back(static_cast<int>(i));
+ return ChunkedArray::Make(struct_arrays);
+ } else {
+ ArrayVector struct_fields(out_arrays.size());
+ for (size_t i = 0; i < out_arrays.size(); ++i) {
+ ARROW_ASSIGN_OR_RAISE(struct_fields[i], Concatenate(out_arrays[i]));
+ }
+ return StructArray::Make(std::move(struct_fields),
output_schema->fields());
}
- std::shared_ptr<Schema> key_schema = schema(std::move(key_fields));
- std::shared_ptr<Table> key_table = Table::Make(std::move(key_schema),
key_columns);
- SortOptions sort_options(std::move(sort_keys));
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> sort_indices,
- SortIndices(key_table, sort_options));
+}
- ARROW_ASSIGN_OR_RAISE(
- std::shared_ptr<Array> struct_arr,
- StructArray::Make(std::move(out_arrays), output_schema->fields()));
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+ const std::vector<std::string>& key_names,
+ const std::vector<std::string>& segment_key_names,
+ const std::vector<Aggregate>& aggregates, bool
use_threads,
+ bool segmented = false, bool naive = false) {
+ if (segment_key_names.size() > 0) {
+ ARROW_ASSIGN_OR_RAISE(auto thread_pool,
arrow::internal::ThreadPool::Make(1));
+ ExecContext seq_ctx(default_memory_pool(), thread_pool.get());
+ return RunGroupBy(input, key_names, segment_key_names, aggregates,
&seq_ctx,
Review Comment:
It appears that `use_threads` determines whether we are using threads in the
mock input (i.e. simulating reading multiple files at the same time) while
`thread_pool` is used for the CPU execution of the plan nodes themselves.
We could change `use_threads` to `use_io_threads` or `simulate_io_threads`.
However, given that this already exists, I don't think we should chase it in this
PR.
It'd be nice to refactor this entire file. It seems overly complex.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]