This is an automated email from the ASF dual-hosted git repository.
lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 17d9cd8dfb [GLUTEN-7959][CH] `AdvancedExpandStep` generates fewer rows than expected (#7960)
17d9cd8dfb is described below
commit 17d9cd8dfbfa0afc551d71be954553f959ce7f26
Author: lgbo <[email protected]>
AuthorDate: Mon Nov 18 10:18:43 2024 +0800
[GLUTEN-7959][CH] `AdvancedExpandStep` generates fewer rows than expected (#7960)
* detect cardinality
* fix aggregate params
---
cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp | 19 ++++++-------------
1 file changed, 6 insertions(+), 13 deletions(-)
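
For readers skimming the first hunk below: the expand step feeds a partial-to-partial aggregate whose output is re-aggregated downstream, so the patch disables byte-based two-level conversion and external (on-disk) group-by outright. A minimal sketch of that effect, assuming illustrative field names rather than the real DB::Aggregator::Params layout:

    #include <cstddef>
    #include <memory>

    // Illustrative stand-in for the aggregation knobs the patch touches;
    // the real DB::Aggregator::Params takes them as constructor arguments.
    struct PartialAggParams
    {
        size_t group_by_two_level_threshold_bytes; // bytes before two-level conversion
        size_t max_bytes_before_external_group_by; // bytes before spilling to disk
        std::shared_ptr<void> tmp_data_on_disk;    // spill target for external group-by
    };

    PartialAggParams makePartialToPartialParams()
    {
        // A threshold of 0 never triggers, and a null temp-data handle removes
        // the spill target, so the partial aggregate stays single-level and
        // fully in memory (the diff's 0, 0, nullptr below).
        return {/*two_level_bytes*/ 0, /*external_bytes*/ 0, /*tmp_data*/ nullptr};
    }
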
diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
index 6ac5f5fc8f..b777731a91 100644
--- a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
+++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
@@ -115,6 +115,7 @@ void AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline,
break;
aggregate_grouping_keys.push_back(col.name);
}
+ // partial to partial aggregate
DB::Aggregator::Params params(
aggregate_grouping_keys,
aggregate_descriptions,
@@ -122,10 +123,10 @@ void AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline,
settings[DB::Setting::max_rows_to_group_by],
settings[DB::Setting::group_by_overflow_mode],
settings[DB::Setting::group_by_two_level_threshold],
- settings[DB::Setting::group_by_two_level_threshold_bytes],
- settings[DB::Setting::max_bytes_before_external_group_by],
+ 0,
+ 0,
settings[DB::Setting::empty_result_for_aggregation_by_empty_set],
- context->getTempDataOnDisk(),
+ nullptr,
settings[DB::Setting::max_threads],
settings[DB::Setting::min_free_disk_space_for_temporary_data],
true,
@@ -149,7 +150,7 @@ void AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline,
new_processors.push_back(expand_processor);
auto expand_output_header = expand_processor->getOutputs().front().getHeader();
-
+
auto transform_params = std::make_shared<DB::AggregatingTransformParams>(expand_output_header, params, false);
auto aggregate_processor = std::make_shared<GraceAggregatingTransform>(expand_output_header, transform_params, context, false, false);
@@ -188,14 +189,10 @@ AdvancedExpandTransform::AdvancedExpandTransform(
, input_header(input_header_)
{
for (size_t i = 0; i < project_set_exprs.getKinds().size(); ++i)
- {
is_low_cardinality_expand.push_back(true);
- }
for (auto & port : outputs)
- {
output_ports.push_back(&port);
- }
}
DB::IProcessor::Status AdvancedExpandTransform::prepare()
@@ -245,9 +242,7 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
input.setNeeded();
if (!input.hasData())
- {
return Status::NeedData;
- }
input_chunk = input.pull(true);
has_input = true;
expand_expr_iterator = 0;
@@ -265,9 +260,7 @@ void AdvancedExpandTransform::work()
has_input = false;
}
if ((input_finished || cardinality_detect_rows >= rows_for_detect_cardinality) && !cardinality_detect_blocks.empty())
- {
detectCardinality();
- }
else if (!input_finished && cardinality_detect_rows < rows_for_detect_cardinality)
return;
@@ -281,7 +274,7 @@ void AdvancedExpandTransform::detectCardinality()
std::vector<bool> is_col_low_cardinality;
for (size_t i = 0; i < grouping_keys; ++i)
{
- DB::WeakHash32 hash(cardinality_detect_rows);
+ DB::WeakHash32 hash = block.getByPosition(i).column->getWeakHash32();
std::unordered_set<UInt32> distinct_ids;
const auto & data = hash.getData();
for (size_t j = 0; j < cardinality_detect_rows; ++j)
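
For the last hunk: the removed line constructed a WeakHash32 without ever deriving it from the column, so the distinct count that follows was meaningless; the fix takes the per-row hashes directly from the column via getWeakHash32(). A minimal standalone sketch of the counting idea, with std::hash standing in for DB::WeakHash32 and an assumed 10% distinct-ratio threshold (both are illustrative, not the operator's actual values):

    #include <algorithm>
    #include <cstdint>
    #include <functional>
    #include <string>
    #include <unordered_set>
    #include <vector>

    // A grouping key counts as low-cardinality when the number of distinct
    // hash values in the sampled prefix stays below a ratio of the sample
    // size; the column type and threshold here are assumptions.
    bool isLowCardinality(const std::vector<std::string> & column, size_t sample_rows)
    {
        const size_t n = std::min(sample_rows, column.size());
        std::unordered_set<uint32_t> distinct_ids;
        for (size_t j = 0; j < n; ++j)
            distinct_ids.insert(static_cast<uint32_t>(std::hash<std::string>{}(column[j])));
        return n > 0 && distinct_ids.size() * 10 < n;
    }
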
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]