This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1f3e6b5e38 [GLUTEN-7986][CH] Improve lazy expand for high cardinality aggregation (#7995)
1f3e6b5e38 is described below
commit 1f3e6b5e38e38b7508ac8f67a2a426aa9137c0e1
Author: lgbo <[email protected]>
AuthorDate: Wed Nov 20 11:55:46 2024 +0800
[GLUTEN-7986][CH] Improve lazy expand for high cardinality aggregation (#7995)
[CH] Improve lazy expand for high cardinality aggregation
---
.../local-engine/Operator/AdvancedExpandStep.cpp | 91 +++++++---------------
cpp-ch/local-engine/Operator/AdvancedExpandStep.h | 13 ++--
2 files changed, 34 insertions(+), 70 deletions(-)
diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
index ee767b31bd..72721ce859 100644
--- a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
+++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
@@ -25,7 +25,7 @@
#include <Interpreters/Aggregator.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/castColumn.h>
-#include <Operator/GraceAggregatingTransform.h>
+#include <Operator/StreamingAggregatingStep.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <QueryPipeline/Pipe.h>
@@ -110,8 +110,7 @@ void
AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline,
auto expand_output_header =
expand_processor->getOutputs().front().getHeader();
auto transform_params =
std::make_shared<DB::AggregatingTransformParams>(expand_output_header, params,
false);
- auto aggregate_processor
- =
std::make_shared<GraceAggregatingTransform>(expand_output_header,
transform_params, context, false, false);
+ auto aggregate_processor =
std::make_shared<StreamingAggregatingTransform>(context, expand_output_header,
transform_params);
DB::connect(expand_processor->getOutputs().back(),
aggregate_processor->getInputs().front());
new_processors.push_back(aggregate_processor);
auto aggregate_output_header =
aggregate_processor->getOutputs().front().getHeader();
@@ -146,8 +145,15 @@ AdvancedExpandTransform::AdvancedExpandTransform(
, project_set_exprs(project_set_exprs_)
, input_header(input_header_)
{
- for (size_t i = 0; i < project_set_exprs.getKinds().size(); ++i)
- is_low_cardinality_expand.push_back(true);
+ for (size_t i = 0; i < project_set_exprs.getExpandRows(); ++i)
+ {
+ const auto & kinds = project_set_exprs.getKinds()[i];
+ size_t n = 0;
+ for (size_t k = 0; k < grouping_keys; ++k)
+ if (kinds[k] == EXPAND_FIELD_KIND_SELECTION)
+ n += 1;
+ need_to_aggregate.push_back((n != grouping_keys));
+ }
for (auto & port : outputs)
output_ports.push_back(&port);
@@ -167,9 +173,11 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
if (has_output)
{
- auto & output_port =
*output_ports[is_low_cardinality_expand[expand_expr_iterator - 1]];
+ auto & output_port =
*output_ports[need_to_aggregate[expand_expr_iterator - 1]];
if (output_port.canPush())
{
+ output_blocks[need_to_aggregate[expand_expr_iterator - 1]] += 1;
+ output_rows[need_to_aggregate[expand_expr_iterator - 1]] +=
output_chunk.getNumRows();
output_port.push(std::move(output_chunk));
has_output = false;
auto status = expand_expr_iterator >=
project_set_exprs.getExpandRows() ? Status::NeedData : Status::Ready;
@@ -185,17 +193,18 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
{
if (input.isFinished())
{
- if (!cardinality_detect_blocks.empty())
- {
- input_finished = true;
- return Status::Ready;
- }
- else
- {
- output_ports[0]->finish();
- output_ports[1]->finish();
- return Status::Finished;
- }
+ LOG_DEBUG(
+ getLogger("AdvancedExpandTransform"),
+ "Input rows/blocks={}/{}. output rows/blocks=[{}/{}, {}/{}]",
+ input_rows,
+ input_blocks,
+ output_rows[0],
+ output_blocks[0],
+ output_rows[1],
+ output_blocks[1]);
+ output_ports[0]->finish();
+ output_ports[1]->finish();
+ return Status::Finished;
}
input.setNeeded();
@@ -204,6 +213,8 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
input_chunk = input.pull(true);
has_input = true;
expand_expr_iterator = 0;
+ input_blocks += 1;
+ input_rows += input_chunk.getNumRows();
}
return Status::Ready;
@@ -211,55 +222,9 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
void AdvancedExpandTransform::work()
{
- if (!input_finished && cardinality_detect_rows <
rows_for_detect_cardinality)
- {
-
cardinality_detect_blocks.push_back(input_header.cloneWithColumns(input_chunk.detachColumns()));
- cardinality_detect_rows += cardinality_detect_blocks.back().rows();
- has_input = false;
- }
- if ((input_finished || cardinality_detect_rows >=
rows_for_detect_cardinality) && !cardinality_detect_blocks.empty())
- detectCardinality();
- else if (!input_finished && cardinality_detect_rows <
rows_for_detect_cardinality)
- return;
-
- /// The phase of detecting grouping keys' cardinality is finished here.
expandInputChunk();
}
-void AdvancedExpandTransform::detectCardinality()
-{
- DB::Block block =
BlockUtil::concatenateBlocksMemoryEfficiently(std::move(cardinality_detect_blocks));
- std::vector<bool> is_col_low_cardinality;
- for (size_t i = 0; i < grouping_keys; ++i)
- {
- DB::WeakHash32 hash = block.getByPosition(i).column->getWeakHash32();
- std::unordered_set<UInt32> distinct_ids;
- const auto & data = hash.getData();
- for (size_t j = 0; j < cardinality_detect_rows; ++j)
- distinct_ids.insert(data[j]);
- size_t distinct_ids_cnt = distinct_ids.size();
- is_col_low_cardinality.push_back(distinct_ids.size() < 1000);
- }
-
- for (size_t i = 0; i < project_set_exprs.getExpandRows(); ++i)
- {
- const auto & kinds = project_set_exprs.getKinds()[i];
- for (size_t k = 0; k < grouping_keys; ++k)
- {
- const auto & kind = kinds[k];
- if (kind == EXPAND_FIELD_KIND_SELECTION &&
!is_col_low_cardinality[k])
- {
- is_low_cardinality_expand[i] = false;
- break;
- }
- }
- }
- LOG_DEBUG(getLogger("AdvancedExpandTransform"), "Low cardinality expand:
{}", fmt::join(is_low_cardinality_expand, ","));
-
- input_chunk = DB::Chunk(block.getColumns(), block.rows());
- cardinality_detect_blocks.clear();
-}
-
void AdvancedExpandTransform::expandInputChunk()
{
const auto & input_columns = input_chunk.getColumns();
diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
b/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
index 2950846585..343a7f5227 100644
--- a/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
+++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
@@ -78,19 +78,18 @@ private:
bool has_input = false;
bool has_output = false;
size_t expand_expr_iterator = 0;
- std::vector<bool> is_low_cardinality_expand;
- std::vector<size_t> approximate_grouping_keys;
- size_t cardinality_detect_rows = 0;
- std::vector<DB::Block> cardinality_detect_blocks;
- static constexpr size_t rows_for_detect_cardinality = 10000;
- bool input_finished = false;
+ std::vector<bool> need_to_aggregate;
std::vector<DB::OutputPort *> output_ports;
DB::Chunk input_chunk;
DB::Chunk output_chunk;
- void detectCardinality();
+ size_t input_blocks = 0;
+ size_t input_rows = 0;
+ std::vector<size_t> output_blocks = {0, 0};
+ std::vector<size_t> output_rows = {0, 0};
+
void expandInputChunk();
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]