This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 55e572df82 [pipelineX](analytic operator) Support analytic operator
(#23444)
55e572df82 is described below
commit 55e572df8238dc23263673505e71478c6f56ded9
Author: Gabriel <[email protected]>
AuthorDate: Thu Aug 24 23:05:29 2023 +0800
[pipelineX](analytic operator) Support analytic operator (#23444)
---
be/src/pipeline/exec/analytic_sink_operator.cpp | 199 +++++++++
be/src/pipeline/exec/analytic_sink_operator.h | 63 +++
be/src/pipeline/exec/analytic_source_operator.cpp | 476 ++++++++++++++++++++-
be/src/pipeline/exec/analytic_source_operator.h | 120 +++++-
be/src/pipeline/exec/exchange_sink_operator.cpp | 2 +-
be/src/pipeline/exec/operator.cpp | 18 +-
be/src/pipeline/exec/operator.h | 9 +-
be/src/pipeline/exec/sort_sink_operator.cpp | 5 +-
be/src/pipeline/pipeline_x/dependency.cpp | 118 +++++
be/src/pipeline/pipeline_x/dependency.h | 42 ++
.../pipeline_x/pipeline_x_fragment_context.cpp | 21 +-
be/src/vec/exec/vanalytic_eval_node.h | 3 +-
12 files changed, 1059 insertions(+), 17 deletions(-)
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index a0634c81f6..1ebe342e85 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -26,4 +26,203 @@ namespace doris::pipeline {
OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, StreamingOperator)
+/// Sets up the per-task sink state: binds the shared analytic state obtained from the
+/// dependency, registers the profile counters and clones the operator's expression contexts.
+///
+/// @param state runtime state used for the object pool and for cloning expression contexts
+/// @param info  carries the dependency whose shared state this sink writes into
+/// @return the first error reported by the base class or by an expression clone, else OK
+Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info));
+    // Label the tracker after this class; the former "ExchangeSinkLocalState:" label was a
+    // copy-paste leftover from another operator.
+    _mem_tracker = std::make_unique<MemTracker>("AnalyticSinkLocalState:");
+    auto& p = _parent->cast<AnalyticSinkOperatorX>();
+    _dependency = (AnalyticDependency*)info.dependency;
+    _shared_state = (AnalyticSharedState*)_dependency->shared_state();
+    _shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size());
+    _shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size());
+
+    _profile = state->obj_pool()->add(new RuntimeProfile("AnalyticSinkLocalState"));
+    _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
+    _blocks_memory_usage =
+            _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage");
+    _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
+
+    // One list of argument contexts and one list of input columns per analytic function.
+    const size_t agg_size = p._agg_expr_ctxs.size();
+    _agg_expr_ctxs.resize(agg_size);
+    _shared_state->agg_input_columns.resize(agg_size);
+    for (size_t i = 0; i < agg_size; ++i) {
+        _shared_state->agg_input_columns[i].resize(p._num_agg_input[i]);
+        _agg_expr_ctxs[i].resize(p._agg_expr_ctxs[i].size());
+        for (size_t j = 0; j < p._agg_expr_ctxs[i].size(); ++j) {
+            RETURN_IF_ERROR(p._agg_expr_ctxs[i][j]->clone(state, _agg_expr_ctxs[i][j]));
+        }
+
+        // Each input column starts empty, typed after the cloned expression's result type.
+        for (size_t j = 0; j < _agg_expr_ctxs[i].size(); ++j) {
+            _shared_state->agg_input_columns[i][j] =
+                    _agg_expr_ctxs[i][j]->root()->data_type()->create_column();
+        }
+    }
+
+    // The partition-by / order-by contexts are cloned straight into the shared state.
+    _shared_state->partition_by_eq_expr_ctxs.resize(p._partition_by_eq_expr_ctxs.size());
+    for (size_t i = 0; i < _shared_state->partition_by_eq_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._partition_by_eq_expr_ctxs[i]->clone(
+                state, _shared_state->partition_by_eq_expr_ctxs[i]));
+    }
+    _shared_state->order_by_eq_expr_ctxs.resize(p._order_by_eq_expr_ctxs.size());
+    for (size_t i = 0; i < _shared_state->order_by_eq_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(
+                p._order_by_eq_expr_ctxs[i]->clone(state, _shared_state->order_by_eq_expr_ctxs[i]));
+    }
+    return Status::OK();
+}
+
+/// Builds the sink operator from its plan node. The buffered tuple id is taken from the
+/// analytic node when the thrift field is set and defaults to 0 otherwise.
+AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                             const DescriptorTbl& descs)
+        : DataSinkOperatorX(tnode.node_id),
+          _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
+                                     ? tnode.analytic_node.buffered_tuple_id
+                                     : 0) {
+    // Was "SortSinkOperatorX" (copy-paste); _name is used in this operator's error messages.
+    _name = "AnalyticSinkOperatorX";
+}
+
+/// Builds this sink's expression contexts from the thrift plan node: one list of argument
+/// contexts per analytic function, plus the partition-by and order-by expression trees.
+Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    const TAnalyticNode& analytic_node = tnode.analytic_node;
+    const size_t fn_count = analytic_node.analytic_functions.size();
+    _agg_expr_ctxs.resize(fn_count);
+    _num_agg_input.resize(fn_count);
+
+    for (size_t fn = 0; fn < fn_count; ++fn) {
+        const TExpr& fn_desc = analytic_node.analytic_functions[fn];
+        _num_agg_input[fn] = fn_desc.nodes[0].num_children;
+
+        // node_idx is advanced past node 0 before the first argument tree is built and is then
+        // moved forward by create_tree_from_thrift itself.
+        int node_idx = 0;
+        for (int arg = 0; arg < fn_desc.nodes[0].num_children; ++arg) {
+            ++node_idx;
+            vectorized::VExprSPtr arg_expr;
+            vectorized::VExprContextSPtr arg_ctx;
+            RETURN_IF_ERROR(vectorized::VExpr::create_tree_from_thrift(fn_desc.nodes, &node_idx,
+                                                                       arg_expr, arg_ctx));
+            _agg_expr_ctxs[fn].emplace_back(arg_ctx);
+        }
+    }
+
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(analytic_node.partition_exprs,
+                                                         _partition_by_eq_expr_ctxs));
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(analytic_node.order_by_exprs,
+                                                         _order_by_eq_expr_ctxs));
+    _agg_functions_size = fn_count;
+    return Status::OK();
+}
+
+/// Prepares every expression context owned by the operator and creates its runtime profile.
+///
+/// Aggregate-argument expressions are prepared against the child's row descriptor. The
+/// partition-by / order-by expressions are prepared against a two-tuple row descriptor made of
+/// the child's first tuple and the buffered tuple.
+///
+/// @return the first error reported while preparing an expression, else OK
+Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
+    for (const auto& ctx : _agg_expr_ctxs) {
+        // The returned Status used to be dropped here, hiding prepare failures.
+        RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child_x->row_desc()));
+    }
+    if (!_partition_by_eq_expr_ctxs.empty() || !_order_by_eq_expr_ctxs.empty()) {
+        std::vector<TTupleId> tuple_ids;
+        tuple_ids.push_back(_child_x->row_desc().tuple_descriptors()[0]->id());
+        tuple_ids.push_back(_buffered_tuple_id);
+        RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, std::vector<bool>(2, false));
+        if (!_partition_by_eq_expr_ctxs.empty()) {
+            RETURN_IF_ERROR(
+                    vectorized::VExpr::prepare(_partition_by_eq_expr_ctxs, state, cmp_row_desc));
+        }
+        if (!_order_by_eq_expr_ctxs.empty()) {
+            RETURN_IF_ERROR(
+                    vectorized::VExpr::prepare(_order_by_eq_expr_ctxs, state, cmp_row_desc));
+        }
+    }
+    _profile = state->obj_pool()->add(new RuntimeProfile("AnalyticSinkOperatorX"));
+    return Status::OK();
+}
+
+/// The sink accepts another block exactly while the shared state reports `need_more_input`.
+bool AnalyticSinkOperatorX::can_write(RuntimeState* state) {
+    auto& sink_state = state->get_sink_local_state(id())->cast<AnalyticSinkLocalState>();
+    return sink_state._shared_state->need_more_input;
+}
+
+/// Opens the partition-by, order-by and per-function argument expression contexts, in that
+/// order, stopping at the first failure.
+Status AnalyticSinkOperatorX::open(RuntimeState* state) {
+    RETURN_IF_ERROR(vectorized::VExpr::open(_partition_by_eq_expr_ctxs, state));
+    RETURN_IF_ERROR(vectorized::VExpr::open(_order_by_eq_expr_ctxs, state));
+    for (size_t fn = 0; fn != _agg_functions_size; ++fn) {
+        auto& arg_ctxs = _agg_expr_ctxs[fn];
+        RETURN_IF_ERROR(vectorized::VExpr::open(arg_ctxs, state));
+    }
+    return Status::OK();
+}
+
+/// Creates the sink's local state, registers it with the runtime state under this operator's
+/// id, and then initializes it.
+Status AnalyticSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) {
+    auto sink_state = AnalyticSinkLocalState::create_shared(this, state);
+    state->emplace_sink_local_state(id(), sink_state);
+    Status init_status = sink_state->init(state, info);
+    return init_status;
+}
+
+/// Buffers one input block into the shared analytic state.
+///
+/// Records the block's first-row position and the new end-of-input position, appends the
+/// evaluated aggregate arguments to the shared input columns, remembers which block columns
+/// hold the partition-by / order-by values, moves the block into `input_blocks`, and finally
+/// recomputes `found_partition_end` and `need_more_input`.
+Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* input_block,
+                                   SourceState source_state) {
+    auto& local_state = state->get_sink_local_state(id())->cast<AnalyticSinkLocalState>();
+    auto& shared = *local_state._shared_state;
+
+    shared.input_eos = (source_state == SourceState::FINISHED);
+    if (shared.input_eos && input_block->rows() == 0) {
+        // Empty final block: nothing to buffer, just stop asking for input.
+        shared.need_more_input = false;
+        return Status::OK();
+    }
+
+    shared.input_block_first_row_positions.emplace_back(shared.input_total_rows);
+    const size_t block_rows = input_block->rows();
+    shared.input_total_rows += block_rows;
+    shared.all_block_end.block_num = shared.input_blocks.size();
+    shared.all_block_end.row_num = block_rows;
+    shared.all_block_end.pos = shared.input_total_rows;
+
+    // Remember the original column positions once; expression evaluation below may append
+    // extra columns to the block, and those do not need to be kept.
+    if (shared.origin_cols.empty()) {
+        for (int c = 0; c < input_block->columns(); ++c) {
+            shared.origin_cols.emplace_back(c);
+        }
+    }
+
+    // Evaluate each aggregate argument and append its values to the shared input columns.
+    for (size_t fn = 0; fn < _agg_functions_size; ++fn) {
+        auto& arg_ctxs = local_state._agg_expr_ctxs[fn];
+        for (size_t arg = 0; arg < arg_ctxs.size(); ++arg) {
+            RETURN_IF_ERROR(_insert_range_column(input_block, arg_ctxs[arg],
+                                                 shared.agg_input_columns[fn][arg].get(),
+                                                 block_rows));
+        }
+    }
+
+    // Record the block column index produced by each partition-by expression.
+    for (size_t k = 0; k < shared.partition_by_eq_expr_ctxs.size(); ++k) {
+        int column_id = -1;
+        RETURN_IF_ERROR(shared.partition_by_eq_expr_ctxs[k]->execute(input_block, &column_id));
+        DCHECK_GE(column_id, 0);
+        shared.partition_by_column_idxs[k] = column_id;
+    }
+
+    // Same for each order-by expression.
+    for (size_t k = 0; k < shared.order_by_eq_expr_ctxs.size(); ++k) {
+        int column_id = -1;
+        RETURN_IF_ERROR(shared.order_by_eq_expr_ctxs[k]->execute(input_block, &column_id));
+        DCHECK_GE(column_id, 0);
+        shared.ordey_by_column_idxs[k] = column_id;
+    }
+
+    local_state.mem_tracker()->consume(input_block->allocated_bytes());
+    local_state._blocks_memory_usage->add(input_block->allocated_bytes());
+
+    // TODO: a free queue of blocks would let the memory be reused instead of being
+    // allocated and released for every block.
+    shared.input_blocks.emplace_back(std::move(*input_block));
+    {
+        SCOPED_TIMER(local_state._evaluation_timer);
+        shared.found_partition_end = local_state._dependency->get_partition_by_end();
+    }
+    shared.need_more_input =
+            local_state._dependency->whether_need_next_partition(shared.found_partition_end);
+    return Status::OK();
+}
+
+/// Evaluates `expr` on `block` and appends the first `length` values of the resulting column
+/// (expanded to a full column when it is constant) to `dst_column`.
+Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
+                                                   const vectorized::VExprContextSPtr& expr,
+                                                   vectorized::IColumn* dst_column,
+                                                   size_t length) {
+    int evaluated_idx = -1;
+    RETURN_IF_ERROR(expr->execute(block, &evaluated_idx));
+    DCHECK_GE(evaluated_idx, 0);
+    const auto& evaluated = block->get_by_position(evaluated_idx);
+    auto full_column = evaluated.column->convert_to_full_column_if_const();
+    dst_column->insert_range_from(*full_column, 0, length);
+    return Status::OK();
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h
b/be/src/pipeline/exec/analytic_sink_operator.h
index 2d16eb1c70..ed15cf3ab0 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -21,6 +21,7 @@
#include <stdint.h>
#include "operator.h"
+#include "pipeline/pipeline_x/dependency.h"
#include "vec/exec/vanalytic_eval_node.h"
namespace doris {
@@ -43,5 +44,67 @@ public:
bool can_write() override { return _node->can_write(); }
};
+class AnalyticSinkOperatorX;
+
+/// Per-task state of the analytic sink: the dependency / shared state it writes into, its
+/// profile counters and its private clones of the aggregate-argument expression contexts.
+class AnalyticSinkLocalState : public PipelineXSinkLocalState {
+    ENABLE_FACTORY_CREATOR(AnalyticSinkLocalState);
+
+public:
+    AnalyticSinkLocalState(DataSinkOperatorX* parent, RuntimeState* state)
+            : PipelineXSinkLocalState(parent, state) {}
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+
+private:
+    friend class AnalyticSinkOperatorX;
+    // All raw pointers below are assigned in init(); they start as nullptr so that use before
+    // init() is a detectable null dereference rather than a read of an indeterminate pointer.
+    AnalyticDependency* _dependency = nullptr;
+    AnalyticSharedState* _shared_state = nullptr;
+
+    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
+    RuntimeProfile::Counter* _evaluation_timer = nullptr;
+    RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
+
+    // One list of argument contexts per analytic function.
+    std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
+};
+
+/// Sink half of the pipelineX analytic operator: evaluates the argument / partition-by /
+/// order-by expressions on each incoming block and buffers the block in the shared state.
+class AnalyticSinkOperatorX final : public DataSinkOperatorX {
+public:
+    AnalyticSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+
+    /// This operator is built from a plan node, so initialization from a TDataSink is rejected.
+    /// The message now names the rejected argument type; it used to say TPlanNode, which is
+    /// the overload that is actually supported.
+    Status init(const TDataSink& tsink) override {
+        return Status::InternalError("{} should not init with TDataSink", _name);
+    }
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+    Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
+
+    bool can_write(RuntimeState* state) override;
+
+    /// Hands out a fresh AnalyticDependency keyed by this operator's id.
+    void get_dependency(DependencySPtr& dependency) override {
+        dependency.reset(new AnalyticDependency(id()));
+    }
+
+private:
+    Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
+                                vectorized::IColumn* dst_column, size_t length);
+
+    friend class AnalyticSinkLocalState;
+
+    // One list of argument contexts per analytic function.
+    std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
+    vectorized::VExprContextSPtrs _partition_by_eq_expr_ctxs;
+    vectorized::VExprContextSPtrs _order_by_eq_expr_ctxs;
+
+    size_t _agg_functions_size = 0;
+
+    const TTupleId _buffered_tuple_id;
+
+    // Number of arguments of each analytic function.
+    std::vector<size_t> _num_agg_input;
+};
+
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 20a4bfb9cc..298a174d0d 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -25,4 +25,478 @@ namespace doris::pipeline {
OPERATOR_CODE_GENERATOR(AnalyticSourceOperator, SourceOperator)
-} // namespace doris::pipeline
\ No newline at end of file
+/// Starts with every cursor and offset at zero, no aggregate-state buffer, and the aggregate
+/// states marked as not yet created; init() fills in the rest.
+AnalyticLocalState::AnalyticLocalState(RuntimeState* state, OperatorXBase* parent)
+        : PipelineXLocalState(state, parent),
+          _output_block_index(0),
+          _window_end_position(0),
+          _next_partition(false),
+          _rows_start_offset(0),
+          _rows_end_offset(0),
+          _fn_place_ptr(nullptr),
+          _agg_functions_size(0),
+          _agg_functions_created(false) {
+}
+
+/// Initializes the source's per-task state: binds the shared state, registers counters, clones
+/// the aggregate evaluators, allocates the aggregate-state buffer, selects the `get_next`
+/// strategy that matches the window definition and creates the aggregate states.
+Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(PipelineXLocalState::init(state, info));
+    _agg_arena_pool = std::make_unique<vectorized::Arena>();
+    _dependency = (AnalyticDependency*)info.dependency;
+    _shared_state = (AnalyticSharedState*)_dependency->shared_state();
+
+    auto& p = _parent->cast<AnalyticSourceOperatorX>();
+    _agg_functions_size = p._agg_functions.size();
+
+    _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
+    _blocks_memory_usage =
+            profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage");
+    _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
+
+    _agg_functions.resize(p._agg_functions.size());
+    for (size_t fn = 0; fn < _agg_functions.size(); fn++) {
+        _agg_functions[fn] = p._agg_functions[fn]->clone(state, state->obj_pool());
+    }
+
+    _fn_place_ptr = _agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states,
+                                                   p._align_aggregate_states);
+
+    // Whole-partition evaluation is used when no window is given, when a RANGE window has no
+    // end bound, and when a ROWS window has neither bound. Only the remaining shapes switch
+    // to the range / rows strategies.
+    Status (AnalyticLocalState::*get_next_fn)(size_t) =
+            &AnalyticLocalState::_get_next_for_partition;
+    if (p._has_window) {
+        if (p._has_range_window) {
+            if (p._has_window_end) {
+                get_next_fn = &AnalyticLocalState::_get_next_for_range;
+            }
+        } else if (p._has_window_start || p._has_window_end) {
+            if (p._has_window_start) {
+                // Start bound: PRECEDING offsets are stored negative, CURRENT ROW as 0.
+                TAnalyticWindowBoundary start_bound = p._window.window_start;
+                if (start_bound.__isset.rows_offset_value) {
+                    _rows_start_offset = start_bound.rows_offset_value;
+                    if (start_bound.type == TAnalyticWindowBoundaryType::PRECEDING) {
+                        _rows_start_offset *= -1;
+                    }
+                } else {
+                    DCHECK_EQ(start_bound.type, TAnalyticWindowBoundaryType::CURRENT_ROW);
+                    _rows_start_offset = 0;
+                }
+            }
+
+            if (p._has_window_end) {
+                // End bound: same sign convention as the start bound.
+                TAnalyticWindowBoundary end_bound = p._window.window_end;
+                if (end_bound.__isset.rows_offset_value) {
+                    _rows_end_offset = end_bound.rows_offset_value;
+                    if (end_bound.type == TAnalyticWindowBoundaryType::PRECEDING) {
+                        _rows_end_offset *= -1;
+                    }
+                } else {
+                    DCHECK_EQ(end_bound.type, TAnalyticWindowBoundaryType::CURRENT_ROW);
+                    _rows_end_offset = 0;
+                }
+            }
+
+            get_next_fn = &AnalyticLocalState::_get_next_for_rows;
+        }
+    }
+    _executor.get_next = std::bind<Status>(get_next_fn, this, std::placeholders::_1);
+
+    _executor.insert_result =
+            std::bind<void>(&AnalyticLocalState::_insert_result_info, this, std::placeholders::_1);
+    _executor.execute =
+            std::bind<void>(&AnalyticLocalState::_execute_for_win_func, this,
+                            std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
+                            std::placeholders::_4);
+
+    RETURN_IF_CATCH_EXCEPTION(_create_agg_status());
+    return Status::OK();
+}
+
+/// Resets every aggregate state in place, each at its offset inside the state buffer.
+Status AnalyticLocalState::_reset_agg_status() {
+    const auto& offsets = _parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states;
+    for (size_t fn = 0; fn < _agg_functions_size; ++fn) {
+        _agg_functions[fn]->reset(_fn_place_ptr + offsets[fn]);
+    }
+    return Status::OK();
+}
+
+/// Constructs every aggregate state inside the state buffer. If constructing one of them
+/// throws, the states built so far are destroyed before the exception is rethrown.
+Status AnalyticLocalState::_create_agg_status() {
+    const auto& offsets = _parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states;
+    for (size_t fn = 0; fn < _agg_functions_size; ++fn) {
+        try {
+            _agg_functions[fn]->create(_fn_place_ptr + offsets[fn]);
+        } catch (...) {
+            // Roll back the states that were already constructed.
+            for (size_t built = 0; built < fn; ++built) {
+                _agg_functions[built]->destroy(_fn_place_ptr + offsets[built]);
+            }
+            throw;
+        }
+    }
+    _agg_functions_created = true;
+    return Status::OK();
+}
+
+/// Destroys every aggregate state that _create_agg_status() constructed.
+///
+/// Does nothing when the state buffer was never allocated or the states were never created.
+/// The created flag is cleared afterwards so that a repeated call cannot destroy the same
+/// states a second time.
+///
+/// @return always OK
+Status AnalyticLocalState::_destroy_agg_status() {
+    if (UNLIKELY(_fn_place_ptr == nullptr || !_agg_functions_created)) {
+        return Status::OK();
+    }
+    const auto& offsets = _parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states;
+    for (size_t i = 0; i < _agg_functions_size; ++i) {
+        _agg_functions[i]->destroy(_fn_place_ptr + offsets[i]);
+    }
+    // Mark the states as gone; without this a second call would destroy them again.
+    _agg_functions_created = false;
+    return Status::OK();
+}
+
+// Feeds the rows of the frame [frame_start, frame_end) within the partition
+// [partition_start, partition_end) into every analytic function's single aggregate state.
+// Used for lead/lag, row_number/rank/dense_rank/ntile and for
+// sum/min/max/count/avg/first_value/last_value.
+void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, int64_t partition_end,
+                                               int64_t frame_start, int64_t frame_end) {
+    const auto& offsets = _parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states;
+    for (size_t fn = 0; fn < _agg_functions_size; ++fn) {
+        // Raw column pointers, in argument order, as expected by add_range_single_place.
+        std::vector<const vectorized::IColumn*> arg_columns;
+        for (const auto& input_column : _shared_state->agg_input_columns[fn]) {
+            arg_columns.push_back(input_column.get());
+        }
+        _agg_functions[fn]->function()->add_range_single_place(
+                partition_start, partition_end, frame_start, frame_end,
+                _fn_place_ptr + offsets[fn], arg_columns.data(), nullptr);
+    }
+}
+
+/// Advances the output cursor for the current block according to the frame scope and appends
+/// the current aggregate result to each result column once per row that was covered.
+void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) {
+    const auto& p = _parent->cast<AnalyticSourceOperatorX>();
+    const int64_t block_first_row =
+            _shared_state->input_block_first_row_positions[_output_block_index];
+    // Offset inside the current block of the first row that receives a result now.
+    const int64_t get_result_start = _shared_state->current_row_position - block_first_row;
+
+    if (p._fn_scope == vectorized::AnalyticFnScope::PARTITION) {
+        // Cover rows up to the partition end or the block end, whichever comes first.
+        const int64_t get_result_end =
+                std::min<int64_t>(_shared_state->current_row_position + current_block_rows,
+                                  _shared_state->partition_by_end.pos);
+        _window_end_position =
+                std::min<int64_t>(get_result_end - block_first_row, current_block_rows);
+        _shared_state->current_row_position += (_window_end_position - get_result_start);
+    } else if (p._fn_scope == vectorized::AnalyticFnScope::RANGE) {
+        // Cover rows up to the end of the current order-by range or the block end.
+        _window_end_position =
+                std::min<int64_t>(_order_by_end.pos - block_first_row, current_block_rows);
+        _shared_state->current_row_position += (_window_end_position - get_result_start);
+    } else {
+        // Any other scope advances exactly one row per call.
+        _window_end_position++;
+        _shared_state->current_row_position++;
+    }
+
+    for (int fn = 0; fn < _agg_functions_size; ++fn) {
+        auto* result_column = _result_window_columns[fn].get();
+        for (int row = get_result_start; row < _window_end_position; ++row) {
+            _agg_functions[fn]->insert_result_info(
+                    _fn_place_ptr + p._offsets_of_aggregate_states[fn], result_column);
+        }
+    }
+}
+
+/// Produces results row by row for a ROWS window until the partition end or the end of the
+/// current block is reached.
+///
+/// When the window has no start bound and ends at CURRENT ROW, the aggregate state is kept
+/// and only the next row is added. Otherwise the state is reset and the frame is recomputed
+/// from the configured start / end offsets.
+///
+/// @param current_block_rows number of rows in the block currently being output
+/// @return the error from resetting the aggregate states, else OK
+Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) {
+    const auto& window = _parent->cast<AnalyticSourceOperatorX>()._window;
+    while (_shared_state->current_row_position < _shared_state->partition_by_end.pos &&
+           _window_end_position < current_block_rows) {
+        int64_t range_start, range_end;
+        if (!window.__isset.window_start &&
+            window.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW) {
+            // Running frame: accumulate the current row on top of the existing state.
+            range_start = _shared_state->current_row_position;
+            range_end = _shared_state->current_row_position + 1;
+        } else {
+            // The Status returned here used to be discarded.
+            RETURN_IF_ERROR(_reset_agg_status());
+            if (!window.__isset.window_start) {
+                // No start bound: the frame begins at the partition start.
+                range_start = _partition_by_start.pos;
+            } else {
+                range_start = _shared_state->current_row_position + _rows_start_offset;
+            }
+            range_end = _shared_state->current_row_position + _rows_end_offset + 1;
+        }
+        _executor.execute(_partition_by_start.pos, _shared_state->partition_by_end.pos,
+                          range_start, range_end);
+        _executor.insert_result(current_block_rows);
+    }
+    return Status::OK();
+}
+
+/// Whole-partition frame: the aggregate is evaluated over the full partition only when a new
+/// partition has just been entered; the result is then emitted for the current block.
+Status AnalyticLocalState::_get_next_for_partition(size_t current_block_rows) {
+    if (_next_partition) {
+        const auto partition_begin = _partition_by_start.pos;
+        const auto partition_end = _shared_state->partition_by_end.pos;
+        _executor.execute(partition_begin, partition_end, partition_begin, partition_end);
+    }
+    _executor.insert_result(current_block_rows);
+    return Status::OK();
+}
+
+/// RANGE frame: whenever the cursor passes the end of the current order-by range, the next
+/// range is located and fed to the aggregates; results are emitted until the partition end or
+/// the end of the current block is reached.
+Status AnalyticLocalState::_get_next_for_range(size_t current_block_rows) {
+    for (;;) {
+        const bool inside_partition =
+                _shared_state->current_row_position < _shared_state->partition_by_end.pos;
+        if (!inside_partition || !(_window_end_position < current_block_rows)) {
+            break;
+        }
+        if (_shared_state->current_row_position >= _order_by_end.pos) {
+            _update_order_by_range();
+            _executor.execute(_order_by_start.pos, _order_by_end.pos, _order_by_start.pos,
+                              _order_by_end.pos);
+        }
+        _executor.insert_result(current_block_rows);
+    }
+    return Status::OK();
+}
+
+/// Moves the order-by range forward: the new range starts where the previous one ended and
+/// its end is narrowed, one order-by column at a time, starting from the partition end.
+void AnalyticLocalState::_update_order_by_range() {
+    const auto& first_row_positions = _shared_state->input_block_first_row_positions;
+
+    _order_by_start = _order_by_end;
+    _order_by_end = _shared_state->partition_by_end;
+    for (size_t k = 0; k < _shared_state->order_by_eq_expr_ctxs.size(); ++k) {
+        const auto column_idx = _shared_state->ordey_by_column_idxs[k];
+        _order_by_end = _dependency->compare_row_to_find_end(column_idx, _order_by_start,
+                                                             _order_by_end, true);
+    }
+
+    // Recompute the absolute positions from (block_num, row_num).
+    _order_by_start.pos = first_row_positions[_order_by_start.block_num] + _order_by_start.row_num;
+    _order_by_end.pos = first_row_positions[_order_by_end.block_num] + _order_by_end.row_num;
+
+    // `_order_by_end` becomes `_order_by_start` on the next call, so when it sits one past the
+    // last row of its block, move it to row 0 of the following block.
+    const auto rows_in_end_block = _shared_state->input_blocks[_order_by_end.block_num].rows();
+    if (_order_by_end.row_num == rows_in_end_block) {
+        _order_by_end.block_num++;
+        _order_by_end.row_num = 0;
+    }
+}
+
+/// Creates one empty result column per analytic function, typed after the function's return
+/// type. Does nothing while `_window_end_position` is non-zero.
+Status AnalyticLocalState::init_result_columns() {
+    if (_window_end_position) {
+        return Status::OK();
+    }
+    _result_window_columns.resize(_agg_functions_size);
+    for (size_t fn = 0; fn < _agg_functions_size; ++fn) {
+        auto return_type = _agg_functions[fn]->data_type();
+        _result_window_columns[fn] = return_type->create_column();
+    }
+    return Status::OK();
+}
+
+// Once the cursor has reached the end of the current partition, switches to the next one:
+// the old end becomes the new start, `found_partition_end` becomes the new end, and the
+// aggregate states are reset. Returns true only when such a switch happened.
+bool AnalyticLocalState::init_next_partition(vectorized::BlockRowPos found_partition_end) {
+    const auto current_end = _shared_state->partition_by_end.pos;
+    if (_shared_state->current_row_position < current_end) {
+        // Still inside the current partition.
+        return false;
+    }
+    if (current_end != 0 && current_end == found_partition_end.pos) {
+        // Not the very first partition, and no new partition end has been found.
+        return false;
+    }
+    _partition_by_start = _shared_state->partition_by_end;
+    _shared_state->partition_by_end = found_partition_end;
+    _shared_state->current_row_position = _partition_by_start.pos;
+    _reset_agg_status();
+    return true;
+}
+
+/// Hands the current buffered input block to the caller, drops any columns that were not part
+/// of the original input, appends one result column per analytic function (wrapped as nullable
+/// where the operator requires it) and advances to the next buffered block.
+Status AnalyticLocalState::output_current_block(vectorized::Block* block) {
+    block->swap(std::move(_shared_state->input_blocks[_output_block_index]));
+    _blocks_memory_usage->add(-block->allocated_bytes());
+    mem_tracker()->consume(-block->allocated_bytes());
+    if (_shared_state->origin_cols.size() < block->columns()) {
+        block->erase_not_in(_shared_state->origin_cols);
+    }
+
+    const auto& to_nullable = _parent->cast<AnalyticSourceOperatorX>()._change_to_nullable_flags;
+    DCHECK(to_nullable.size() == _result_window_columns.size());
+    for (size_t fn = 0; fn < _result_window_columns.size(); ++fn) {
+        if (!to_nullable[fn]) {
+            block->insert(
+                    {std::move(_result_window_columns[fn]), _agg_functions[fn]->data_type(), ""});
+        } else {
+            block->insert({make_nullable(std::move(_result_window_columns[fn])),
+                           make_nullable(_agg_functions[fn]->data_type()), ""});
+        }
+    }
+
+    _output_block_index++;
+    _window_end_position = 0;
+
+    return Status::OK();
+}
+
+/// Drops the aggregate arena and swaps the buffered input blocks, the aggregate input columns
+/// and the result columns with empty vectors so their storage is released.
+void AnalyticLocalState::release_mem() {
+    _agg_arena_pool = nullptr;
+
+    std::vector<vectorized::Block>().swap(_shared_state->input_blocks);
+    std::vector<std::vector<vectorized::MutableColumnPtr>>().swap(
+            _shared_state->agg_input_columns);
+    std::vector<vectorized::MutableColumnPtr>().swap(_result_window_columns);
+}
+
+/// Captures the window definition and tuple ids from the plan node and derives the frame
+/// scope: PARTITION by default, RANGE for a RANGE window with an end bound, ROWS for any other
+/// window that has a start or an end bound.
+AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                                 const DescriptorTbl& descs, std::string op_name)
+        : OperatorXBase(pool, tnode, descs, op_name),
+          _window(tnode.analytic_node.window),
+          _intermediate_tuple_id(tnode.analytic_node.intermediate_tuple_id),
+          _output_tuple_id(tnode.analytic_node.output_tuple_id),
+          _has_window(tnode.analytic_node.__isset.window),
+          _has_range_window(tnode.analytic_node.window.type == TAnalyticWindowType::RANGE),
+          _has_window_start(tnode.analytic_node.window.__isset.window_start),
+          _has_window_end(tnode.analytic_node.window.__isset.window_end) {
+    _fn_scope = vectorized::AnalyticFnScope::PARTITION;
+    if (!tnode.analytic_node.__isset.window) {
+        return;
+    }
+
+    if (tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {
+        DCHECK(!_window.__isset.window_start) << "RANGE windows must have UNBOUNDED PRECEDING";
+        DCHECK(!_window.__isset.window_end ||
+               _window.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW)
+                << "RANGE window end bound must be CURRENT ROW or UNBOUNDED FOLLOWING";
+
+        // Without an end bound the frame is the whole partition, so the scope stays
+        // PARTITION; with one it is [unbounded preceding, current row].
+        if (_window.__isset.window_end) {
+            _fn_scope = vectorized::AnalyticFnScope::RANGE;
+        }
+        return;
+    }
+
+    if (_window.__isset.window_start || _window.__isset.window_end) {
+        _fn_scope = vectorized::AnalyticFnScope::ROWS;
+    }
+}
+
+/// Initializes the base operator and creates one aggregate evaluator per analytic function
+/// listed in the plan node.
+Status AnalyticSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorXBase::init(tnode, state));
+    const auto& analytic_functions = tnode.analytic_node.analytic_functions;
+
+    for (size_t fn = 0; fn < analytic_functions.size(); ++fn) {
+        vectorized::AggFnEvaluator* created = nullptr;
+        RETURN_IF_ERROR(
+                vectorized::AggFnEvaluator::create(_pool, analytic_functions[fn], {}, &created));
+        _agg_functions.emplace_back(created);
+    }
+
+    return Status::OK();
+}
+
+/// Produces the next output block: the oldest buffered input block extended with one result
+/// column per analytic function, then filtered by the conjuncts and checked against the limit.
+///
+/// Sets `source_state` to FINISHED and returns without output once the input is exhausted and
+/// every buffered block has been emitted (or no rows were ever received). Returns early with
+/// OK and no output when the shared state reports that more input is needed.
+///
+/// @param state        runtime state used to look up this operator's local state
+/// @param block        receives the output block
+/// @param source_state set to FINISHED when there is nothing left to emit
+/// @return the first error from column initialization, evaluation, output or filtering
+Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
+                                          SourceState& source_state) {
+    auto& local_state = state->get_local_state(id())->cast<AnalyticLocalState>();
+    if (local_state._shared_state->input_eos &&
+        (local_state._output_block_index == local_state._shared_state->input_blocks.size() ||
+         local_state._shared_state->input_total_rows == 0)) {
+        source_state = SourceState::FINISHED;
+        return Status::OK();
+    }
+
+    while (!local_state._shared_state->input_eos ||
+           local_state._output_block_index < local_state._shared_state->input_blocks.size()) {
+        {
+            SCOPED_TIMER(local_state._evaluation_timer);
+            local_state._shared_state->found_partition_end =
+                    local_state._dependency->get_partition_by_end();
+        }
+        local_state._shared_state->need_more_input =
+                local_state._dependency->whether_need_next_partition(
+                        local_state._shared_state->found_partition_end);
+        if (local_state._shared_state->need_more_input) {
+            return Status::OK();
+        }
+        local_state._next_partition =
+                local_state.init_next_partition(local_state._shared_state->found_partition_end);
+        // Both calls below return a Status that used to be dropped.
+        RETURN_IF_ERROR(local_state.init_result_columns());
+        size_t current_block_rows =
+                local_state._shared_state->input_blocks[local_state._output_block_index].rows();
+        RETURN_IF_ERROR(local_state._executor.get_next(current_block_rows));
+        // Every row of the current block has a result: emit it.
+        if (local_state._window_end_position == current_block_rows) {
+            break;
+        }
+    }
+    RETURN_IF_ERROR(local_state.output_current_block(block));
+    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
+                                                           block->columns()));
+    local_state.reached_limit(block, source_state);
+    return Status::OK();
+}
+
+/// Creates the source's local state, registers it with the runtime state under this operator's
+/// id, and then initializes it.
+Status AnalyticSourceOperatorX::setup_local_state(RuntimeState* state, LocalStateInfo& info) {
+    auto source_state = AnalyticLocalState::create_shared(state, this);
+    state->emplace_local_state(id(), source_state);
+    Status init_status = source_state->init(state, info);
+    return init_status;
+}
+
+/// The source is readable exactly when the shared state is not waiting for more input.
+bool AnalyticSourceOperatorX::can_read(RuntimeState* state) {
+    auto& local_state = state->get_local_state(id())->cast<AnalyticLocalState>();
+    return !local_state._shared_state->need_more_input;
+}
+
+/// Closes every cloned aggregate evaluator, destroys the aggregate states, releases the
+/// buffered memory and finally closes the base operator.
+Status AnalyticSourceOperatorX::close(RuntimeState* state) {
+    auto& local_state = state->get_local_state(id())->cast<AnalyticLocalState>();
+    auto& evaluators = local_state._agg_functions;
+    for (size_t fn = 0; fn < evaluators.size(); ++fn) {
+        evaluators[fn]->close(state);
+    }
+
+    local_state._destroy_agg_status();
+    local_state.release_mem();
+    return OperatorXBase::close(state);
+}
+
+/// Prepares every aggregate evaluator against its intermediate / output slots, records which
+/// results must be wrapped as nullable, and lays out the aggregate states in one buffer:
+/// computes each state's offset, the total size and the maximum alignment.
+Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorXBase::prepare(state));
+    DCHECK(_child_x->row_desc().is_prefix_of(_row_descriptor));
+    _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
+    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+
+    const size_t fn_count = _agg_functions.size();
+    for (size_t fn = 0; fn < fn_count; ++fn) {
+        SlotDescriptor* intermediate_slot = _intermediate_tuple_desc->slots()[fn];
+        SlotDescriptor* output_slot = _output_tuple_desc->slots()[fn];
+        RETURN_IF_ERROR(_agg_functions[fn]->prepare(state, _child_x->row_desc(),
+                                                    intermediate_slot, output_slot));
+        // Wrap the result only when the output slot is nullable but the function's type is not.
+        const bool wrap_as_nullable =
+                output_slot->is_nullable() && !_agg_functions[fn]->data_type()->is_nullable();
+        _change_to_nullable_flags.push_back(wrap_as_nullable);
+    }
+
+    _offsets_of_aggregate_states.resize(fn_count);
+    for (size_t fn = 0; fn < fn_count; ++fn) {
+        _offsets_of_aggregate_states[fn] = _total_size_of_aggregate_states;
+        const auto& agg_function = _agg_functions[fn]->function();
+        // The buffer as a whole is aligned to the strictest requirement of any state.
+        _align_aggregate_states = std::max(_align_aggregate_states, agg_function->align_of_data());
+        _total_size_of_aggregate_states += agg_function->size_of_data();
+
+        // The last state needs no trailing padding.
+        if (fn + 1 >= fn_count) {
+            continue;
+        }
+        size_t next_alignment = _agg_functions[fn + 1]->function()->align_of_data();
+        if ((next_alignment & (next_alignment - 1)) != 0) {
+            return Status::RuntimeError("Logical error: align_of_data is not 2^N");
+        }
+        // Round the running size up to a multiple of the next state's alignment so that the
+        // next state starts aligned.
+        _total_size_of_aggregate_states =
+                (_total_size_of_aggregate_states + next_alignment - 1) / next_alignment *
+                next_alignment;
+    }
+    return Status::OK();
+}
+
+/// Opens the base operator and then every aggregate evaluator, stopping at the first failure.
+Status AnalyticSourceOperatorX::open(RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorXBase::open(state));
+    for (size_t fn = 0; fn < _agg_functions.size(); ++fn) {
+        RETURN_IF_ERROR(_agg_functions[fn]->open(state));
+    }
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/analytic_source_operator.h
b/be/src/pipeline/exec/analytic_source_operator.h
index 60f1a86158..21045a56ea 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -45,5 +45,123 @@ public:
Status open(RuntimeState*) override { return Status::OK(); }
};
+class AnalyticSourceOperatorX;
+// Local state of the analytic source operator in pipelineX.
+// It keeps raw pointers to the AnalyticDependency and to the
+// AnalyticSharedState, plus the bookkeeping declared below.
+// NOTE(review): ownership/lifetime of the raw pointers is not visible
+// in this header - confirm against init().
+class AnalyticLocalState final : public PipelineXLocalState {
+public:
+ ENABLE_FACTORY_CREATOR(AnalyticLocalState);
+ AnalyticLocalState(RuntimeState* state, OperatorXBase* parent);
+
+ Status init(RuntimeState* state, LocalStateInfo& info) override;
+
+ Status init_result_columns();
+
+ Status output_current_block(vectorized::Block* block);
+
+ void release_mem();
+
+ bool init_next_partition(vectorized::BlockRowPos found_partition_end);
+
+private:
+ // One "get next" routine per window kind; presumably matched to
+ // AnalyticFnScope ROWS / RANGE / PARTITION - TODO confirm in the .cpp.
+ Status _get_next_for_rows(size_t rows);
+ Status _get_next_for_range(size_t rows);
+ Status _get_next_for_partition(size_t rows);
+
+ void _execute_for_win_func(int64_t partition_start, int64_t partition_end,
int64_t frame_start,
+ int64_t frame_end);
+ void _insert_result_info(int64_t current_block_rows);
+
+ void _update_order_by_range();
+
+ Status _reset_agg_status();
+ Status _create_agg_status();
+ Status _destroy_agg_status();
+
+ // The operator reads and writes this state's private members directly.
+ friend class AnalyticSourceOperatorX;
+
+ AnalyticDependency* _dependency;
+ AnalyticSharedState* _shared_state;
+
+ // NOTE(review): the scalar members below have no in-class
+ // initializers; confirm the constructor sets all of them.
+ int64_t _output_block_index;
+ int64_t _window_end_position;
+ bool _next_partition;
+ std::vector<vectorized::MutableColumnPtr> _result_window_columns;
+
+ int64_t _rows_start_offset;
+ int64_t _rows_end_offset;
+ vectorized::AggregateDataPtr _fn_place_ptr;
+ size_t _agg_functions_size;
+ bool _agg_functions_created;
+ vectorized::BlockRowPos _order_by_start;
+ vectorized::BlockRowPos _order_by_end;
+ vectorized::BlockRowPos _partition_by_start;
+ std::unique_ptr<vectorized::Arena> _agg_arena_pool;
+ std::vector<vectorized::AggFnEvaluator*> _agg_functions;
+
+ RuntimeProfile::Counter* _memory_usage_counter;
+ RuntimeProfile::Counter* _evaluation_timer;
+ RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
+
+ // Callback signatures bundled into `executor` below.
+ using vectorized_execute = std::function<void(int64_t peer_group_start,
int64_t peer_group_end,
+ int64_t frame_start, int64_t
frame_end)>;
+ using vectorized_get_next = std::function<Status(size_t rows)>;
+ using vectorized_get_result = std::function<void(int64_t
current_block_rows)>;
+
+ // Groups the three callbacks (execute / get_next / insert_result)
+ // so they can be stored and invoked together via _executor.
+ struct executor {
+ vectorized_execute execute;
+ vectorized_get_next get_next;
+ vectorized_get_result insert_result;
+ };
+
+ executor _executor;
+};
+
+// PipelineX source-side operator for analytic (window) functions.
+// Holds the window description, the tuple ids/descriptors, the
+// aggregate-function evaluators and the layout (offsets, total size,
+// alignment) of their aggregate states.
+class AnalyticSourceOperatorX final : public OperatorXBase {
+public:
+ AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs,
+ std::string op_name);
+ bool can_read(RuntimeState* state) override;
+
+ Status setup_local_state(RuntimeState* state, LocalStateInfo& info)
override;
+
+ Status get_block(RuntimeState* state, vectorized::Block* block,
+ SourceState& source_state) override;
+
+ Status close(RuntimeState* state) override;
+ // Always reports this operator as a source.
+ bool is_source() const override { return true; }
+
+ Status init(const TPlanNode& tnode, RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
+ Status open(RuntimeState* state) override;
+
+private:
+ // The local state accesses the private members below directly.
+ friend class AnalyticLocalState;
+
+ TAnalyticWindow _window;
+
+ TupleId _intermediate_tuple_id;
+ TupleId _output_tuple_id;
+
+ // NOTE(review): these flags have no in-class initializers; confirm
+ // the constructor sets all of them.
+ bool _has_window;
+ bool _has_range_window;
+ bool _has_window_start;
+ bool _has_window_end;
+
+ std::vector<vectorized::AggFnEvaluator*> _agg_functions;
+
+ vectorized::AnalyticFnScope _fn_scope;
+
+ TupleDescriptor* _intermediate_tuple_desc;
+ TupleDescriptor* _output_tuple_desc;
+
+ /// The offset of the n-th functions.
+ std::vector<size_t> _offsets_of_aggregate_states;
+ /// The total size of the row from the functions.
+ size_t _total_size_of_aggregate_states = 0;
+ /// The max align size for functions
+ size_t _align_aggregate_states = 1;
+
+ // One entry per aggregate function: true when the output slot is
+ // nullable but the function's own data type is not (set in prepare()).
+ std::vector<bool> _change_to_nullable_flags;
+};
+
} // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index f58b9406bf..d3afe827c8 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -102,6 +102,7 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc()
const {
}
Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo&
info) {
+ RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info));
_sender_id = info.sender_id;
_broadcast_pb_blocks.resize(config::num_broadcast_buffer);
_broadcast_pb_block_idx = 0;
@@ -128,7 +129,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
std::string title = "VDataStreamSender (dst_id={}, dst_fragments=[{}])";
_profile = p._pool->add(new RuntimeProfile(title));
SCOPED_TIMER(_profile->total_time_counter());
- _mem_tracker = std::make_unique<MemTracker>("ExchangeSinkLocalState:");
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
int local_size = 0;
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index b240521913..efb716f7ac 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -29,7 +29,10 @@ class RuntimeState;
namespace doris::pipeline {
OperatorBase::OperatorBase(OperatorBuilderBase* operator_builder)
- : _operator_builder(operator_builder), _is_closed(false) {}
+ : _operator_builder(operator_builder),
+ _child(nullptr),
+ _child_x(nullptr),
+ _is_closed(false) {}
bool OperatorBase::is_sink() const {
return _operator_builder->is_sink();
@@ -59,6 +62,11 @@ std::string OperatorBase::debug_string() const {
return ss.str();
}
+// Default sink-local-state initialisation: creates the MemTracker,
+// labelled with the parent operator's name, and returns Status::OK().
+// `state` and `info` are not used by this base implementation.
+Status PipelineXSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo&
info) {
+ _mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
+ return Status::OK();
+}
+
std::string OperatorXBase::debug_string() const {
std::stringstream ss;
ss << _op_name << ": is_source: " << is_source();
@@ -96,8 +104,8 @@ Status OperatorXBase::prepare(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state,
intermediate_row_desc()));
- if (_children) {
- RETURN_IF_ERROR(_children->prepare(state));
+ if (_child_x && !is_source()) {
+ RETURN_IF_ERROR(_child_x->prepare(state));
}
return Status::OK();
@@ -124,7 +132,9 @@ Status OperatorXBase::close(RuntimeState* state) {
}
auto local_state = state->get_local_state(id());
Status result;
- _children->close(state);
+ if (_child_x && !is_source()) {
+ _child_x->close(state);
+ }
if (local_state->_rows_returned_counter != nullptr) {
COUNTER_SET(local_state->_rows_returned_counter,
local_state->_num_rows_returned);
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index a03297bcf2..6daf9407e7 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -203,9 +203,6 @@ public:
}
Status set_child(OperatorXPtr child) {
- if (is_source()) {
- return Status::OK();
- }
_child_x = std::move(child);
return Status::OK();
}
@@ -270,7 +267,7 @@ protected:
// Used on pipeline X
OperatorXPtr _child_x;
- bool _is_closed = false;
+ bool _is_closed;
};
/**
@@ -577,7 +574,6 @@ public:
_type(tnode.node_type),
_pool(pool),
_tuple_ids(tnode.row_tuples),
- _children(nullptr),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_resource_profile(tnode.resource_profile),
_limit(tnode.limit),
@@ -683,7 +679,6 @@ protected:
vectorized::VExprContextSPtrs _conjuncts;
- OperatorXBase* _children;
RowDescriptor _row_descriptor;
std::unique_ptr<RowDescriptor> _output_row_descriptor;
@@ -709,7 +704,7 @@ public:
: _parent(parent_), _state(state_) {}
virtual ~PipelineXSinkLocalState() {}
- virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) {
return Status::OK(); }
+ virtual Status init(RuntimeState* state, LocalSinkStateInfo& info);
template <class TARGET>
TARGET& cast() {
DCHECK(dynamic_cast<TARGET*>(this));
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp
b/be/src/pipeline/exec/sort_sink_operator.cpp
index 182ce7bd71..af655b896c 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -83,7 +83,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, const
TPlanNode& tnode,
_limit(tnode.limit),
_use_topn_opt(tnode.sort_node.use_topn_opt),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
- _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read) {}
+ _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read) {
+ _name = "SortSinkOperatorX";
+}
Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.sort_node.sort_info, _pool));
@@ -134,6 +136,7 @@ Status SortSinkOperatorX::prepare(RuntimeState* state) {
} else {
_algorithm = SortAlgorithm::FULL_SORT;
}
+ _profile = state->obj_pool()->add(new RuntimeProfile("SortSinkOperatorX"));
return _vsort_exec_exprs.prepare(state, _child_x->row_desc(),
_row_descriptor);
}
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp
b/be/src/pipeline/pipeline_x/dependency.cpp
index 238295e63b..49262ce0e5 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -99,4 +99,122 @@ void AggDependency::release_tracker() {
mem_tracker()->release(_mem_usage_record.used_in_state +
_mem_usage_record.used_in_arena);
}
+// Returns the end position of the current partition.
+//  - If current_row_position is still before partition_by_end.pos, the
+//    cached partition_by_end is returned unchanged.
+//  - With no partition-by expressions, or zero input rows, the end of
+//    all buffered blocks (all_block_end) is returned.
+//  - Otherwise the end is narrowed once per partition-by column through
+//    compare_row_to_find_end, and its absolute `pos` is recomputed as
+//    the first-row position of its block plus its row number.
+vectorized::BlockRowPos AnalyticDependency::get_partition_by_end() {
+ if (_analytic_state.current_row_position <
+ _analytic_state.partition_by_end.pos) { //still have data, return
partition_by_end directly
+ return _analytic_state.partition_by_end;
+ }
+
+ if (_analytic_state.partition_by_eq_expr_ctxs.empty() ||
+ (_analytic_state.input_total_rows == 0)) { //no partition_by, the all
block is end
+ return _analytic_state.all_block_end;
+ }
+
+ vectorized::BlockRowPos cal_end = _analytic_state.all_block_end;
+ for (size_t i = 0; i < _analytic_state.partition_by_eq_expr_ctxs.size();
+ ++i) { //have partition_by, binary search the partition end
+ cal_end =
compare_row_to_find_end(_analytic_state.partition_by_column_idxs[i],
+ _analytic_state.partition_by_end,
cal_end);
+ }
+ cal_end.pos =
+ _analytic_state.input_block_first_row_positions[cal_end.block_num]
+ cal_end.row_num;
+ return cal_end;
+}
+
+// Searches [start, end) for the first row whose value in column `idx`
+// differs from the value at `start`, and returns its block/row.
+// Two binary searches are used: first over block numbers (comparing
+// the start row with row 0 of the middle block), then over the rows of
+// the selected block. Only block_num and row_num of the returned
+// position are updated here; `pos` is left as it was in `start`.
+// When need_check_first is true and `end` points at row 0 of a block,
+// `end` is first moved back to one-past-the-last row of the previous
+// block (see the issue referenced below).
+//_partition_by_columns,_order_by_columns save in blocks, so if need to
calculate the boundary, may find in which blocks firstly
+vectorized::BlockRowPos AnalyticDependency::compare_row_to_find_end(int idx,
+
vectorized::BlockRowPos start,
+
vectorized::BlockRowPos end,
+ bool
need_check_first) {
+ int64_t start_init_row_num = start.row_num;
+ vectorized::ColumnPtr start_column =
+
_analytic_state.input_blocks[start.block_num].get_by_position(idx).column;
+ vectorized::ColumnPtr start_next_block_column = start_column;
+
+ // NOTE(review): input_blocks.size() is unsigned, so `size() - 1`
+ // wraps around if input_blocks is empty - confirm callers never
+ // reach this point without buffered blocks.
+ DCHECK_LE(start.block_num, end.block_num);
+ DCHECK_LE(start.block_num, _analytic_state.input_blocks.size() - 1);
+ int64_t start_block_num = start.block_num;
+ int64_t end_block_num = end.block_num;
+ int64_t mid_blcok_num = end.block_num;
+ // To fix this problem: https://github.com/apache/doris/issues/15951
+ // in this case, the partition by column is last row of block, so it's
pointed to a new block at row = 0, range is: [left, right)
+ // From the perspective of order by column, the two values are exactly
equal.
+ // so the range will be get wrong because it's compare_at == 0 with next
block at row = 0
+ if (need_check_first && end.block_num > 0 && end.row_num == 0) {
+ end.block_num--;
+ end_block_num--;
+ end.row_num = _analytic_state.input_blocks[end_block_num].rows();
+ }
+ //binary search find in which block
+ while (start_block_num < end_block_num) {
+ mid_blcok_num = (start_block_num + end_block_num + 1) >> 1;
+ start_next_block_column =
+
_analytic_state.input_blocks[mid_blcok_num].get_by_position(idx).column;
+ //Compares (*this)[n] and rhs[m], this: start[init_row] rhs: mid[0]
+ if (start_column->compare_at(start_init_row_num, 0,
*start_next_block_column, 1) == 0) {
+ start_block_num = mid_blcok_num;
+ } else {
+ end_block_num = mid_blcok_num - 1;
+ }
+ }
+
+ // have check the start.block_num: start_column[start_init_row_num] with
mid_blcok_num start_next_block_column[0]
+ // now next block must not be result, so need check with end_block_num:
start_next_block_column[last_row]
+ if (end_block_num == mid_blcok_num - 1) {
+ start_next_block_column =
+
_analytic_state.input_blocks[end_block_num].get_by_position(idx).column;
+ int64_t block_size =
_analytic_state.input_blocks[end_block_num].rows();
+ // The whole block equals the start row, so the boundary is row 0
+ // of the following block.
+ if ((start_column->compare_at(start_init_row_num, block_size - 1,
*start_next_block_column,
+ 1) == 0)) {
+ start.block_num = end_block_num + 1;
+ start.row_num = 0;
+ return start;
+ }
+ }
+
+ //check whether need get column again, maybe same as first init
+ // if the start_block_num have move to forward, so need update start block
num and compare it from row_num=0
+ if (start_block_num != start.block_num) {
+ start_init_row_num = 0;
+ start.block_num = start_block_num;
+ start_column =
_analytic_state.input_blocks[start.block_num].get_by_position(idx).column;
+ }
+ //binary search, set start and end pos
+ int64_t start_pos = start_init_row_num;
+ int64_t end_pos = _analytic_state.input_blocks[start.block_num].rows() - 1;
+ //if end_block_num haven't moved, only start_block_num go to the end block
+ //so could use the end.row_num for binary search
+ if (start.block_num == end.block_num) {
+ end_pos = end.row_num;
+ }
+ // A non-zero compare_at result means row mid_pos differs from the
+ // start row, so the boundary is at or before mid_pos.
+ while (start_pos < end_pos) {
+ int64_t mid_pos = (start_pos + end_pos) >> 1;
+ if (start_column->compare_at(start_init_row_num, mid_pos,
*start_column, 1)) {
+ end_pos = mid_pos;
+ } else {
+ start_pos = mid_pos + 1;
+ }
+ }
+ start.row_num = start_pos; //update row num, return the find end
+ return start;
+}
+
+// Tells the caller whether more input must be fetched before the
+// current partition can be finalised.
+// Returns false when input has reached EOS or when
+// current_row_position is still before partition_by_end.pos.
+// Returns true when there are no partition-by expressions, when
+// found_partition_end.pos is 0, or when the found end coincides with
+// the end of all buffered blocks. Returns false otherwise.
+// NOTE(review): the `!input_eos` tests in the second and third
+// conditions are always true here, because the first guard already
+// returned when input_eos was set.
+bool AnalyticDependency::whether_need_next_partition(vectorized::BlockRowPos
found_partition_end) {
+ if (_analytic_state.input_eos ||
+ (_analytic_state.current_row_position <
+ _analytic_state.partition_by_end.pos)) { //now still have partition
data
+ return false;
+ }
+ if ((_analytic_state.partition_by_eq_expr_ctxs.empty() &&
!_analytic_state.input_eos) ||
+ (found_partition_end.pos == 0)) { //no partition, get until fetch to
EOS
+ return true;
+ }
+ if (!_analytic_state.partition_by_eq_expr_ctxs.empty() &&
+ found_partition_end.pos == _analytic_state.all_block_end.pos &&
+ !_analytic_state.input_eos) { //current partition data calculate done
+ return true;
+ }
+ return false;
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 478623637e..152d0e69c1 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -20,6 +20,7 @@
#include "pipeline/exec/data_queue.h"
#include "vec/common/sort/sorter.h"
#include "vec/exec/vaggregation_node.h"
+#include "vec/exec/vanalytic_eval_node.h"
namespace doris {
namespace pipeline {
@@ -141,5 +142,46 @@ private:
SortSharedState _sort_state;
};
+// State owned by AnalyticDependency and exposed through its
+// shared_state() accessor: buffered input blocks, partition / order-by
+// bookkeeping and input progress flags.
+// NOTE(review): presumably shared between the analytic sink and source
+// local states; only need_more_input is atomic - confirm the
+// synchronisation for the other fields.
+struct AnalyticSharedState {
+public:
+ AnalyticSharedState() = default;
+
+ int64_t current_row_position = 0;
+ vectorized::BlockRowPos partition_by_end;
+ vectorized::VExprContextSPtrs partition_by_eq_expr_ctxs;
+ int64_t input_total_rows = 0;
+ vectorized::BlockRowPos all_block_end;
+ std::vector<vectorized::Block> input_blocks;
+ bool input_eos = false;
+ std::atomic_bool need_more_input = true;
+ vectorized::BlockRowPos found_partition_end;
+ std::vector<int64_t> origin_cols;
+ vectorized::VExprContextSPtrs order_by_eq_expr_ctxs;
+ // Indexed by block number; added to a row number to obtain an
+ // absolute row position (see get_partition_by_end).
+ std::vector<int64_t> input_block_first_row_positions;
+ std::vector<std::vector<vectorized::MutableColumnPtr>> agg_input_columns;
+
+ // TODO: maybe global?
+ std::vector<int64_t> partition_by_column_idxs;
+ // NOTE(review): "ordey" looks like a misspelling of "order"; renaming
+ // would have to touch every user of this field.
+ std::vector<int64_t> ordey_by_column_idxs;
+};
+
+// Dependency that owns the AnalyticSharedState and provides the
+// partition-boundary helpers operating on it.
+class AnalyticDependency final : public Dependency {
+public:
+ AnalyticDependency(int id) : Dependency(id) {}
+ ~AnalyticDependency() override = default;
+
+ // Returns the address of the owned AnalyticSharedState as void*;
+ // callers cast it back to AnalyticSharedState*.
+ void* shared_state() override { return (void*)&_analytic_state; };
+
+ // End position of the current partition.
+ vectorized::BlockRowPos get_partition_by_end();
+
+ // Whether more input is needed before finishing the partition.
+ bool whether_need_next_partition(vectorized::BlockRowPos
found_partition_end);
+ // First position in [start, end) whose value in column `idx` differs
+ // from the value at `start`.
+ vectorized::BlockRowPos compare_row_to_find_end(int idx,
vectorized::BlockRowPos start,
+ vectorized::BlockRowPos
end,
+ bool need_check_first =
false);
+
+private:
+ AnalyticSharedState _analytic_state;
+};
+
} // namespace pipeline
} // namespace doris
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 13e89e8766..7e97696a62 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -42,6 +42,8 @@
#include "io/fs/stream_load_pipe.h"
#include "pipeline/exec/aggregation_sink_operator.h"
#include "pipeline/exec/aggregation_source_operator.h"
+#include "pipeline/exec/analytic_sink_operator.h"
+#include "pipeline/exec/analytic_source_operator.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/datagen_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
@@ -471,7 +473,7 @@ Status
PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
- if (op->get_child()) {
+ if (op->get_child() && !op->is_source()) {
op->get_runtime_profile()->add_child(op->get_child()->get_runtime_profile(),
true, nullptr);
}
@@ -550,6 +552,23 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
break;
}
+ case TPlanNodeType::ANALYTIC_EVAL_NODE: {
+ op.reset(new AnalyticSourceOperatorX(pool, tnode, descs,
"AnalyticSourceXOperator"));
+ RETURN_IF_ERROR(cur_pipe->add_operator(op));
+
+ const auto downstream_pipeline_id = cur_pipe->id();
+ if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+ _dag.insert({downstream_pipeline_id, {}});
+ }
+ cur_pipe = add_pipeline();
+ _dag[downstream_pipeline_id].push_back(cur_pipe->id());
+
+ DataSinkOperatorXPtr sink;
+ sink.reset(new AnalyticSinkOperatorX(pool, tnode, descs));
+ RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+ RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
+ break;
+ }
default:
return Status::InternalError("Unsupported exec type in pipelineX: {}",
print_plan_node_type(tnode.node_type));
diff --git a/be/src/vec/exec/vanalytic_eval_node.h
b/be/src/vec/exec/vanalytic_eval_node.h
index d232c28ff9..9e3c24fb81 100644
--- a/be/src/vec/exec/vanalytic_eval_node.h
+++ b/be/src/vec/exec/vanalytic_eval_node.h
@@ -67,6 +67,8 @@ struct BlockRowPos {
class AggFnEvaluator;
+// Scope kinds for analytic functions. Declared at namespace level
+// (previously a private enum of VAnalyticEvalNode) so that code outside
+// that class can refer to vectorized::AnalyticFnScope.
+enum AnalyticFnScope { PARTITION, RANGE, ROWS };
+
class VAnalyticEvalNode : public ExecNode {
public:
~VAnalyticEvalNode() override = default;
@@ -134,7 +136,6 @@ private:
void _release_mem();
private:
- enum AnalyticFnScope { PARTITION, RANGE, ROWS };
std::vector<Block> _input_blocks;
std::vector<int64_t> input_block_first_row_positions;
std::vector<AggFnEvaluator*> _agg_functions;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]