This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 391355eeae [pipelineX](agg) Support streaming agg (#23341)
391355eeae is described below

commit 391355eeae12b44263cfc7a361145141ef45b721
Author: Gabriel <[email protected]>
AuthorDate: Wed Aug 23 08:38:12 2023 +0800

    [pipelineX](agg) Support streaming agg (#23341)
---
 be/src/pipeline/exec/aggregation_sink_operator.h   |  17 +-
 be/src/pipeline/exec/aggregation_source_operator.h |   9 +-
 .../exec/streaming_aggregation_sink_operator.cpp   | 312 +++++++++++++++++++++
 .../exec/streaming_aggregation_sink_operator.h     |  48 ++++
 .../exec/streaming_aggregation_source_operator.cpp |  33 +++
 .../exec/streaming_aggregation_source_operator.h   |  11 +
 be/src/pipeline/pipeline_x/dependency.h            |   3 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  31 +-
 8 files changed, 444 insertions(+), 20 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 7ca62b3cab..72136cd3f3 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -52,11 +52,11 @@ class AggSinkLocalState : public PipelineXSinkLocalState {
 public:
     AggSinkLocalState(DataSinkOperatorX* parent, RuntimeState* state);
 
-    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) 
override;
 
     Status try_spill_disk(bool eos = false);
 
-private:
+protected:
     friend class AggSinkOperatorX;
 
     Status _execute_without_key(vectorized::Block* block);
@@ -310,7 +310,7 @@ private:
     executor _executor;
 };
 
-class AggSinkOperatorX final : public DataSinkOperatorX {
+class AggSinkOperatorX : public DataSinkOperatorX {
 public:
     AggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
     Status init(const TDataSink& tsink) override {
@@ -321,13 +321,13 @@ public:
 
     Status prepare(RuntimeState* state) override;
     Status open(RuntimeState* state) override;
-    Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) 
override;
+    virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& 
info) override;
 
-    Status sink(RuntimeState* state, vectorized::Block* in_block,
-                SourceState source_state) override;
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override;
 
-    Status close(RuntimeState* state) override;
-    bool can_write(RuntimeState* state) override { return true; }
+    virtual Status close(RuntimeState* state) override;
+    virtual bool can_write(RuntimeState* state) override { return true; }
 
     void get_dependency(DependencySPtr& dependency) override {
         dependency.reset(new AggDependency(id()));
@@ -335,6 +335,7 @@ public:
 
 private:
     friend class AggSinkLocalState;
+    friend class StreamingAggSinkLocalState;
     std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
     bool _can_short_circuit = false;
 
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h 
b/be/src/pipeline/exec/aggregation_source_operator.h
index 54902c779c..6831a110e6 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -59,6 +59,7 @@ public:
 
 private:
     friend class AggSourceOperatorX;
+    friend class StreamingAggSourceOperatorX;
 
     void _close_without_key();
     void _close_with_serialized_key();
@@ -105,16 +106,16 @@ private:
     bool _agg_data_created_without_key = false;
 };
 
-class AggSourceOperatorX final : public OperatorXBase {
+class AggSourceOperatorX : public OperatorXBase {
 public:
     AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs,
                        std::string op_name);
-    bool can_read(RuntimeState* state) override;
+    virtual bool can_read(RuntimeState* state) override;
 
     Status setup_local_state(RuntimeState* state, LocalStateInfo& info) 
override;
 
-    Status get_block(RuntimeState* state, vectorized::Block* block,
-                     SourceState& source_state) override;
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override;
 
     Status close(RuntimeState* state) override;
     bool is_source() const override { return true; }
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
index 3a9e502969..f0a7cde605 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
@@ -92,4 +92,316 @@ OperatorPtr 
StreamingAggSinkOperatorBuilder::build_operator() {
     return std::make_shared<StreamingAggSinkOperator>(this, _node, 
_data_queue);
 }
 
+/// The minimum reduction factor (input rows divided by output rows) to grow 
hash tables
+/// in a streaming preaggregation, given that the hash tables are currently 
the given
+/// size or above. The sizes roughly correspond to hash table sizes where the 
bucket
+/// arrays will fit in a cache level. Intuitively, we don't want the working 
set of the
+/// aggregation to expand to the next level of cache unless we're reducing the 
input
+/// enough to outweigh the increased memory latency we'll incur for each hash 
table
+/// lookup.
+///
+/// Note that the current reduction achieved is not always a good estimate of 
the
+/// final reduction. It may be biased either way depending on the ordering of 
the
+/// input. If the input order is random, we will underestimate the final 
reduction
+/// factor because the probability of a row having the same key as a previous 
row
+/// increases as more input is processed.  If the input order is correlated 
with the
+/// key, skew may bias the estimate. If high cardinality keys appear first, we
+/// may overestimate and if low cardinality keys appear first, we 
underestimate.
+/// To estimate the eventual reduction achieved, we estimate the final 
reduction
+/// using the planner's estimated input cardinality and the assumption that 
input
+/// is in a random order. This means that we assume that the reduction factor 
will
+/// increase over time.
+struct StreamingHtMinReductionEntry {
+    // Size threshold, in bytes, for the total size of the hash table bucket
+    // directories. 'streaming_ht_min_reduction' applies once the tables have
+    // reached this size (the lookup compares with '>=').
+    int min_ht_mem;
+    // The minimum reduction factor to expand the hash tables.
+    double streaming_ht_min_reduction;
+};
+
+// TODO: experimentally tune these values and also programmatically get the 
cache size
+// of the machine that we're running on.
+static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
+        // Entries must stay ordered by ascending min_ht_mem: the lookup walks the
+        // table from the front and stops at the first threshold not yet reached.
+        // Expand up to L2 cache always.
+        {0, 0.0},
+        // Expand into L3 cache if we look like we're getting some reduction.
+        // At present, the L2 cache is generally 1024 KiB or more.
+        {1024 * 1024, 1.1},
+        // Expand into main memory if we're getting a significant reduction.
+        // The L3 cache is generally 16 MiB or more.
+        {16 * 1024 * 1024, 2.0},
+};
+
+// Number of entries in STREAMING_HT_MIN_REDUCTION, derived from the array itself
+// so that it cannot drift when rows are added or removed.
+static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
+        sizeof(STREAMING_HT_MIN_REDUCTION) / 
sizeof(STREAMING_HT_MIN_REDUCTION[0]);
+
+// Forwards parent/state to AggSinkLocalState. The three profile pointers start
+// out null; init() registers the actual counters and timer.
+StreamingAggSinkLocalState::StreamingAggSinkLocalState(DataSinkOperatorX* 
parent,
+                                                       RuntimeState* state)
+        : AggSinkLocalState(parent, state),
+          _queue_byte_size_counter(nullptr),
+          _queue_size_counter(nullptr),
+          _streaming_agg_timer(nullptr) {}
+
+// Runs the base aggregation-sink initialisation, then registers the profile
+// entries specific to the streaming sink: the two block-queue counters and the
+// streaming-agg timer. Returns the base class error unchanged if it fails.
+Status StreamingAggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(AggSinkLocalState::init(state, info));
+
+    auto* const sink_profile = profile();
+    _queue_byte_size_counter = ADD_COUNTER(sink_profile, "MaxSizeInBlockQueue", TUnit::BYTES);
+    _queue_size_counter = ADD_COUNTER(sink_profile, "MaxSizeOfBlockQueue", TUnit::UNIT);
+    _streaming_agg_timer = ADD_TIMER(sink_profile, "StreamingAggTime");
+    return Status::OK();
+}
+
+// Pre-aggregates `input_block`. Any rows that were passed through instead of
+// being aggregated end up in `output_block`; they are counted, their key
+// columns are made nullable where required, and memory usage is refreshed.
+Status StreamingAggSinkLocalState::do_pre_agg(vectorized::Block* input_block,
+                                              vectorized::Block* output_block) {
+    RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block, output_block));
+
+    // _num_rows_returned feeds the expand/no-expand decision of the streaming
+    // pre-aggregation, so it has to track every row emitted here.
+    const auto emitted_rows = output_block->rows();
+    _num_rows_returned += emitted_rows;
+
+    _make_nullable_output_key(output_block);
+    //    COUNTER_SET(_rows_returned_counter, _num_rows_returned);
+    _executor.update_memusage();
+    return Status::OK();
+}
+
+// Wraps both the column and the type of every key listed in
+// _dependency->make_nullable_keys() into their nullable form.
+// Empty blocks are left untouched.
+void StreamingAggSinkLocalState::_make_nullable_output_key(vectorized::Block* block) {
+    if (block->rows() == 0) {
+        return;
+    }
+
+    for (auto cid : _dependency->make_nullable_keys()) {
+        auto& key_entry = block->get_by_position(cid);
+        key_entry.column = make_nullable(key_entry.column);
+        key_entry.type = make_nullable(key_entry.type);
+    }
+}
+
+// Decides whether the pre-aggregation hash table may keep growing.
+//
+// The reduction achieved so far (rows aggregated into the table divided by
+// rows held in the table) is compared with the minimum reduction required for
+// the table's current memory footprint (see STREAMING_HT_MIN_REDUCTION).
+// The outcome is cached in _should_expand_hash_table; once it is false every
+// later call returns false immediately.
+bool StreamingAggSinkLocalState::_should_expand_preagg_hash_tables() {
+    if (!_should_expand_hash_table) {
+        return false;
+    }
+
+    return std::visit(
+            [&](auto&& agg_method) -> bool {
+                auto& hash_tbl = agg_method.data;
+                const auto ht_mem = hash_tbl.get_buffer_size_in_bytes();
+                const auto ht_rows = hash_tbl.size();
+
+                // Need some rows in tables to have valid statistics.
+                if (ht_rows == 0) {
+                    return true;
+                }
+
+                // Find the appropriate reduction factor in our table for the current
+                // hash table size: the last entry whose threshold has been reached.
+                int cache_level = 0;
+                for (int next_level = 1;
+                     next_level < STREAMING_HT_MIN_REDUCTION_SIZE &&
+                     ht_mem >= STREAMING_HT_MIN_REDUCTION[next_level].min_ht_mem;
+                     ++next_level) {
+                    cache_level = next_level;
+                }
+
+                // Compare the number of rows in the hash table with the number of input
+                // rows that were aggregated into it. Passed-through rows are excluded
+                // because they never went into the hash table.
+                const int64_t input_rows = _shared_state->input_num_rows;
+                const int64_t aggregated_input_rows = input_rows - _num_rows_returned;
+
+                // TODO: workaround for IMPALA-2490: subplan node rows_returned counter
+                // may be inaccurate, so guard against a non-positive row count.
+                if (aggregated_input_rows <= 0) {
+                    return true;
+                }
+
+                // TODO chenhao: extrapolate to the full input with the planner's
+                // estimated cardinality, R = 1 + (N / n) * (r - 1), where R is the
+                // reduction over the whole input, N the number of input rows excluding
+                // passed-through rows, n the rows inserted or merged into the hash
+                // tables and r the current reduction. Until then the current reduction
+                // is used directly.
+                const double current_reduction =
+                        static_cast<double>(aggregated_input_rows) / ht_rows;
+                const double min_reduction =
+                        STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+
+                _should_expand_hash_table = current_reduction > min_reduction;
+                return _should_expand_hash_table;
+            },
+            _agg_data->method_variant);
+}
+
+// Pre-aggregates one input block keyed by the probe expressions.
+//
+// Steps, in order:
+//  1. Evaluate every probe expression on `in_block`, expand const result
+//     columns to full columns and remember the raw key column pointers.
+//  2. Grow `_places` so it holds at least one slot per input row.
+//  3. If the hash table reports that adding `rows` elements would overflow its
+//     current size, and either _should_expand_preagg_hash_tables() says no or
+//     memory usage exceeds the operator's _external_agg_bytes_threshold (when
+//     that threshold is > 0), take the pass-through path: serialize the
+//     aggregate states straight into `out_block` next to the key columns and
+//     skip the hash table entirely.
+//  4. Otherwise insert the keys into the hash table and run execute_batch_add
+//     for every aggregate evaluator; `out_block` is not written in this case.
+//
+// Returns the first error raised by expression evaluation, serialization,
+// hash table insertion or the aggregate evaluators.
+Status StreamingAggSinkLocalState::_pre_agg_with_serialized_key(
+        doris::vectorized::Block* in_block, doris::vectorized::Block* 
out_block) {
+    SCOPED_TIMER(_build_timer);
+    DCHECK(!_shared_state->probe_expr_ctxs.empty());
+
+    size_t key_size = _shared_state->probe_expr_ctxs.size();
+    vectorized::ColumnRawPtrs key_columns(key_size);
+    {
+        SCOPED_TIMER(_expr_timer);
+        for (size_t i = 0; i < key_size; ++i) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(
+                    _shared_state->probe_expr_ctxs[i]->execute(in_block, 
&result_column_id));
+            in_block->get_by_position(result_column_id).column =
+                    in_block->get_by_position(result_column_id)
+                            .column->convert_to_full_column_if_const();
+            key_columns[i] = 
in_block->get_by_position(result_column_id).column.get();
+        }
+    }
+
+    int rows = in_block->rows();
+    if (_places.size() < rows) {
+        _places.resize(rows);
+    }
+
+    // Stop expanding hash tables if we're not reducing the input 
sufficiently. As our
+    // hash tables expand out of each level of cache hierarchy, every hash 
table lookup
+    // will take longer. We also may not be able to expand hash tables because 
of memory
+    // pressure. In either case we should always use the remaining space in 
the hash table
+    // to avoid wasting memory.
+    // But for fixed hash map, it never need to expand
+    bool ret_flag = false;
+    RETURN_IF_ERROR(std::visit(
+            [&](auto&& agg_method) -> Status {
+                if (auto& hash_tbl = agg_method.data; 
hash_tbl.add_elem_size_overflow(rows)) {
+                    /// If too much memory is used during the pre-aggregation 
stage,
+                    /// it is better to output the data directly without 
performing further aggregation.
+                    const bool used_too_much_memory =
+                            (_parent->cast<StreamingAggSinkOperatorX>()
+                                             ._external_agg_bytes_threshold > 
0 &&
+                             _memory_usage() > 
_parent->cast<StreamingAggSinkOperatorX>()
+                                                       
._external_agg_bytes_threshold);
+                    // do not try to do agg, just init and serialize directly 
return the out_block
+                    if (!_should_expand_preagg_hash_tables() || 
used_too_much_memory) {
+                        SCOPED_TIMER(_streaming_agg_timer);
+                        ret_flag = true;
+
+                        // will serialize value data to string column.
+                        // non-nullable column(id in `_make_nullable_keys`)
+                        // will be converted to nullable.
+                        bool mem_reuse =
+                                _dependency->make_nullable_keys().empty() && 
out_block->mem_reuse();
+
+                        std::vector<vectorized::DataTypePtr> data_types;
+                        vectorized::MutableColumns value_columns;
+                        for (int i = 0; i < 
_shared_state->aggregate_evaluators.size(); ++i) {
+                            auto data_type = 
_shared_state->aggregate_evaluators[i]
+                                                     ->function()
+                                                     ->get_serialized_type();
+                            if (mem_reuse) {
+                                value_columns.emplace_back(
+                                        
std::move(*out_block->get_by_position(i + key_size).column)
+                                                .mutate());
+                            } else {
+                                // slot type of value it should always be 
string type
+                                
value_columns.emplace_back(_shared_state->aggregate_evaluators[i]
+                                                                   ->function()
+                                                                   
->create_serialize_column());
+                            }
+                            data_types.emplace_back(data_type);
+                        }
+
+                        for (int i = 0; i != 
_shared_state->aggregate_evaluators.size(); ++i) {
+                            SCOPED_TIMER(_serialize_data_timer);
+                            
RETURN_IF_ERROR(_shared_state->aggregate_evaluators[i]
+                                                    
->streaming_agg_serialize_to_column(
+                                                            in_block, 
value_columns[i], rows,
+                                                            _agg_arena_pool));
+                        }
+
+                        if (!mem_reuse) {
+                            vectorized::ColumnsWithTypeAndName 
columns_with_schema;
+                            for (int i = 0; i < key_size; ++i) {
+                                columns_with_schema.emplace_back(
+                                        key_columns[i]->clone_resized(rows),
+                                        
_shared_state->probe_expr_ctxs[i]->root()->data_type(),
+                                        
_shared_state->probe_expr_ctxs[i]->root()->expr_name());
+                            }
+                            for (int i = 0; i < value_columns.size(); ++i) {
+                                
columns_with_schema.emplace_back(std::move(value_columns[i]),
+                                                                 
data_types[i], "");
+                            }
+                            
out_block->swap(vectorized::Block(columns_with_schema));
+                        } else {
+                            for (int i = 0; i < key_size; ++i) {
+                                
std::move(*out_block->get_by_position(i).column)
+                                        .mutate()
+                                        ->insert_range_from(*key_columns[i], 
0, rows);
+                            }
+                        }
+                    }
+                }
+                return Status::OK();
+            },
+            _agg_data->method_variant));
+
+    if (!ret_flag) {
+        RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table(_places.data(), 
key_columns, rows));
+
+        for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
+            
RETURN_IF_ERROR(_shared_state->aggregate_evaluators[i]->execute_batch_add(
+                    in_block, _dependency->offsets_of_aggregate_states()[i], 
_places.data(),
+                    _agg_arena_pool, _should_expand_hash_table));
+        }
+    }
+
+    return Status::OK();
+}
+
+// Adds no construction logic of its own; all arguments are forwarded to
+// AggSinkOperatorX.
+StreamingAggSinkOperatorX::StreamingAggSinkOperatorX(ObjectPool* pool, const 
TPlanNode& tnode,
+                                                     const DescriptorTbl& 
descs)
+        : AggSinkOperatorX(pool, tnode, descs) {}
+
+// The sink and the source run in different threads and communicate through
+// the shared data queue, so writability is whatever the queue reports via
+// has_enough_space_to_push().
+bool StreamingAggSinkOperatorX::can_write(RuntimeState* state) {
+    auto& local_state = state->get_sink_local_state(id())->cast<StreamingAggSinkLocalState>();
+    return local_state._shared_state->data_queue->has_enough_space_to_push();
+}
+
+// Feeds one input block into the streaming pre-aggregation.
+//
+// A non-empty block is counted in the shared input-row total and then
+// pre-aggregated into a free block taken from the shared data queue. If the
+// pre-aggregation produced pass-through rows the block is pushed to the queue
+// for the source side; otherwise it goes back to the free list. When
+// `source_state` is FINISHED the queue is marked finished.
+//
+// `in_block` may be null or empty; in both cases only the FINISHED handling
+// runs. Returns the error from do_pre_agg(), if any.
+Status StreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
+                                       SourceState source_state) {
+    auto& local_state = state->get_sink_local_state(id())->cast<StreamingAggSinkLocalState>();
+    auto& shared_state = *local_state._shared_state;
+
+    // Everything that touches in_block stays behind the null check; an empty
+    // block adds zero rows, so counting inside the branch keeps the total
+    // unchanged.
+    if (in_block != nullptr && in_block->rows() > 0) {
+        shared_state.input_num_rows += in_block->rows();
+
+        auto block_from_ctx = shared_state.data_queue->get_free_block();
+        RETURN_IF_ERROR(local_state.do_pre_agg(in_block, block_from_ctx.get()));
+        if (block_from_ctx->rows() == 0) {
+            shared_state.data_queue->push_free_block(std::move(block_from_ctx));
+        } else {
+            shared_state.data_queue->push_block(std::move(block_from_ctx));
+        }
+    }
+
+    if (UNLIKELY(source_state == SourceState::FINISHED)) {
+        shared_state.data_queue->set_finish();
+    }
+    return Status::OK();
+}
+
+// Creates the StreamingAggSinkLocalState for this operator, registers it in
+// `state` under this operator's id, and returns the result of initialising it.
+// Registration happens before init(), so the local state stays registered even
+// when init() fails.
+Status StreamingAggSinkOperatorX::setup_local_state(RuntimeState* state, 
LocalSinkStateInfo& info) {
+    auto local_state = StreamingAggSinkLocalState::create_shared(this, state);
+    state->emplace_sink_local_state(id(), local_state);
+    return local_state->init(state, info);
+}
+
+// Closes the streaming sink: cancels the shared data queue if it was never
+// marked finished, publishes the queue's maximum size/bytes to the profile
+// counters, drops the cached pre-agg block and then runs the base close.
+Status StreamingAggSinkOperatorX::close(RuntimeState* state) {
+    auto& local_state = state->get_sink_local_state(id())->cast<StreamingAggSinkLocalState>();
+
+    if (auto& queue = local_state._shared_state->data_queue; queue) {
+        if (!queue->is_finish()) {
+            // finish should be set, if not set here means error.
+            queue->set_canceled();
+        }
+        COUNTER_SET(local_state._queue_size_counter, queue->max_size_of_queue());
+        COUNTER_SET(local_state._queue_byte_size_counter, queue->max_bytes_in_queue());
+    }
+
+    local_state._preagg_block.clear();
+    return AggSinkOperatorX::close(state);
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index 02525c275e..1f11a619c2 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -21,6 +21,7 @@
 
 #include <memory>
 
+#include "aggregation_sink_operator.h"
 #include "common/status.h"
 #include "operator.h"
 #include "util/runtime_profile.h"
@@ -69,5 +70,52 @@ private:
     std::shared_ptr<DataQueue> _data_queue;
 };
 
+class StreamingAggSinkOperatorX;
+
+// Per-task state of the streaming aggregation sink. Builds on
+// AggSinkLocalState and adds the pass-through ("streaming") pre-aggregation
+// path together with its bookkeeping.
+class StreamingAggSinkLocalState final : public AggSinkLocalState {
+    ENABLE_FACTORY_CREATOR(StreamingAggSinkLocalState);
+
+public:
+    StreamingAggSinkLocalState(DataSinkOperatorX* parent, RuntimeState* state);
+
+    // Initialises the base state and registers the streaming profile entries.
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    // Pre-aggregates input_block; rows that are passed through instead of
+    // being aggregated are written to output_block.
+    Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* 
output_block);
+
+private:
+    friend class StreamingAggSinkOperatorX;
+
+    Status _pre_agg_with_serialized_key(doris::vectorized::Block* in_block,
+                                        doris::vectorized::Block* out_block);
+    // Converts the keys listed by the dependency's make_nullable_keys() to
+    // nullable columns/types in `block`.
+    void _make_nullable_output_key(vectorized::Block* block);
+    // Returns whether the hash table may keep growing; a negative answer is
+    // remembered in _should_expand_hash_table.
+    bool _should_expand_preagg_hash_tables();
+
+    vectorized::Block _preagg_block = vectorized::Block();
+
+    // Scratch array of aggregate-state pointers, resized to the row count of
+    // the block being processed.
+    vectorized::PODArray<vectorized::AggregateDataPtr> _places;
+
+    // Profile entries; null until init() has run.
+    RuntimeProfile::Counter* _queue_byte_size_counter;
+    RuntimeProfile::Counter* _queue_size_counter;
+    RuntimeProfile::Counter* _streaming_agg_timer;
+
+    bool _should_expand_hash_table = true;
+    // Rows emitted through do_pre_agg()'s output block so far.
+    int64_t _num_rows_returned = 0;
+};
+
+// Sink operator of the streaming aggregation for pipelineX. Overrides the
+// AggSinkOperatorX entry points so that data flows through the shared data
+// queue to the matching source operator.
+class StreamingAggSinkOperatorX final : public AggSinkOperatorX {
+public:
+    StreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
+    // Creates and registers a StreamingAggSinkLocalState for this operator.
+    Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) 
override;
+
+    // Pre-aggregates in_block and hands any pass-through rows to the queue.
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
+
+    // Cancels the queue if it was never finished, records queue statistics,
+    // then closes the base operator.
+    Status close(RuntimeState* state) override;
+    // True when the shared data queue reports enough space to push.
+    bool can_write(RuntimeState* state) override;
+
+    // Replaces `dependency` with a new AggDependency for this operator's id.
+    void get_dependency(DependencySPtr& dependency) override {
+        dependency.reset(new AggDependency(id()));
+    }
+};
+
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
index c343253903..c73dd26c19 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
@@ -21,6 +21,7 @@
 
 #include "pipeline/exec/data_queue.h"
 #include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_x/dependency.h"
 #include "runtime/descriptors.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
@@ -71,5 +72,37 @@ OperatorPtr 
StreamingAggSourceOperatorBuilder::build_operator() {
     return std::make_shared<StreamingAggSourceOperator>(this, _node, 
_data_queue);
 }
 
+// Adds no construction logic of its own; everything is forwarded to
+// AggSourceOperatorX. `op_name` is taken by value and moved into the base
+// class, which also takes it by value, so the string is not copied twice.
+StreamingAggSourceOperatorX::StreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                                         const DescriptorTbl& descs,
+                                                         std::string op_name)
+        : AggSourceOperatorX(pool, tnode, descs, std::move(op_name)) {}
+
+// Produces the next output block.
+//
+// While the shared data queue is not exhausted, a block is fetched from it;
+// if the queue is still not exhausted after the fetch, that block is swapped
+// into `block` and the emptied block is returned to the queue's free list.
+// In every other case the call is delegated to AggSourceOperatorX::get_block.
+Status StreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
+                                              SourceState& source_state) {
+    auto& local_state = state->get_local_state(id())->cast<AggLocalState>();
+    auto& queue = local_state._shared_state->data_queue;
+
+    if (!queue->data_exhausted()) {
+        std::unique_ptr<vectorized::Block> agg_block;
+        RETURN_IF_ERROR(queue->get_block_from_queue(&agg_block));
+
+        // The queue is re-checked after the fetch; agg_block is only used when
+        // it still reports data.
+        if (!queue->data_exhausted()) {
+            block->swap(*agg_block);
+            agg_block->clear_column_data(row_desc().num_materialized_slots());
+            queue->push_free_block(std::move(agg_block));
+            return Status::OK();
+        }
+    }
+
+    RETURN_IF_ERROR(AggSourceOperatorX::get_block(state, block, source_state));
+    return Status::OK();
+}
+
+// Readability is whatever the shared data queue reports through
+// has_data_or_finished().
+bool StreamingAggSourceOperatorX::can_read(RuntimeState* state) {
+    auto& local_state = state->get_local_state(id())->cast<AggLocalState>();
+    return local_state._shared_state->data_queue->has_data_or_finished();
+}
+
 } // namespace pipeline
 } // namespace doris
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
index ef13e03c4b..b423e95b73 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
@@ -22,6 +22,7 @@
 
 #include "common/status.h"
 #include "operator.h"
+#include "pipeline/exec/aggregation_source_operator.h"
 #include "vec/exec/vaggregation_node.h"
 
 namespace doris {
@@ -58,5 +59,15 @@ private:
     std::shared_ptr<DataQueue> _data_queue;
 };
 
+// Source operator of the streaming aggregation for pipelineX. Reuses
+// AggSourceOperatorX and overrides reading so that blocks from the shared data
+// queue are served while the queue still has data.
+class StreamingAggSourceOperatorX final : public AggSourceOperatorX {
+public:
+    StreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                const DescriptorTbl& descs, std::string 
op_name);
+    // True when the shared data queue reports data or has finished.
+    bool can_read(RuntimeState* state) override;
+
+    // Serves a queued block while the queue is not exhausted, otherwise
+    // delegates to AggSourceOperatorX::get_block.
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override;
+};
+
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 21678a5194..478623637e 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include "pipeline/exec/data_queue.h"
 #include "vec/common/sort/sorter.h"
 #include "vec/exec/vaggregation_node.h"
 
@@ -46,6 +47,7 @@ public:
     AggSharedState() {
         agg_data = std::make_unique<vectorized::AggregatedDataVariants>();
         agg_arena_pool = std::make_unique<vectorized::Arena>();
+        data_queue = std::make_unique<DataQueue>(1);
     }
     void init_spill_partition_helper(size_t spill_partition_count_bits) {
         spill_partition_helper =
@@ -63,6 +65,7 @@ public:
     size_t input_num_rows = 0;
     std::vector<vectorized::AggregateDataPtr> values;
     std::unique_ptr<vectorized::Arena> agg_profile_arena;
+    std::unique_ptr<DataQueue> data_queue;
 };
 
 class AggDependency final : public Dependency {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 7f351f3146..0cb7d24405 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -51,6 +51,8 @@
 #include "pipeline/exec/scan_operator.h"
 #include "pipeline/exec/sort_sink_operator.h"
 #include "pipeline/exec/sort_source_operator.h"
+#include "pipeline/exec/streaming_aggregation_sink_operator.h"
+#include "pipeline/exec/streaming_aggregation_source_operator.h"
 #include "pipeline/task_scheduler.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
@@ -467,14 +469,27 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         break;
     }
     case TPlanNodeType::AGGREGATION_NODE: {
-        op.reset(new AggSourceOperatorX(pool, tnode, descs, 
"AggSourceXOperator"));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
-
-        cur_pipe = add_pipeline();
-        DataSinkOperatorXPtr sink;
-        sink.reset(new AggSinkOperatorX(pool, tnode, descs));
-        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
-        RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
+        if (tnode.agg_node.__isset.use_streaming_preaggregation &&
+            tnode.agg_node.use_streaming_preaggregation) {
+            op.reset(new StreamingAggSourceOperatorX(pool, tnode, descs,
+                                                     
"StreamingAggSourceXOperator"));
+            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+
+            cur_pipe = add_pipeline();
+            DataSinkOperatorXPtr sink;
+            sink.reset(new StreamingAggSinkOperatorX(pool, tnode, descs));
+            RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+            RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, 
_runtime_state.get()));
+        } else {
+            op.reset(new AggSourceOperatorX(pool, tnode, descs, 
"AggSourceXOperator"));
+            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+
+            cur_pipe = add_pipeline();
+            DataSinkOperatorXPtr sink;
+            sink.reset(new AggSinkOperatorX(pool, tnode, descs));
+            RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+            RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, 
_runtime_state.get()));
+        }
         break;
     }
     case TPlanNodeType::SORT_NODE: {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to