This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 391355eeae [pipelineX](agg) Support streaming agg (#23341)
391355eeae is described below
commit 391355eeae12b44263cfc7a361145141ef45b721
Author: Gabriel <[email protected]>
AuthorDate: Wed Aug 23 08:38:12 2023 +0800
[pipelineX](agg) Support streaming agg (#23341)
---
be/src/pipeline/exec/aggregation_sink_operator.h | 17 +-
be/src/pipeline/exec/aggregation_source_operator.h | 9 +-
.../exec/streaming_aggregation_sink_operator.cpp | 312 +++++++++++++++++++++
.../exec/streaming_aggregation_sink_operator.h | 48 ++++
.../exec/streaming_aggregation_source_operator.cpp | 33 +++
.../exec/streaming_aggregation_source_operator.h | 11 +
be/src/pipeline/pipeline_x/dependency.h | 3 +
.../pipeline_x/pipeline_x_fragment_context.cpp | 31 +-
8 files changed, 444 insertions(+), 20 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 7ca62b3cab..72136cd3f3 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -52,11 +52,11 @@ class AggSinkLocalState : public PipelineXSinkLocalState {
public:
AggSinkLocalState(DataSinkOperatorX* parent, RuntimeState* state);
- Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    // Overridden by StreamingAggSinkLocalState; `override` already implies virtual.
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status try_spill_disk(bool eos = false);
-private:
+protected:
friend class AggSinkOperatorX;
Status _execute_without_key(vectorized::Block* block);
@@ -310,7 +310,7 @@ private:
executor _executor;
};
-class AggSinkOperatorX final : public DataSinkOperatorX {
+class AggSinkOperatorX : public DataSinkOperatorX {
public:
AggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
Status init(const TDataSink& tsink) override {
@@ -321,13 +321,13 @@ public:
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
- Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info)
override;
+    // setup_local_state/sink/close/can_write are overridden by StreamingAggSinkOperatorX;
+    // `override` already makes them virtual, so the `virtual` keyword is not repeated.
+    Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
-    Status sink(RuntimeState* state, vectorized::Block* in_block,
-                SourceState source_state) override;
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
-    Status close(RuntimeState* state) override;
-    bool can_write(RuntimeState* state) override { return true; }
+    Status close(RuntimeState* state) override;
+    bool can_write(RuntimeState* state) override { return true; }
void get_dependency(DependencySPtr& dependency) override {
dependency.reset(new AggDependency(id()));
@@ -335,6 +335,7 @@ public:
private:
friend class AggSinkLocalState;
+ friend class StreamingAggSinkLocalState;
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
bool _can_short_circuit = false;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h
b/be/src/pipeline/exec/aggregation_source_operator.h
index 54902c779c..6831a110e6 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -59,6 +59,7 @@ public:
private:
friend class AggSourceOperatorX;
+ friend class StreamingAggSourceOperatorX;
void _close_without_key();
void _close_with_serialized_key();
@@ -105,16 +106,16 @@ private:
bool _agg_data_created_without_key = false;
};
-class AggSourceOperatorX final : public OperatorXBase {
+class AggSourceOperatorX : public OperatorXBase {
public:
AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs,
std::string op_name);
- bool can_read(RuntimeState* state) override;
+    // Overridden by StreamingAggSourceOperatorX; `override` already implies virtual.
+    bool can_read(RuntimeState* state) override;
Status setup_local_state(RuntimeState* state, LocalStateInfo& info)
override;
- Status get_block(RuntimeState* state, vectorized::Block* block,
- SourceState& source_state) override;
+    // Overridden by StreamingAggSourceOperatorX; `override` already implies virtual.
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override;
Status close(RuntimeState* state) override;
bool is_source() const override { return true; }
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
index 3a9e502969..f0a7cde605 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
@@ -92,4 +92,316 @@ OperatorPtr
StreamingAggSinkOperatorBuilder::build_operator() {
return std::make_shared<StreamingAggSinkOperator>(this, _node,
_data_queue);
}
+/// Thresholds that decide whether a streaming pre-aggregation may keep growing its hash
+/// tables. Each entry gives the minimum reduction factor (input rows divided by output
+/// rows) that is required once the hash table bucket arrays reach the given size in bytes.
+/// The sizes roughly correspond to the points where the bucket arrays stop fitting in a
+/// cache level: the working set should only spill into the next, slower level of the
+/// memory hierarchy when the input is being reduced enough to pay for the extra latency
+/// of every hash table lookup.
+///
+/// The reduction observed so far is only an estimate of the final reduction, and the
+/// input order can bias it either way. With randomly ordered input it underestimates the
+/// final reduction, because the chance that a row repeats an earlier key grows as more
+/// input is consumed. With key-correlated input, skew dominates: high-cardinality keys
+/// arriving first cause an overestimate, low-cardinality keys arriving first an
+/// underestimate.
+///
+/// Ideally the final reduction would be extrapolated from the planner's estimated input
+/// cardinality, assuming a random input order. That extrapolation is currently disabled
+/// (see the TODO in _should_expand_preagg_hash_tables()), so the reduction observed so far
+/// is compared directly against these thresholds.
+struct StreamingHtMinReductionEntry {
+    // Lower bound, in bytes, on the total size of the hash table bucket directories for
+    // which this entry applies (selected when the size is >= this value).
+    int min_ht_mem;
+    // Minimum reduction factor required to keep expanding the hash tables.
+    double streaming_ht_min_reduction;
+};
+
+// TODO: tune these values experimentally and query the cache sizes of the host machine
+// instead of hard-coding them.
+static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
+        // Up to the size of L2: always expand.
+        {0, 0.0},
+        // Growing into L3 (L2 is currently typically 1 MiB or more): expand only when some
+        // reduction is observed.
+        {1024 * 1024, 1.1},
+        // Growing into main memory (L3 is typically 16 MiB or more): require a significant
+        // reduction.
+        {16 * 1024 * 1024, 2.0},
+};
+
+// Number of entries in STREAMING_HT_MIN_REDUCTION.
+static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
+        sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(StreamingHtMinReductionEntry);
+
+// Creates the local state; the streaming-specific profile counters start out null and are
+// registered later in init().
+StreamingAggSinkLocalState::StreamingAggSinkLocalState(DataSinkOperatorX* parent,
+                                                       RuntimeState* state)
+        : AggSinkLocalState(parent, state),
+          _queue_byte_size_counter {nullptr},
+          _queue_size_counter {nullptr},
+          _streaming_agg_timer {nullptr} {}
+
+// Runs the common aggregation initialization, then registers the profile counters that
+// only streaming pre-aggregation reports (data queue high-water marks and streaming time).
+Status StreamingAggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(AggSinkLocalState::init(state, info));
+    auto agg_profile = profile();
+    _queue_byte_size_counter = ADD_COUNTER(agg_profile, "MaxSizeInBlockQueue", TUnit::BYTES);
+    _queue_size_counter = ADD_COUNTER(agg_profile, "MaxSizeOfBlockQueue", TUnit::UNIT);
+    _streaming_agg_timer = ADD_TIMER(agg_profile, "StreamingAggTime");
+    return Status::OK();
+}
+
+// Pre-aggregates `input_block`. Rows are either aggregated into the hash table or, when
+// expansion is not worthwhile, written to `output_block`. Rows written to `output_block`
+// are counted in _num_rows_returned, which _should_expand_preagg_hash_tables() subtracts
+// from the input row count when estimating the reduction.
+Status StreamingAggSinkLocalState::do_pre_agg(vectorized::Block* input_block,
+                                              vectorized::Block* output_block) {
+    RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block, output_block));
+
+    const auto emitted_rows = output_block->rows();
+    _num_rows_returned += emitted_rows;
+    _make_nullable_output_key(output_block);
+    _executor.update_memusage();
+    return Status::OK();
+}
+
+// Wraps every key column listed in make_nullable_keys() (column and type) as nullable.
+// Empty blocks are left untouched.
+void StreamingAggSinkLocalState::_make_nullable_output_key(vectorized::Block* block) {
+    if (block->rows() == 0) {
+        return;
+    }
+    for (auto cid : _dependency->make_nullable_keys()) {
+        auto& key_entry = block->get_by_position(cid);
+        key_entry.column = make_nullable(key_entry.column);
+        key_entry.type = make_nullable(key_entry.type);
+    }
+}
+
+// Decides whether the pre-aggregation hash table may keep growing.
+//
+// Returns true while the hash table is empty or no aggregated input has been observed yet,
+// because the statistics are not meaningful then. Otherwise it compares the reduction
+// observed so far with the threshold for the current hash table size. It stores the
+// result in _should_expand_hash_table; once that is false this function returns false
+// immediately and never re-evaluates.
+bool StreamingAggSinkLocalState::_should_expand_preagg_hash_tables() {
+    if (!_should_expand_hash_table) {
+        return false;
+    }
+
+    auto decide = [&](auto&& agg_method) -> bool {
+        auto& hash_tbl = agg_method.data;
+        const auto ht_mem = hash_tbl.get_buffer_size_in_bytes();
+        const auto ht_rows = hash_tbl.size();
+
+        // An empty hash table provides no statistics yet.
+        if (ht_rows == 0) {
+            return true;
+        }
+
+        // Rows that were passed through (never inserted into the hash table) must not
+        // count towards the reduction.
+        const int64_t input_rows = _shared_state->input_num_rows;
+        const int64_t aggregated_input_rows = input_rows - _num_rows_returned;
+        // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be
+        // inaccurate, which could lead to a divide by zero below.
+        if (aggregated_input_rows <= 0) {
+            return true;
+        }
+
+        // Select the highest cache level whose size threshold the hash table has reached.
+        int cache_level = 0;
+        for (; cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE; ++cache_level) {
+            if (ht_mem < STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
+                break;
+            }
+        }
+
+        // TODO: extrapolate the final reduction as R = 1 + (N / n) * (r - 1), using the
+        // planner's estimated input cardinality, instead of using the current reduction r.
+        const double current_reduction = static_cast<double>(aggregated_input_rows) / ht_rows;
+        const double min_reduction =
+                STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
+        _should_expand_hash_table = current_reduction > min_reduction;
+        return _should_expand_hash_table;
+    };
+    return std::visit(decide, _agg_data->method_variant);
+}
+
+// Core of streaming pre-aggregation for serialized (hash-table) keys.
+//
+// Evaluates the grouping-key expressions on `in_block`, then either:
+//  * passes the rows through: keys and serialized per-row aggregate inputs are written to
+//    `out_block` (ret_flag == true); or
+//  * aggregates them: keys are emplaced into the hash table and every aggregate evaluator
+//    is applied in batch (ret_flag == false, `out_block` untouched).
+// Pass-through is only considered when add_elem_size_overflow(rows) reports true
+// (presumably: inserting `rows` more keys would force the hash table to grow — confirm).
+Status StreamingAggSinkLocalState::_pre_agg_with_serialized_key(
+        doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) {
+    SCOPED_TIMER(_build_timer);
+    DCHECK(!_shared_state->probe_expr_ctxs.empty());
+
+    // Evaluate each key expression into a (full, non-const) column of `in_block` and keep
+    // raw pointers to those columns.
+    size_t key_size = _shared_state->probe_expr_ctxs.size();
+    vectorized::ColumnRawPtrs key_columns(key_size);
+    {
+        SCOPED_TIMER(_expr_timer);
+        for (size_t i = 0; i < key_size; ++i) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(
+                    _shared_state->probe_expr_ctxs[i]->execute(in_block, &result_column_id));
+            in_block->get_by_position(result_column_id).column =
+                    in_block->get_by_position(result_column_id)
+                            .column->convert_to_full_column_if_const();
+            key_columns[i] = in_block->get_by_position(result_column_id).column.get();
+        }
+    }
+
+    // One aggregate-state slot per input row; the buffer only ever grows.
+    int rows = in_block->rows();
+    if (_places.size() < rows) {
+        _places.resize(rows);
+    }
+
+    // Stop expanding hash tables if we're not reducing the input sufficiently. As our
+    // hash tables expand out of each level of cache hierarchy, every hash table lookup
+    // will take longer. We also may not be able to expand hash tables because of memory
+    // pressure. In either case we should always use the remaining space in the hash table
+    // to avoid wasting memory.
+    // But for fixed hash map, it never need to expand
+    // ret_flag becomes true when this block is passed through instead of aggregated.
+    bool ret_flag = false;
+    RETURN_IF_ERROR(std::visit(
+            [&](auto&& agg_method) -> Status {
+                if (auto& hash_tbl = agg_method.data; hash_tbl.add_elem_size_overflow(rows)) {
+                    /// If too much memory is used during the pre-aggregation stage,
+                    /// it is better to output the data directly without performing further
+                    /// aggregation. A non-positive threshold disables this check.
+                    const bool used_too_much_memory =
+                            (_parent->cast<StreamingAggSinkOperatorX>()
+                                             ._external_agg_bytes_threshold > 0 &&
+                             _memory_usage() > _parent->cast<StreamingAggSinkOperatorX>()
+                                                       ._external_agg_bytes_threshold);
+                    // do not try to do agg, just init and serialize directly return the out_block
+                    if (!_should_expand_preagg_hash_tables() || used_too_much_memory) {
+                        SCOPED_TIMER(_streaming_agg_timer);
+                        ret_flag = true;
+
+                        // Value data is serialized into per-aggregate columns. Key columns
+                        // listed in make_nullable_keys() are converted to nullable later
+                        // (_make_nullable_output_key), so `out_block` columns can only be
+                        // reused when that list is empty.
+                        bool mem_reuse =
+                                _dependency->make_nullable_keys().empty() && out_block->mem_reuse();
+
+                        std::vector<vectorized::DataTypePtr> data_types;
+                        vectorized::MutableColumns value_columns;
+                        for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
+                            auto data_type = _shared_state->aggregate_evaluators[i]
+                                                     ->function()
+                                                     ->get_serialized_type();
+                            if (mem_reuse) {
+                                // NOTE(review): this relies on mutate() returning the same
+                                // column object that `out_block` still references (i.e. the
+                                // column is not shared); otherwise the serialized values
+                                // would not end up in `out_block` — confirm.
+                                value_columns.emplace_back(
+                                        std::move(*out_block->get_by_position(i + key_size).column)
+                                                .mutate());
+                            } else {
+                                // slot type of value it should always be string type
+                                value_columns.emplace_back(_shared_state->aggregate_evaluators[i]
+                                                                   ->function()
+                                                                   ->create_serialize_column());
+                            }
+                            data_types.emplace_back(data_type);
+                        }
+
+                        // Serialize each row's aggregate input directly, without hashing.
+                        for (int i = 0; i != _shared_state->aggregate_evaluators.size(); ++i) {
+                            SCOPED_TIMER(_serialize_data_timer);
+                            RETURN_IF_ERROR(_shared_state->aggregate_evaluators[i]
+                                                    ->streaming_agg_serialize_to_column(
+                                                            in_block, value_columns[i], rows,
+                                                            _agg_arena_pool));
+                        }
+
+                        if (!mem_reuse) {
+                            // Build a fresh block: cloned key columns followed by the
+                            // serialized value columns, then swap it into `out_block`.
+                            vectorized::ColumnsWithTypeAndName columns_with_schema;
+                            for (int i = 0; i < key_size; ++i) {
+                                columns_with_schema.emplace_back(
+                                        key_columns[i]->clone_resized(rows),
+                                        _shared_state->probe_expr_ctxs[i]->root()->data_type(),
+                                        _shared_state->probe_expr_ctxs[i]->root()->expr_name());
+                            }
+                            for (int i = 0; i < value_columns.size(); ++i) {
+                                columns_with_schema.emplace_back(std::move(value_columns[i]),
+                                                                 data_types[i], "");
+                            }
+                            out_block->swap(vectorized::Block(columns_with_schema));
+                        } else {
+                            // Append the keys into `out_block`'s existing key columns.
+                            for (int i = 0; i < key_size; ++i) {
+                                std::move(*out_block->get_by_position(i).column)
+                                        .mutate()
+                                        ->insert_range_from(*key_columns[i], 0, rows);
+                            }
+                        }
+                    }
+                }
+                return Status::OK();
+            },
+            _agg_data->method_variant));
+
+    // Regular path: insert keys into the hash table and update the aggregate states.
+    if (!ret_flag) {
+        RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table(_places.data(), key_columns, rows));
+
+        for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
+            RETURN_IF_ERROR(_shared_state->aggregate_evaluators[i]->execute_batch_add(
+                    in_block, _dependency->offsets_of_aggregate_states()[i], _places.data(),
+                    _agg_arena_pool, _should_expand_hash_table));
+        }
+    }
+
+    return Status::OK();
+}
+
+// Nothing streaming-specific is configured here; construction is delegated to
+// AggSinkOperatorX.
+StreamingAggSinkOperatorX::StreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                                     const DescriptorTbl& descs)
+        : AggSinkOperatorX(pool, tnode, descs) {
+}
+
+// Sink and source run in different threads, so the sink may only proceed while the
+// shared data queue still has room for another block.
+bool StreamingAggSinkOperatorX::can_write(RuntimeState* state) {
+    auto& local_state = state->get_sink_local_state(id())->cast<StreamingAggSinkLocalState>();
+    return local_state._shared_state->data_queue->has_enough_space_to_push();
+}
+
+// Pre-aggregates one input block and forwards any passed-through rows to the source side.
+//
+// `in_block` may be null or empty, in which case nothing is aggregated or counted.
+// Non-empty output from do_pre_agg() is pushed onto the shared data queue for
+// StreamingAggSourceOperatorX; an empty output block is handed back via push_free_block().
+// When `source_state` is FINISHED the data queue is marked finished.
+Status StreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
+                                       SourceState source_state) {
+    auto& local_state = state->get_sink_local_state(id())->cast<StreamingAggSinkLocalState>();
+    auto& data_queue = local_state._shared_state->data_queue;
+    // Only touch `in_block` after the null check; the row count feeds the reduction
+    // estimate in _should_expand_preagg_hash_tables().
+    if (in_block != nullptr && in_block->rows() > 0) {
+        local_state._shared_state->input_num_rows += in_block->rows();
+        auto block_from_ctx = data_queue->get_free_block();
+        RETURN_IF_ERROR(local_state.do_pre_agg(in_block, block_from_ctx.get()));
+        if (block_from_ctx->rows() == 0) {
+            data_queue->push_free_block(std::move(block_from_ctx));
+        } else {
+            data_queue->push_block(std::move(block_from_ctx));
+        }
+    }
+
+    if (UNLIKELY(source_state == SourceState::FINISHED)) {
+        data_queue->set_finish();
+    }
+    return Status::OK();
+}
+
+// Creates this operator's StreamingAggSinkLocalState, registers it in `state` under id()
+// and initializes it.
+Status StreamingAggSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) {
+    auto new_local_state = StreamingAggSinkLocalState::create_shared(this, state);
+    state->emplace_sink_local_state(id(), new_local_state);
+    return new_local_state->init(state, info);
+}
+
+// Cancels the data queue if it was never marked finished (that only happens on error),
+// records the queue high-water marks in the profile, releases the pre-aggregation block
+// and delegates the rest to AggSinkOperatorX::close().
+Status StreamingAggSinkOperatorX::close(RuntimeState* state) {
+    auto& local_state = state->get_sink_local_state(id())->cast<StreamingAggSinkLocalState>();
+    auto& data_queue = local_state._shared_state->data_queue;
+    if (data_queue) {
+        if (!data_queue->is_finish()) {
+            // finish should be set, if not set here means error.
+            data_queue->set_canceled();
+        }
+        COUNTER_SET(local_state._queue_size_counter, data_queue->max_size_of_queue());
+        COUNTER_SET(local_state._queue_byte_size_counter, data_queue->max_bytes_in_queue());
+    }
+    local_state._preagg_block.clear();
+    return AggSinkOperatorX::close(state);
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index 02525c275e..1f11a619c2 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -21,6 +21,7 @@
#include <memory>
+#include "aggregation_sink_operator.h"
#include "common/status.h"
#include "operator.h"
#include "util/runtime_profile.h"
@@ -69,5 +70,52 @@ private:
std::shared_ptr<DataQueue> _data_queue;
};
+class StreamingAggSinkOperatorX;
+
+// Sink-side local state for streaming pre-aggregation. Extends AggSinkLocalState with
+// pass-through logic: when growing the hash table is not worthwhile (too little reduction)
+// or too much memory is used, input rows are serialized straight into an output block
+// instead of being aggregated.
+class StreamingAggSinkLocalState final : public AggSinkLocalState {
+    ENABLE_FACTORY_CREATOR(StreamingAggSinkLocalState);
+
+public:
+    StreamingAggSinkLocalState(DataSinkOperatorX* parent, RuntimeState* state);
+
+    // Runs AggSinkLocalState::init() and registers the streaming-specific profile counters.
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    // Aggregates `input_block` into the hash table or writes passed-through rows into
+    // `output_block`.
+    Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* output_block);
+
+private:
+    friend class StreamingAggSinkOperatorX;
+
+    Status _pre_agg_with_serialized_key(doris::vectorized::Block* in_block,
+                                        doris::vectorized::Block* out_block);
+    void _make_nullable_output_key(vectorized::Block* block);
+    bool _should_expand_preagg_hash_tables();
+
+    // NOTE(review): within this change it is only cleared (in close()), never filled.
+    vectorized::Block _preagg_block = vectorized::Block();
+
+    // Per-row aggregate-state slots passed to _emplace_into_hash_table(); grown on demand.
+    vectorized::PODArray<vectorized::AggregateDataPtr> _places;
+
+    // Profile counters; registered in init().
+    RuntimeProfile::Counter* _queue_byte_size_counter;
+    RuntimeProfile::Counter* _queue_size_counter;
+    RuntimeProfile::Counter* _streaming_agg_timer;
+
+    // Set to false by _should_expand_preagg_hash_tables() once the observed reduction is
+    // too low; that function then stops re-evaluating.
+    bool _should_expand_hash_table = true;
+    // Rows emitted into output blocks (passed through); excluded from the reduction estimate.
+    int64_t _num_rows_returned = 0;
+};
+
+// Sink operator for streaming pre-aggregation. Besides building the hash table it pushes
+// passed-through blocks onto the data queue in the shared aggregation state, from which
+// StreamingAggSourceOperatorX reads them.
+class StreamingAggSinkOperatorX final : public AggSinkOperatorX {
+public:
+    StreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+    Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
+
+    Status close(RuntimeState* state) override;
+    // Writable only while the shared data queue has room for another block.
+    bool can_write(RuntimeState* state) override;
+
+    // get_dependency() is inherited from AggSinkOperatorX, which already creates the same
+    // AggDependency(id()); re-declaring an identical override here would only duplicate it.
+};
+
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
index c343253903..c73dd26c19 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
@@ -21,6 +21,7 @@
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_x/dependency.h"
#include "runtime/descriptors.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
@@ -71,5 +72,37 @@ OperatorPtr
StreamingAggSourceOperatorBuilder::build_operator() {
return std::make_shared<StreamingAggSourceOperator>(this, _node,
_data_queue);
}
+// No streaming-specific construction is needed: the data queue lives in the shared
+// AggSharedState, so everything is delegated to AggSourceOperatorX.
+StreamingAggSourceOperatorX::StreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                                         const DescriptorTbl& descs,
+                                                         std::string op_name)
+        : AggSourceOperatorX(pool, tnode, descs, op_name) {
+}
+
+// While the shared data queue still has data, returns the passed-through blocks queued by
+// the streaming sink: each popped block is swapped into `block`, cleared and recycled to
+// the queue (`source_state` is left unchanged in that case). Once the queue is exhausted,
+// output comes from AggSourceOperatorX::get_block().
+Status StreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
+                                              SourceState& source_state) {
+    auto& local_state = state->get_local_state(id())->cast<AggLocalState>();
+    auto& data_queue = local_state._shared_state->data_queue;
+
+    if (data_queue->data_exhausted()) {
+        return AggSourceOperatorX::get_block(state, block, source_state);
+    }
+
+    std::unique_ptr<vectorized::Block> agg_block;
+    RETURN_IF_ERROR(data_queue->get_block_from_queue(&agg_block));
+    // The queue can become exhausted by this call; `agg_block` is then not used.
+    if (data_queue->data_exhausted()) {
+        return AggSourceOperatorX::get_block(state, block, source_state);
+    }
+
+    block->swap(*agg_block);
+    agg_block->clear_column_data(row_desc().num_materialized_slots());
+    data_queue->push_free_block(std::move(agg_block));
+    return Status::OK();
+}
+
+// Readable once the shared data queue reports that it has data or is finished.
+bool StreamingAggSourceOperatorX::can_read(RuntimeState* state) {
+    const auto& shared_state = state->get_local_state(id())->cast<AggLocalState>()._shared_state;
+    return shared_state->data_queue->has_data_or_finished();
+}
+
} // namespace pipeline
} // namespace doris
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.h
b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
index ef13e03c4b..b423e95b73 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
@@ -22,6 +22,7 @@
#include "common/status.h"
#include "operator.h"
+#include "pipeline/exec/aggregation_source_operator.h"
#include "vec/exec/vaggregation_node.h"
namespace doris {
@@ -58,5 +59,15 @@ private:
std::shared_ptr<DataQueue> _data_queue;
};
+// Source operator for streaming pre-aggregation. It first returns the blocks that
+// StreamingAggSinkOperatorX pushed onto the shared data queue, and falls back to
+// AggSourceOperatorX::get_block() once the queue is exhausted.
+class StreamingAggSourceOperatorX final : public AggSourceOperatorX {
+public:
+    StreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                const DescriptorTbl& descs, std::string op_name);
+    // Readable when the shared data queue has data or is finished.
+    bool can_read(RuntimeState* state) override;
+
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override;
+};
+
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 21678a5194..478623637e 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -17,6 +17,7 @@
#pragma once
+#include "pipeline/exec/data_queue.h"
#include "vec/common/sort/sorter.h"
#include "vec/exec/vaggregation_node.h"
@@ -46,6 +47,7 @@ public:
AggSharedState() {
agg_data = std::make_unique<vectorized::AggregatedDataVariants>();
agg_arena_pool = std::make_unique<vectorized::Arena>();
+        // Queue between the streaming agg sink and source. NOTE(review): it is allocated for
+        // every aggregation although, in this change, only the StreamingAgg* operators use
+        // it; the argument 1 presumably is the number of producers — confirm with DataQueue.
+        data_queue = std::make_unique<DataQueue>(1);
}
void init_spill_partition_helper(size_t spill_partition_count_bits) {
spill_partition_helper =
@@ -63,6 +65,7 @@ public:
size_t input_num_rows = 0;
std::vector<vectorized::AggregateDataPtr> values;
std::unique_ptr<vectorized::Arena> agg_profile_arena;
+ std::unique_ptr<DataQueue> data_queue;
};
class AggDependency final : public Dependency {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 7f351f3146..0cb7d24405 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -51,6 +51,8 @@
#include "pipeline/exec/scan_operator.h"
#include "pipeline/exec/sort_sink_operator.h"
#include "pipeline/exec/sort_source_operator.h"
+#include "pipeline/exec/streaming_aggregation_sink_operator.h"
+#include "pipeline/exec/streaming_aggregation_source_operator.h"
#include "pipeline/task_scheduler.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@@ -467,14 +469,27 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
break;
}
case TPlanNodeType::AGGREGATION_NODE: {
-        op.reset(new AggSourceOperatorX(pool, tnode, descs, "AggSourceXOperator"));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
-
-        cur_pipe = add_pipeline();
-        DataSinkOperatorXPtr sink;
-        sink.reset(new AggSinkOperatorX(pool, tnode, descs));
-        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
-        RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
+        // Streaming pre-aggregation only swaps in the StreamingAgg* operator pair; the
+        // pipeline split and sink initialization are identical for both variants.
+        const bool use_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
+                                       tnode.agg_node.use_streaming_preaggregation;
+        if (use_streaming_agg) {
+            op.reset(new StreamingAggSourceOperatorX(pool, tnode, descs,
+                                                     "StreamingAggSourceXOperator"));
+        } else {
+            op.reset(new AggSourceOperatorX(pool, tnode, descs, "AggSourceXOperator"));
+        }
+        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+
+        // The matching sink is attached to a newly added pipeline.
+        cur_pipe = add_pipeline();
+        DataSinkOperatorXPtr sink;
+        if (use_streaming_agg) {
+            sink.reset(new StreamingAggSinkOperatorX(pool, tnode, descs));
+        } else {
+            sink.reset(new AggSinkOperatorX(pool, tnode, descs));
+        }
+        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+        RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
break;
}
case TPlanNodeType::SORT_NODE: {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]