This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f3e6b5e38 [GLUTEN-7986][CH] Improve lazy expand for high cardinality aggregation (#7995)
1f3e6b5e38 is described below

commit 1f3e6b5e38e38b7508ac8f67a2a426aa9137c0e1
Author: lgbo <[email protected]>
AuthorDate: Wed Nov 20 11:55:46 2024 +0800

    [GLUTEN-7986][CH] Improve lazy expand for high cardinality aggregation (#7995)
    
    [CH] Improve lazy expand for high cardinality aggregation
---
 .../local-engine/Operator/AdvancedExpandStep.cpp   | 91 +++++++---------------
 cpp-ch/local-engine/Operator/AdvancedExpandStep.h  | 13 ++--
 2 files changed, 34 insertions(+), 70 deletions(-)

diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp 
b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
index ee767b31bd..72721ce859 100644
--- a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
+++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
@@ -25,7 +25,7 @@
 #include <Interpreters/Aggregator.h>
 #include <Interpreters/ExpressionActions.h>
 #include <Interpreters/castColumn.h>
-#include <Operator/GraceAggregatingTransform.h>
+#include <Operator/StreamingAggregatingStep.h>
 #include <Processors/ResizeProcessor.h>
 #include <Processors/Transforms/ExpressionTransform.h>
 #include <QueryPipeline/Pipe.h>
@@ -110,8 +110,7 @@ void 
AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline,
             auto expand_output_header = 
expand_processor->getOutputs().front().getHeader();
 
             auto transform_params = 
std::make_shared<DB::AggregatingTransformParams>(expand_output_header, params, 
false);
-            auto aggregate_processor
-                = 
std::make_shared<GraceAggregatingTransform>(expand_output_header, 
transform_params, context, false, false);
+            auto aggregate_processor = 
std::make_shared<StreamingAggregatingTransform>(context, expand_output_header, 
transform_params);
             DB::connect(expand_processor->getOutputs().back(), 
aggregate_processor->getInputs().front());
             new_processors.push_back(aggregate_processor);
             auto aggregate_output_header = 
aggregate_processor->getOutputs().front().getHeader();
@@ -146,8 +145,15 @@ AdvancedExpandTransform::AdvancedExpandTransform(
     , project_set_exprs(project_set_exprs_)
     , input_header(input_header_)
 {
-    for (size_t i = 0; i < project_set_exprs.getKinds().size(); ++i)
-        is_low_cardinality_expand.push_back(true);
+    for (size_t i = 0; i < project_set_exprs.getExpandRows(); ++i)
+    {
+        const auto & kinds = project_set_exprs.getKinds()[i];
+        size_t n = 0;
+        for (size_t k = 0; k < grouping_keys; ++k)
+            if (kinds[k] == EXPAND_FIELD_KIND_SELECTION)
+                n += 1;
+        need_to_aggregate.push_back((n != grouping_keys));
+    }
 
     for (auto & port : outputs)
         output_ports.push_back(&port);
@@ -167,9 +173,11 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
 
     if (has_output)
     {
-        auto & output_port = 
*output_ports[is_low_cardinality_expand[expand_expr_iterator - 1]];
+        auto & output_port = 
*output_ports[need_to_aggregate[expand_expr_iterator - 1]];
         if (output_port.canPush())
         {
+            output_blocks[need_to_aggregate[expand_expr_iterator - 1]] += 1;
+            output_rows[need_to_aggregate[expand_expr_iterator - 1]] += 
output_chunk.getNumRows();
             output_port.push(std::move(output_chunk));
             has_output = false;
             auto status = expand_expr_iterator >= 
project_set_exprs.getExpandRows() ? Status::NeedData : Status::Ready;
@@ -185,17 +193,18 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
     {
         if (input.isFinished())
         {
-            if (!cardinality_detect_blocks.empty())
-            {
-                input_finished = true;
-                return Status::Ready;
-            }
-            else
-            {
-                output_ports[0]->finish();
-                output_ports[1]->finish();
-                return Status::Finished;
-            }
+            LOG_DEBUG(
+                getLogger("AdvancedExpandTransform"),
+                "Input rows/blocks={}/{}. output rows/blocks=[{}/{}, {}/{}]",
+                input_rows,
+                input_blocks,
+                output_rows[0],
+                output_blocks[0],
+                output_rows[1],
+                output_blocks[1]);
+            output_ports[0]->finish();
+            output_ports[1]->finish();
+            return Status::Finished;
         }
 
         input.setNeeded();
@@ -204,6 +213,8 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
         input_chunk = input.pull(true);
         has_input = true;
         expand_expr_iterator = 0;
+        input_blocks += 1;
+        input_rows += input_chunk.getNumRows();
     }
 
     return Status::Ready;
@@ -211,55 +222,9 @@ DB::IProcessor::Status AdvancedExpandTransform::prepare()
 
 void AdvancedExpandTransform::work()
 {
-    if (!input_finished && cardinality_detect_rows < 
rows_for_detect_cardinality)
-    {
-        
cardinality_detect_blocks.push_back(input_header.cloneWithColumns(input_chunk.detachColumns()));
-        cardinality_detect_rows += cardinality_detect_blocks.back().rows();
-        has_input = false;
-    }
-    if ((input_finished || cardinality_detect_rows >= 
rows_for_detect_cardinality) && !cardinality_detect_blocks.empty())
-        detectCardinality();
-    else if (!input_finished && cardinality_detect_rows < 
rows_for_detect_cardinality)
-        return;
-
-    /// The phase of detecting grouping keys' cardinality is finished here.
     expandInputChunk();
 }
 
-void AdvancedExpandTransform::detectCardinality()
-{
-    DB::Block block = 
BlockUtil::concatenateBlocksMemoryEfficiently(std::move(cardinality_detect_blocks));
-    std::vector<bool> is_col_low_cardinality;
-    for (size_t i = 0; i < grouping_keys; ++i)
-    {
-        DB::WeakHash32 hash = block.getByPosition(i).column->getWeakHash32();
-        std::unordered_set<UInt32> distinct_ids;
-        const auto & data = hash.getData();
-        for (size_t j = 0; j < cardinality_detect_rows; ++j)
-            distinct_ids.insert(data[j]);
-        size_t distinct_ids_cnt = distinct_ids.size();
-        is_col_low_cardinality.push_back(distinct_ids.size() < 1000);
-    }
-
-    for (size_t i = 0; i < project_set_exprs.getExpandRows(); ++i)
-    {
-        const auto & kinds = project_set_exprs.getKinds()[i];
-        for (size_t k = 0; k < grouping_keys; ++k)
-        {
-            const auto & kind = kinds[k];
-            if (kind == EXPAND_FIELD_KIND_SELECTION && 
!is_col_low_cardinality[k])
-            {
-                is_low_cardinality_expand[i] = false;
-                break;
-            }
-        }
-    }
-    LOG_DEBUG(getLogger("AdvancedExpandTransform"), "Low cardinality expand: 
{}", fmt::join(is_low_cardinality_expand, ","));
-
-    input_chunk = DB::Chunk(block.getColumns(), block.rows());
-    cardinality_detect_blocks.clear();
-}
-
 void AdvancedExpandTransform::expandInputChunk()
 {
     const auto & input_columns = input_chunk.getColumns();
diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.h 
b/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
index 2950846585..343a7f5227 100644
--- a/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
+++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
@@ -78,19 +78,18 @@ private:
     bool has_input = false;
     bool has_output = false;
     size_t expand_expr_iterator = 0;
-    std::vector<bool> is_low_cardinality_expand;
-    std::vector<size_t> approximate_grouping_keys;
-    size_t cardinality_detect_rows = 0;
-    std::vector<DB::Block> cardinality_detect_blocks;
-    static constexpr size_t rows_for_detect_cardinality = 10000;
-    bool input_finished = false;
+    std::vector<bool> need_to_aggregate;
 
     std::vector<DB::OutputPort *> output_ports;
 
     DB::Chunk input_chunk;
     DB::Chunk output_chunk;
 
-    void detectCardinality();
+    size_t input_blocks = 0;
+    size_t input_rows = 0;
+    std::vector<size_t> output_blocks = {0, 0};
+    std::vector<size_t> output_rows = {0, 0};
+
     void expandInputChunk();
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to