This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 55e572df82 [pipelineX](analytic operator) Support analytic operator (#23444)
55e572df82 is described below

commit 55e572df8238dc23263673505e71478c6f56ded9
Author: Gabriel <[email protected]>
AuthorDate: Thu Aug 24 23:05:29 2023 +0800

    [pipelineX](analytic operator) Support analytic operator (#23444)
---
 be/src/pipeline/exec/analytic_sink_operator.cpp    | 199 +++++++++
 be/src/pipeline/exec/analytic_sink_operator.h      |  63 +++
 be/src/pipeline/exec/analytic_source_operator.cpp  | 476 ++++++++++++++++++++-
 be/src/pipeline/exec/analytic_source_operator.h    | 120 +++++-
 be/src/pipeline/exec/exchange_sink_operator.cpp    |   2 +-
 be/src/pipeline/exec/operator.cpp                  |  18 +-
 be/src/pipeline/exec/operator.h                    |   9 +-
 be/src/pipeline/exec/sort_sink_operator.cpp        |   5 +-
 be/src/pipeline/pipeline_x/dependency.cpp          | 118 +++++
 be/src/pipeline/pipeline_x/dependency.h            |  42 ++
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  21 +-
 be/src/vec/exec/vanalytic_eval_node.h              |   3 +-
 12 files changed, 1059 insertions(+), 17 deletions(-)

diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index a0634c81f6..1ebe342e85 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -26,4 +26,203 @@ namespace doris::pipeline {
 
 OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, StreamingOperator)
 
+// Set up this sink instance:
+//  - bind the AnalyticSharedState exposed by the dependency;
+//  - size the PARTITION BY / ORDER BY column-index vectors;
+//  - register profile counters;
+//  - clone the parent's aggregate-argument, PARTITION BY and ORDER BY expression contexts
+//    so this instance evaluates them independently.
+Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info));
+    // The tracker label identifies this operator's memory in reports.
+    _mem_tracker = std::make_unique<MemTracker>("AnalyticSinkLocalState:");
+    auto& p = _parent->cast<AnalyticSinkOperatorX>();
+    _dependency = (AnalyticDependency*)info.dependency;
+    _shared_state = (AnalyticSharedState*)_dependency->shared_state();
+    _shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size());
+    _shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size());
+
+    _profile = state->obj_pool()->add(new RuntimeProfile("AnalyticSinkLocalState"));
+    _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
+    _blocks_memory_usage = _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage");
+    _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
+
+    // One cloned context and one (initially empty) input column per argument of every
+    // analytic function; sink() appends the evaluated argument values to these columns.
+    const size_t agg_size = p._agg_expr_ctxs.size();
+    _agg_expr_ctxs.resize(agg_size);
+    _shared_state->agg_input_columns.resize(agg_size);
+    for (size_t i = 0; i < agg_size; ++i) {
+        _shared_state->agg_input_columns[i].resize(p._num_agg_input[i]);
+        _agg_expr_ctxs[i].resize(p._agg_expr_ctxs[i].size());
+        for (size_t j = 0; j < p._agg_expr_ctxs[i].size(); ++j) {
+            RETURN_IF_ERROR(p._agg_expr_ctxs[i][j]->clone(state, _agg_expr_ctxs[i][j]));
+        }
+
+        for (size_t j = 0; j < _agg_expr_ctxs[i].size(); ++j) {
+            _shared_state->agg_input_columns[i][j] =
+                    _agg_expr_ctxs[i][j]->root()->data_type()->create_column();
+        }
+    }
+    _shared_state->partition_by_eq_expr_ctxs.resize(p._partition_by_eq_expr_ctxs.size());
+    for (size_t i = 0; i < _shared_state->partition_by_eq_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._partition_by_eq_expr_ctxs[i]->clone(
+                state, _shared_state->partition_by_eq_expr_ctxs[i]));
+    }
+    _shared_state->order_by_eq_expr_ctxs.resize(p._order_by_eq_expr_ctxs.size());
+    for (size_t i = 0; i < _shared_state->order_by_eq_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(
+                p._order_by_eq_expr_ctxs[i]->clone(state, _shared_state->order_by_eq_expr_ctxs[i]));
+    }
+    return Status::OK();
+}
+
+// Build the analytic sink from its plan node. The buffered tuple id falls back to 0 when
+// the plan node does not set it.
+AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                             const DescriptorTbl& descs)
+        : DataSinkOperatorX(tnode.node_id),
+          _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
+                                     ? tnode.analytic_node.buffered_tuple_id
+                                     : 0) {
+    // `_name` is reported in error messages such as init(const TDataSink&), so it must
+    // name this operator.
+    _name = "AnalyticSinkOperatorX";
+}
+
+// Build operator-level expression trees from the analytic plan node:
+//  - for every analytic function, one expression context per argument (the children of
+//    the function's root node in its flattened thrift node list);
+//  - the PARTITION BY and ORDER BY expression lists.
+Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    const TAnalyticNode& node = tnode.analytic_node;
+    const size_t fn_count = node.analytic_functions.size();
+    _agg_expr_ctxs.resize(fn_count);
+    _num_agg_input.resize(fn_count);
+    for (size_t fn = 0; fn < fn_count; ++fn) {
+        const TExpr& fn_expr = node.analytic_functions[fn];
+        const auto arg_count = fn_expr.nodes[0].num_children;
+        _num_agg_input[fn] = arg_count;
+        // Node 0 is the function itself. create_tree_from_thrift moves `cursor` across the
+        // argument subtree it consumes; the pre-increment steps onto the next argument root.
+        int cursor = 0;
+        for (int arg = 0; arg < arg_count; ++arg) {
+            ++cursor;
+            vectorized::VExprSPtr arg_root;
+            vectorized::VExprContextSPtr arg_ctx;
+            RETURN_IF_ERROR(vectorized::VExpr::create_tree_from_thrift(fn_expr.nodes, &cursor,
+                                                                       arg_root, arg_ctx));
+            _agg_expr_ctxs[fn].emplace_back(arg_ctx);
+        }
+    }
+
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(node.partition_exprs,
+                                                         _partition_by_eq_expr_ctxs));
+    RETURN_IF_ERROR(
+            vectorized::VExpr::create_expr_trees(node.order_by_exprs, _order_by_eq_expr_ctxs));
+    _agg_functions_size = fn_count;
+    return Status::OK();
+}
+
+// Prepare all expressions:
+//  - aggregate-argument expressions against the child's row descriptor;
+//  - PARTITION BY / ORDER BY equality expressions against a row descriptor built from the
+//    child's first tuple plus the buffered tuple.
+// Any preparation failure is returned to the caller.
+Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
+    for (const auto& ctx : _agg_expr_ctxs) {
+        // Propagate the Status instead of silently dropping a preparation error.
+        RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child_x->row_desc()));
+    }
+    if (!_partition_by_eq_expr_ctxs.empty() || !_order_by_eq_expr_ctxs.empty()) {
+        vector<TTupleId> tuple_ids;
+        tuple_ids.push_back(_child_x->row_desc().tuple_descriptors()[0]->id());
+        tuple_ids.push_back(_buffered_tuple_id);
+        RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false));
+        if (!_partition_by_eq_expr_ctxs.empty()) {
+            RETURN_IF_ERROR(
+                    vectorized::VExpr::prepare(_partition_by_eq_expr_ctxs, state, cmp_row_desc));
+        }
+        if (!_order_by_eq_expr_ctxs.empty()) {
+            RETURN_IF_ERROR(
+                    vectorized::VExpr::prepare(_order_by_eq_expr_ctxs, state, cmp_row_desc));
+        }
+    }
+    _profile = state->obj_pool()->add(new RuntimeProfile("AnalyticSinkOperatorX"));
+    return Status::OK();
+}
+
+// The sink is writable only while the shared analytic state reports that more input is
+// needed. `need_more_input` is recomputed at the end of every sink() call.
+bool AnalyticSinkOperatorX::can_write(RuntimeState* state) {
+    auto& sink_state = state->get_sink_local_state(id())->cast<AnalyticSinkLocalState>();
+    return sink_state._shared_state->need_more_input;
+}
+
+// Open the PARTITION BY and ORDER BY expressions, then every analytic function's
+// argument expressions. Stops at the first failure.
+Status AnalyticSinkOperatorX::open(RuntimeState* state) {
+    RETURN_IF_ERROR(vectorized::VExpr::open(_partition_by_eq_expr_ctxs, state));
+    RETURN_IF_ERROR(vectorized::VExpr::open(_order_by_eq_expr_ctxs, state));
+    for (size_t fn = 0; fn < _agg_functions_size; ++fn) {
+        auto& arg_ctxs = _agg_expr_ctxs[fn];
+        RETURN_IF_ERROR(vectorized::VExpr::open(arg_ctxs, state));
+    }
+    return Status::OK();
+}
+
+// Create this instance's sink local state and register it under this operator's id.
+// It is then initialized from `info`, and the result of init() is returned.
+Status AnalyticSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) {
+    auto sink_state = AnalyticSinkLocalState::create_shared(this, state);
+    state->emplace_sink_local_state(id(), sink_state);
+    return sink_state->init(state, info);
+}
+
+// Buffer one child block into the shared analytic state.
+// Steps:
+//  1. Record the block's row positions.
+//  2. Append the evaluated aggregate arguments to the shared input columns.
+//  3. Evaluate the PARTITION BY / ORDER BY expressions and record the block column index
+//     of each result.
+//  4. Account the block's memory and move the block into `input_blocks`.
+//  5. Search for the current partition's end and recompute `need_more_input`.
+// An empty block at end of input only clears `need_more_input`.
+Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* input_block,
+                                   SourceState source_state) {
+    auto& local_state = state->get_sink_local_state(id())->cast<AnalyticSinkLocalState>();
+    local_state._shared_state->input_eos = source_state == SourceState::FINISHED;
+    if (local_state._shared_state->input_eos && input_block->rows() == 0) {
+        local_state._shared_state->need_more_input = false;
+        return Status::OK();
+    }
+
+    // Global position of this block's first row, indexed by block number.
+    local_state._shared_state->input_block_first_row_positions.emplace_back(
+            local_state._shared_state->input_total_rows);
+    size_t block_rows = input_block->rows();
+    local_state._shared_state->input_total_rows += block_rows;
+    // End marker of all buffered input: the block about to be appended (its index is the
+    // current size of input_blocks), one past its last row, and the total row count.
+    local_state._shared_state->all_block_end.block_num =
+            local_state._shared_state->input_blocks.size();
+    local_state._shared_state->all_block_end.row_num = block_rows;
+    local_state._shared_state->all_block_end.pos = local_state._shared_state->input_total_rows;
+
+    // Record the block's original column positions once. Expressions evaluated below may
+    // append result columns; output_current_block() keeps only these original columns.
+    if (local_state._shared_state->origin_cols.empty()) {
+        for (int c = 0; c < input_block->columns(); ++c) {
+            local_state._shared_state->origin_cols.emplace_back(c);
+        }
+    }
+
+    // Evaluate every analytic function's arguments and append them to agg_input_columns.
+    for (size_t i = 0; i < _agg_functions_size; ++i) {
+        for (size_t j = 0; j < local_state._agg_expr_ctxs[i].size(); ++j) {
+            RETURN_IF_ERROR(_insert_range_column(
+                    input_block, local_state._agg_expr_ctxs[i][j],
+                    local_state._shared_state->agg_input_columns[i][j].get(), block_rows));
+        }
+    }
+    // Record which block column holds each PARTITION BY result.
+    for (size_t i = 0; i < local_state._shared_state->partition_by_eq_expr_ctxs.size(); ++i) {
+        int result_col_id = -1;
+        RETURN_IF_ERROR(local_state._shared_state->partition_by_eq_expr_ctxs[i]->execute(
+                input_block, &result_col_id));
+        DCHECK_GE(result_col_id, 0);
+        local_state._shared_state->partition_by_column_idxs[i] = result_col_id;
+    }
+
+    // Record which block column holds each ORDER BY result.
+    for (size_t i = 0; i < local_state._shared_state->order_by_eq_expr_ctxs.size(); ++i) {
+        int result_col_id = -1;
+        RETURN_IF_ERROR(local_state._shared_state->order_by_eq_expr_ctxs[i]->execute(
+                input_block, &result_col_id));
+        DCHECK_GE(result_col_id, 0);
+        local_state._shared_state->ordey_by_column_idxs[i] = result_col_id;
+    }
+
+    local_state.mem_tracker()->consume(input_block->allocated_bytes());
+    local_state._blocks_memory_usage->add(input_block->allocated_bytes());
+
+    //TODO: if need improvement, the is a tips to maintain a free queue,
+    //so the memory could reuse, no need to new/delete again;
+    local_state._shared_state->input_blocks.emplace_back(std::move(*input_block));
+    {
+        SCOPED_TIMER(local_state._evaluation_timer);
+        local_state._shared_state->found_partition_end =
+                local_state._dependency->get_partition_by_end();
+    }
+    // Decide whether the sink must keep accepting input (see can_write()).
+    local_state._shared_state->need_more_input =
+            local_state._dependency->whether_need_next_partition(
+                    local_state._shared_state->found_partition_end);
+    return Status::OK();
+}
+
+// Evaluate `expr` on `block`, then append rows [0, length) of its result column to
+// `dst_column`. Constant results are expanded to full columns first.
+Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
+                                                   const vectorized::VExprContextSPtr& expr,
+                                                   vectorized::IColumn* dst_column, size_t length) {
+    int produced_col = -1;
+    RETURN_IF_ERROR(expr->execute(block, &produced_col));
+    DCHECK_GE(produced_col, 0);
+    const auto& produced = block->get_by_position(produced_col);
+    auto full_column = produced.column->convert_to_full_column_if_const();
+    dst_column->insert_range_from(*full_column, 0, length);
+    return Status::OK();
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 2d16eb1c70..ed15cf3ab0 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -21,6 +21,7 @@
 #include <stdint.h>
 
 #include "operator.h"
+#include "pipeline/pipeline_x/dependency.h"
 #include "vec/exec/vanalytic_eval_node.h"
 
 namespace doris {
@@ -43,5 +44,67 @@ public:
     bool can_write() override { return _node->can_write(); }
 };
 
+class AnalyticSinkOperatorX;
+
+// Per-instance state of the analytic sink. It holds the cloned aggregate-argument
+// expression contexts and profile counters, and points at the AnalyticSharedState. The
+// sink fills that state; the analytic source's local state reads the same state through
+// its own dependency.
+class AnalyticSinkLocalState : public PipelineXSinkLocalState {
+    ENABLE_FACTORY_CREATOR(AnalyticSinkLocalState);
+
+public:
+    AnalyticSinkLocalState(DataSinkOperatorX* parent, RuntimeState* state)
+            : PipelineXSinkLocalState(parent, state) {}
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+
+private:
+    friend class AnalyticSinkOperatorX;
+    // Set in init() and never freed by this class. They are null-initialized so that a
+    // use before init() is a detectable null dereference rather than an indeterminate read.
+    AnalyticDependency* _dependency = nullptr;
+    AnalyticSharedState* _shared_state = nullptr;
+
+    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
+    RuntimeProfile::Counter* _evaluation_timer = nullptr;
+    RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
+
+    // One context list per analytic function, holding one context per argument.
+    std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
+};
+
+// Sink half of the pipelineX analytic operator. It buffers child blocks into the
+// AnalyticSharedState and evaluates the aggregate arguments and the PARTITION BY /
+// ORDER BY expressions there.
+class AnalyticSinkOperatorX final : public DataSinkOperatorX {
+public:
+    AnalyticSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+    // This operator is built from a TPlanNode. Initializing it from a TDataSink is an error,
+    // and the message names the overload that was actually called.
+    Status init(const TDataSink& tsink) override {
+        return Status::InternalError("{} should not init with TDataSink", _name);
+    }
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+    Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
+
+    bool can_write(RuntimeState* state) override;
+
+    // Creates the AnalyticDependency. Both the sink and source local states bind to its
+    // shared state in their init().
+    void get_dependency(DependencySPtr& dependency) override {
+        dependency.reset(new AnalyticDependency(id()));
+    }
+
+private:
+    // Evaluates `expr` on `block` and appends `length` result rows to `dst_column`.
+    Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
+                                vectorized::IColumn* dst_column, size_t length);
+
+    friend class AnalyticSinkLocalState;
+
+    // One context list per analytic function, holding one context per argument.
+    std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
+    vectorized::VExprContextSPtrs _partition_by_eq_expr_ctxs;
+    vectorized::VExprContextSPtrs _order_by_eq_expr_ctxs;
+
+    size_t _agg_functions_size = 0;
+
+    // Paired with the child's tuple when preparing the PARTITION BY / ORDER BY expressions.
+    // It is 0 when the plan node does not set it.
+    const TTupleId _buffered_tuple_id;
+
+    // Number of arguments of each analytic function.
+    std::vector<size_t> _num_agg_input;
+};
+
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp
index 20a4bfb9cc..298a174d0d 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -25,4 +25,478 @@ namespace doris::pipeline {
 
 OPERATOR_CODE_GENERATOR(AnalyticSourceOperator, SourceOperator)
 
-} // namespace doris::pipeline
\ No newline at end of file
+AnalyticLocalState::AnalyticLocalState(RuntimeState* state, OperatorXBase* 
parent)
+        : PipelineXLocalState(state, parent),
+          _output_block_index(0),
+          _window_end_position(0),
+          _next_partition(false),
+          _rows_start_offset(0),
+          _rows_end_offset(0),
+          _fn_place_ptr(nullptr),
+          _agg_functions_size(0),
+          _agg_functions_created(false) {}
+
+// Initialize this source instance:
+//  - bind the AnalyticSharedState filled by the analytic sink;
+//  - register profile counters;
+//  - clone the operator's aggregate evaluators;
+//  - allocate one buffer for all aggregate states;
+//  - choose the frame-evaluation strategy (`_executor.get_next`) from the window definition;
+//  - create the aggregate states.
+Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(PipelineXLocalState::init(state, info));
+    _agg_arena_pool = std::make_unique<vectorized::Arena>();
+    _dependency = (AnalyticDependency*)info.dependency;
+    _shared_state = (AnalyticSharedState*)_dependency->shared_state();
+
+    auto& p = _parent->cast<AnalyticSourceOperatorX>();
+    _agg_functions_size = p._agg_functions.size();
+
+    _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
+    _blocks_memory_usage =
+            profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage");
+    _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
+
+    // Each instance evaluates with its own clones of the operator's evaluators.
+    _agg_functions.resize(p._agg_functions.size());
+    for (size_t i = 0; i < _agg_functions.size(); i++) {
+        _agg_functions[i] = p._agg_functions[i]->clone(state, state->obj_pool());
+    }
+
+    // One arena buffer holds every aggregate state. Function i's state lives at
+    // `_fn_place_ptr + _offsets_of_aggregate_states[i]`.
+    _fn_place_ptr = _agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states,
+                                                   p._align_aggregate_states);
+
+    if (!p._has_window) {
+        // No window clause: the frame is the whole partition,
+        // [UNBOUNDED PRECEDING, UNBOUNDED FOLLOWING].
+        _executor.get_next = std::bind<Status>(&AnalyticLocalState::_get_next_for_partition, this,
+                                               std::placeholders::_1);
+
+    } else if (p._has_range_window) {
+        if (!p._has_window_end) {
+            // RANGE window without an end bound: same as the whole partition.
+            _executor.get_next = std::bind<Status>(&AnalyticLocalState::_get_next_for_partition,
+                                                   this, std::placeholders::_1);
+
+        } else {
+            // RANGE window ending at CURRENT ROW (the only end bound the operator's ctor
+            // allows for RANGE). The frame grows one ORDER BY peer group at a time.
+            _executor.get_next = std::bind<Status>(&AnalyticLocalState::_get_next_for_range, this,
+                                                   std::placeholders::_1);
+        }
+
+    } else {
+        if (!p._has_window_start && !p._has_window_end) {
+            // ROWS window with neither bound set: same as the whole partition.
+            _executor.get_next = std::bind<Status>(&AnalyticLocalState::_get_next_for_partition,
+                                                   this, std::placeholders::_1);
+
+        } else {
+            // ROWS window: turn each bound into a signed row offset relative to the current
+            // row. PRECEDING gives a negative offset, CURRENT ROW gives 0, FOLLOWING gives a
+            // positive offset.
+            if (p._has_window_start) {
+                TAnalyticWindowBoundary b = p._window.window_start;
+                if (b.__isset.rows_offset_value) {
+                    _rows_start_offset = b.rows_offset_value;
+                    if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
+                        _rows_start_offset *= -1;
+                    }
+                } else {
+                    // A start bound without an offset must be CURRENT ROW.
+                    DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW);
+                    _rows_start_offset = 0;
+                }
+            }
+
+            if (p._has_window_end) {
+                TAnalyticWindowBoundary b = p._window.window_end;
+                if (b.__isset.rows_offset_value) {
+                    _rows_end_offset = b.rows_offset_value;
+                    if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
+                        _rows_end_offset *= -1;
+                    }
+                } else {
+                    // An end bound without an offset must be CURRENT ROW.
+                    DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW);
+                    _rows_end_offset = 0;
+                }
+            }
+
+            _executor.get_next = std::bind<Status>(&AnalyticLocalState::_get_next_for_rows, this,
+                                                   std::placeholders::_1);
+        }
+    }
+    _executor.insert_result =
+            std::bind<void>(&AnalyticLocalState::_insert_result_info, this, std::placeholders::_1);
+    _executor.execute =
+            std::bind<void>(&AnalyticLocalState::_execute_for_win_func, this, std::placeholders::_1,
+                            std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);
+
+    // _create_agg_status() may throw; convert any exception into a Status.
+    RETURN_IF_CATCH_EXCEPTION(_create_agg_status());
+    return Status::OK();
+}
+
+// Reset each analytic function's aggregate state in place. Each state is stored at its
+// offset inside the `_fn_place_ptr` buffer. Always returns OK.
+Status AnalyticLocalState::_reset_agg_status() {
+    const auto& offsets = _parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states;
+    for (size_t fn = 0; fn < _agg_functions_size; ++fn) {
+        _agg_functions[fn]->reset(_fn_place_ptr + offsets[fn]);
+    }
+    return Status::OK();
+}
+
+// Create the aggregate state of every analytic function inside the `_fn_place_ptr`
+// buffer. If a create() throws, the states created before it are destroyed and the
+// exception is rethrown. On success `_agg_functions_created` is set, which makes
+// _destroy_agg_status() destroy the states later.
+Status AnalyticLocalState::_create_agg_status() {
+    const auto& offsets = _parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states;
+    for (size_t i = 0; i < _agg_functions_size; ++i) {
+        try {
+            _agg_functions[i]->create(_fn_place_ptr + offsets[i]);
+        } catch (...) {
+            // Roll back the states created before the failing one. size_t matches `i`,
+            // which avoids a signed/unsigned comparison.
+            for (size_t j = 0; j < i; ++j) {
+                _agg_functions[j]->destroy(_fn_place_ptr + offsets[j]);
+            }
+            throw;
+        }
+    }
+    _agg_functions_created = true;
+    return Status::OK();
+}
+
+// Destroy every aggregate state. This is a no-op when the buffer was never allocated or
+// when _create_agg_status() did not complete. Always returns OK.
+Status AnalyticLocalState::_destroy_agg_status() {
+    const bool nothing_to_destroy = _fn_place_ptr == nullptr || !_agg_functions_created;
+    if (UNLIKELY(nothing_to_destroy)) {
+        return Status::OK();
+    }
+    const auto& offsets = _parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states;
+    for (size_t fn = 0; fn < _agg_functions_size; ++fn) {
+        _agg_functions[fn]->destroy(_fn_place_ptr + offsets[fn]);
+    }
+    return Status::OK();
+}
+
+// Add the frame [frame_start, frame_end) of the partition [partition_start, partition_end)
+// to every analytic function's aggregate state. Positions are global row positions; the
+// callers pass exclusive ends, e.g. current_row_position + 1.
+// Used for lead/lag, row_number/rank/dense_rank/ntile, and
+// sum/min/max/count/avg/first_value/last_value.
+void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, int64_t partition_end,
+                                               int64_t frame_start, int64_t frame_end) {
+    const auto& offsets = _parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states;
+    // This runs once per row for ROWS frames. Reuse one pointer buffer instead of
+    // allocating a new vector for every function on every call.
+    std::vector<const vectorized::IColumn*> agg_columns;
+    for (size_t i = 0; i < _agg_functions_size; ++i) {
+        const auto& input_columns = _shared_state->agg_input_columns[i];
+        agg_columns.clear();
+        agg_columns.reserve(input_columns.size());
+        for (const auto& column : input_columns) {
+            agg_columns.push_back(column.get());
+        }
+        _agg_functions[i]->function()->add_range_single_place(
+                partition_start, partition_end, frame_start, frame_end,
+                _fn_place_ptr + offsets[i], agg_columns.data(), nullptr);
+    }
+}
+
+// Advance the emit cursors for the current output block according to the function scope.
+// Then append the current aggregate result once for every output row this step covers,
+// i.e. rows [get_result_start, _window_end_position) of the block. The cursors are
+// `_shared_state->current_row_position` (global) and `_window_end_position` (in-block).
+void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) {
+    auto& p = _parent->cast<AnalyticSourceOperatorX>();
+    int64_t current_block_row_pos =
+            _shared_state->input_block_first_row_positions[_output_block_index];
+    int64_t get_result_start = _shared_state->current_row_position - current_block_row_pos;
+    if (p._fn_scope == vectorized::AnalyticFnScope::PARTITION) {
+        // Emit up to the end of the current partition, clipped to this block.
+        int64_t get_result_end =
+                std::min<int64_t>(_shared_state->current_row_position + current_block_rows,
+                                  _shared_state->partition_by_end.pos);
+        _window_end_position =
+                std::min<int64_t>(get_result_end - current_block_row_pos, current_block_rows);
+        _shared_state->current_row_position += (_window_end_position - get_result_start);
+    } else if (p._fn_scope == vectorized::AnalyticFnScope::RANGE) {
+        // Emit up to the end of the current ORDER BY peer group, clipped to this block.
+        _window_end_position =
+                std::min<int64_t>(_order_by_end.pos - current_block_row_pos, current_block_rows);
+        _shared_state->current_row_position += (_window_end_position - get_result_start);
+    } else {
+        // ROWS scope: exactly one output row per step.
+        _window_end_position++;
+        _shared_state->current_row_position++;
+    }
+
+    const auto& offsets = p._offsets_of_aggregate_states;
+    for (size_t i = 0; i < _agg_functions_size; ++i) {
+        // Row offsets are int64_t, so iterate with int64_t rather than narrowing to int.
+        for (int64_t j = get_result_start; j < _window_end_position; ++j) {
+            _agg_functions[i]->insert_result_info(_fn_place_ptr + offsets[i],
+                                                  _result_window_columns[i].get());
+        }
+    }
+}
+
+// ROWS-frame evaluation. Each loop iteration handles one output row. It stops at the end
+// of the current partition or when the current output block is full.
+Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) {
+    while (_shared_state->current_row_position < _shared_state->partition_by_end.pos &&
+           _window_end_position < current_block_rows) {
+        int64_t range_start, range_end;
+        if (!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start &&
+            _parent->cast<AnalyticSourceOperatorX>()._window.window_end.type ==
+                    TAnalyticWindowBoundaryType::CURRENT_ROW) {
+            // Frame [UNBOUNDED PRECEDING, CURRENT ROW]: the state is not reset, so only the
+            // current row is added on top of the running aggregate.
+            range_start = _shared_state->current_row_position;
+            range_end = _shared_state->current_row_position + 1;
+        } else {
+            // Any other ROWS frame is recomputed from scratch for each row.
+            _reset_agg_status();
+            if (!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start) {
+                // No start bound: the frame starts at the beginning of the partition.
+                range_start = _partition_by_start.pos;
+            } else {
+                range_start = _shared_state->current_row_position + _rows_start_offset;
+            }
+            // Exclusive end: one past (current row + end offset).
+            range_end = _shared_state->current_row_position + _rows_end_offset + 1;
+        }
+        _executor.execute(_partition_by_start.pos, _shared_state->partition_by_end.pos, range_start,
+                          range_end);
+        _executor.insert_result(current_block_rows);
+    }
+    return Status::OK();
+}
+
+// Whole-partition frame. When a new partition has just been entered, the entire partition
+// is added to the aggregate state once. Then the result is emitted for as many of the
+// partition's rows as fit in the current output block.
+Status AnalyticLocalState::_get_next_for_partition(size_t current_block_rows) {
+    if (_next_partition) {
+        const auto part_begin = _partition_by_start.pos;
+        const auto part_end = _shared_state->partition_by_end.pos;
+        _executor.execute(part_begin, part_end, part_begin, part_end);
+    }
+    // Every row of the partition receives the same whole-partition result.
+    _executor.insert_result(current_block_rows);
+    return Status::OK();
+}
+
+// RANGE frame ending at CURRENT ROW: ORDER BY peers share one result. Whenever the current
+// row reaches the end of the current peer group, the next group is located and its rows
+// are added to the aggregate state. This function never resets the state, so each group
+// accumulates on top of the earlier groups of the partition. The loop stops at the end of
+// the partition or when the current output block is full.
+Status AnalyticLocalState::_get_next_for_range(size_t current_block_rows) {
+    while (_shared_state->current_row_position < _shared_state->partition_by_end.pos &&
+           _window_end_position < current_block_rows) {
+        if (_shared_state->current_row_position >= _order_by_end.pos) {
+            _update_order_by_range();
+            _executor.execute(_order_by_start.pos, _order_by_end.pos, _order_by_start.pos,
+                              _order_by_end.pos);
+        }
+        _executor.insert_result(current_block_rows);
+    }
+    return Status::OK();
+}
+
+// Advance [_order_by_start, _order_by_end) to the next ORDER BY peer group in the current
+// partition. The new group starts where the previous one ended. Its end starts at the
+// partition end and is narrowed one ORDER BY column at a time.
+void AnalyticLocalState::_update_order_by_range() {
+    _order_by_start = _order_by_end;
+    _order_by_end = _shared_state->partition_by_end;
+    for (size_t i = 0; i < _shared_state->order_by_eq_expr_ctxs.size(); ++i) {
+        // NOTE(review): assumes compare_row_to_find_end returns the first position in
+        // [_order_by_start, _order_by_end) whose value in column i differs from the start
+        // row's value — confirm against dependency.cpp.
+        _order_by_end = _dependency->compare_row_to_find_end(_shared_state->ordey_by_column_idxs[i],
+                                                             _order_by_start, _order_by_end, true);
+    }
+    // Recompute the global row positions from (block_num, row_num).
+    _order_by_start.pos =
+            _shared_state->input_block_first_row_positions[_order_by_start.block_num] +
+            _order_by_start.row_num;
+    _order_by_end.pos = _shared_state->input_block_first_row_positions[_order_by_end.block_num] +
+                        _order_by_end.row_num;
+    // `_order_by_end` will be assigned to `_order_by_start` next time,
+    // so make it a valid position.
+    if (_order_by_end.row_num == _shared_state->input_blocks[_order_by_end.block_num].rows()) {
+        _order_by_end.block_num++;
+        _order_by_end.row_num = 0;
+    }
+}
+
+// At the start of an output block (no row produced yet, `_window_end_position == 0`),
+// allocate fresh result columns: one per analytic function, of that function's return
+// type. Otherwise keep filling the existing columns. Always returns OK.
+Status AnalyticLocalState::init_result_columns() {
+    if (_window_end_position != 0) {
+        return Status::OK();
+    }
+    _result_window_columns.resize(_agg_functions_size);
+    for (size_t fn = 0; fn < _agg_functions_size; ++fn) {
+        _result_window_columns[fn] = _agg_functions[fn]->data_type()->create_column();
+    }
+    return Status::OK();
+}
+
+// Step into the next partition once the current one has been fully emitted.
+// The step happens when current_row_position has reached the current partition end and
+// either no partition has started yet (partition_by_end.pos == 0) or a different partition
+// end has been found. The new partition then begins where the previous one ended, the emit
+// cursor moves to its first row, and the aggregate states are reset.
+// Returns true iff a new partition was entered.
+bool AnalyticLocalState::init_next_partition(vectorized::BlockRowPos found_partition_end) {
+    if ((_shared_state->current_row_position >= _shared_state->partition_by_end.pos) &&
+        ((_shared_state->partition_by_end.pos == 0) ||
+         (_shared_state->partition_by_end.pos != found_partition_end.pos))) {
+        _partition_by_start = _shared_state->partition_by_end;
+        _shared_state->partition_by_end = found_partition_end;
+        _shared_state->current_row_position = _partition_by_start.pos;
+        _reset_agg_status();
+        return true;
+    }
+    return false;
+}
+
+// Emit the current output block:
+//  1. move the next buffered input block into `block`;
+//  2. drop any columns appended by expression evaluation, keeping only `origin_cols`;
+//  3. append one result column per analytic function, made nullable where the operator's
+//     `_change_to_nullable_flags` asks for it;
+//  4. advance to the next output block.
+Status AnalyticLocalState::output_current_block(vectorized::Block* block) {
+    auto& p = _parent->cast<AnalyticSourceOperatorX>();
+    block->swap(std::move(_shared_state->input_blocks[_output_block_index]));
+    // Convert to signed before negating. Negating the unsigned size_t directly wraps around
+    // and depends on a narrowing conversion back to a signed value.
+    // NOTE(review): AnalyticSinkOperatorX::sink() consumed these bytes on the *sink* local
+    // state's tracker and counter, but they are released here from this source local
+    // state's own — confirm this accounting is intended.
+    const auto released_bytes = static_cast<int64_t>(block->allocated_bytes());
+    _blocks_memory_usage->add(-released_bytes);
+    mem_tracker()->consume(-released_bytes);
+    if (_shared_state->origin_cols.size() < block->columns()) {
+        block->erase_not_in(_shared_state->origin_cols);
+    }
+
+    DCHECK(p._change_to_nullable_flags.size() == _result_window_columns.size());
+    for (size_t i = 0; i < _result_window_columns.size(); ++i) {
+        if (p._change_to_nullable_flags[i]) {
+            block->insert({make_nullable(std::move(_result_window_columns[i])),
+                           make_nullable(_agg_functions[i]->data_type()), ""});
+        } else {
+            block->insert(
+                    {std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""});
+        }
+    }
+
+    _output_block_index++;
+    _window_end_position = 0;
+
+    return Status::OK();
+}
+
+// Free the buffered memory: the aggregate arena, the buffered input blocks, the aggregate
+// input columns and the result columns. Each container is swapped with an empty temporary,
+// so its capacity is released as well (clear() would keep it).
+// NOTE(review): the arena backs `_fn_place_ptr` (the aggregate states); confirm that
+// _destroy_agg_status() has run before this is called.
+void AnalyticLocalState::release_mem() {
+    _agg_arena_pool = nullptr;
+    std::vector<vectorized::Block>().swap(_shared_state->input_blocks);
+    std::vector<std::vector<vectorized::MutableColumnPtr>>().swap(_shared_state->agg_input_columns);
+    std::vector<vectorized::MutableColumnPtr>().swap(_result_window_columns);
+}
+
+// Capture the window definition from the plan node and derive the function scope.
+//  - PARTITION (default): no window, a RANGE window without an end bound, or a ROWS window
+//    with no bounds. The frame is the whole partition.
+//  - RANGE: a RANGE window ending at CURRENT ROW, i.e. [UNBOUNDED PRECEDING, CURRENT ROW].
+//  - ROWS: a ROWS window with at least one bound.
+// Note: `_has_range_window` is computed from window.type even when no window is set. It is
+// only meaningful together with `_has_window`, which AnalyticLocalState::init() checks first.
+AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                                 const DescriptorTbl& descs, std::string op_name)
+        : OperatorXBase(pool, tnode, descs, op_name),
+          _window(tnode.analytic_node.window),
+          _intermediate_tuple_id(tnode.analytic_node.intermediate_tuple_id),
+          _output_tuple_id(tnode.analytic_node.output_tuple_id),
+          _has_window(tnode.analytic_node.__isset.window),
+          _has_range_window(tnode.analytic_node.window.type == TAnalyticWindowType::RANGE),
+          _has_window_start(tnode.analytic_node.window.__isset.window_start),
+          _has_window_end(tnode.analytic_node.window.__isset.window_end) {
+    _fn_scope = vectorized::AnalyticFnScope::PARTITION;
+    if (tnode.analytic_node.__isset.window &&
+        tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {
+        DCHECK(!_window.__isset.window_start) << "RANGE windows must have UNBOUNDED PRECEDING";
+        DCHECK(!_window.__isset.window_end ||
+               _window.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW)
+                << "RANGE window end bound must be CURRENT ROW or UNBOUNDED FOLLOWING";
+
+        // Without an end bound the frame stays PARTITION, i.e. [unbounded preceding,
+        // unbounded following]. With an end (CURRENT ROW) it is a RANGE frame,
+        // [unbounded preceding, current row].
+        if (_window.__isset.window_end) {
+            _fn_scope = vectorized::AnalyticFnScope::RANGE;
+        }
+
+    } else if (tnode.analytic_node.__isset.window) {
+        if (_window.__isset.window_start || _window.__isset.window_end) {
+            _fn_scope = vectorized::AnalyticFnScope::ROWS;
+        }
+    }
+}
+
+// Initialize the base operator, then create one AggFnEvaluator per analytic function in
+// the plan node, allocated via `_pool`. Each local state later clones these evaluators.
+Status AnalyticSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorXBase::init(tnode, state));
+    for (const TExpr& fn_expr : tnode.analytic_node.analytic_functions) {
+        vectorized::AggFnEvaluator* evaluator = nullptr;
+        RETURN_IF_ERROR(vectorized::AggFnEvaluator::create(_pool, fn_expr, {}, &evaluator));
+        _agg_functions.emplace_back(evaluator);
+    }
+    return Status::OK();
+}
+
+// Produces the next block of window-function results.
+//
+// Reports FINISHED once input has reached EOS and every buffered block has been
+// emitted, or no rows were received at all. Otherwise it evaluates partitions until
+// the window end reaches the row count of the current input block. If
+// whether_need_next_partition() reports that more input is needed, it records that
+// in the shared state and returns without output.
+Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
+                                          SourceState& source_state) {
+    auto& local_state = state->get_local_state(id())->cast<AnalyticLocalState>();
+    AnalyticSharedState* shared_state = local_state._shared_state;
+    if (shared_state->input_eos &&
+        (local_state._output_block_index == shared_state->input_blocks.size() ||
+         shared_state->input_total_rows == 0)) {
+        source_state = SourceState::FINISHED;
+        return Status::OK();
+    }
+
+    while (!shared_state->input_eos ||
+           local_state._output_block_index < shared_state->input_blocks.size()) {
+        {
+            SCOPED_TIMER(local_state._evaluation_timer);
+            shared_state->found_partition_end = local_state._dependency->get_partition_by_end();
+        }
+        shared_state->need_more_input = local_state._dependency->whether_need_next_partition(
+                shared_state->found_partition_end);
+        if (shared_state->need_more_input) {
+            return Status::OK();
+        }
+        local_state._next_partition =
+                local_state.init_next_partition(shared_state->found_partition_end);
+        // Both calls return Status; propagate failures instead of silently dropping them.
+        RETURN_IF_ERROR(local_state.init_result_columns());
+        const size_t current_block_rows =
+                shared_state->input_blocks[local_state._output_block_index].rows();
+        RETURN_IF_ERROR(local_state._executor.get_next(current_block_rows));
+        // Stop once the window end reaches the row count of the current input block.
+        if (local_state._window_end_position == current_block_rows) {
+            break;
+        }
+    }
+    RETURN_IF_ERROR(local_state.output_current_block(block));
+    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
+                                                           block->columns()));
+    local_state.reached_limit(block, source_state);
+    return Status::OK();
+}
+
+// Creates this operator's AnalyticLocalState, registers it under id() in the
+// runtime state, and then initializes it with `info`.
+Status AnalyticSourceOperatorX::setup_local_state(RuntimeState* state, LocalStateInfo& info) {
+    auto new_local_state = AnalyticLocalState::create_shared(state, this);
+    state->emplace_local_state(id(), new_local_state);
+    return new_local_state->init(state, info);
+}
+
+// Readable exactly when the shared state does not request more input
+// (need_more_input is updated in get_block()).
+bool AnalyticSourceOperatorX::can_read(RuntimeState* state) {
+    auto& analytic_local_state = state->get_local_state(id())->cast<AnalyticLocalState>();
+    return !analytic_local_state._shared_state->need_more_input;
+}
+
+// Tears down this instance in order:
+//   1. closes its evaluators;
+//   2. destroys the aggregate states;
+//   3. calls release_mem();
+//   4. closes the base operator.
+Status AnalyticSourceOperatorX::close(RuntimeState* state) {
+    auto& ls = state->get_local_state(id())->cast<AnalyticLocalState>();
+    for (vectorized::AggFnEvaluator* evaluator : ls._agg_functions) {
+        evaluator->close(state);
+    }
+
+    // The Status returned by _destroy_agg_status() is not propagated; teardown
+    // continues regardless of its result.
+    static_cast<void>(ls._destroy_agg_status());
+    ls.release_mem();
+    return OperatorXBase::close(state);
+}
+
+// Resolves the intermediate/output tuple descriptors and prepares each evaluator
+// against its slot. It then lays out all aggregate states back to back: each state
+// starts at a multiple of the next function's alignment, and
+// _align_aggregate_states records the largest alignment seen.
+Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorXBase::prepare(state));
+    DCHECK(_child_x->row_desc().is_prefix_of(_row_descriptor));
+    _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
+    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+
+    const size_t num_fns = _agg_functions.size();
+
+    // Slot fn_idx of the intermediate and output tuples belongs to function fn_idx.
+    for (size_t fn_idx = 0; fn_idx < num_fns; ++fn_idx) {
+        SlotDescriptor* intermediate_slot = _intermediate_tuple_desc->slots()[fn_idx];
+        SlotDescriptor* output_slot = _output_tuple_desc->slots()[fn_idx];
+        RETURN_IF_ERROR(_agg_functions[fn_idx]->prepare(state, _child_x->row_desc(),
+                                                        intermediate_slot, output_slot));
+        const bool needs_nullable_wrap = output_slot->is_nullable() &&
+                                         !_agg_functions[fn_idx]->data_type()->is_nullable();
+        _change_to_nullable_flags.push_back(needs_nullable_wrap);
+    }
+
+    _offsets_of_aggregate_states.resize(num_fns);
+    for (size_t fn_idx = 0; fn_idx < num_fns; ++fn_idx) {
+        const auto& fn = _agg_functions[fn_idx]->function();
+        _offsets_of_aggregate_states[fn_idx] = _total_size_of_aggregate_states;
+        _align_aggregate_states = std::max(_align_aggregate_states, fn->align_of_data());
+        _total_size_of_aggregate_states += fn->size_of_data();
+
+        const bool is_last = fn_idx + 1 == num_fns;
+        if (is_last) {
+            continue;
+        }
+        const size_t next_align = _agg_functions[fn_idx + 1]->function()->align_of_data();
+        if ((next_align & (next_align - 1)) != 0) {
+            return Status::RuntimeError("Logical error: align_of_data is not 2^N");
+        }
+        // Round the running size up to a multiple of next_align so the next state is aligned.
+        _total_size_of_aggregate_states =
+                (_total_size_of_aggregate_states + next_align - 1) / next_align * next_align;
+    }
+    return Status::OK();
+}
+
+// Opens the base operator, then every analytic function evaluator; the first
+// failure is returned immediately.
+Status AnalyticSourceOperatorX::open(RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorXBase::open(state));
+    for (vectorized::AggFnEvaluator* evaluator : _agg_functions) {
+        RETURN_IF_ERROR(evaluator->open(state));
+    }
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/analytic_source_operator.h 
b/be/src/pipeline/exec/analytic_source_operator.h
index 60f1a86158..21045a56ea 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -45,5 +45,123 @@ public:
     Status open(RuntimeState*) override { return Status::OK(); }
 };
 
+class AnalyticSourceOperatorX;
+// Per-instance execution state of AnalyticSourceOperatorX. Buffered input is read
+// from the AnalyticSharedState reached through _shared_state. This object keeps the
+// output cursor, window/partition positions, and aggregate-function state.
+// Scalar and pointer members get in-class defaults so they are never read
+// indeterminate, whatever the constructor initializes.
+class AnalyticLocalState final : public PipelineXLocalState {
+public:
+    ENABLE_FACTORY_CREATOR(AnalyticLocalState);
+    AnalyticLocalState(RuntimeState* state, OperatorXBase* parent);
+
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+
+    // Called by AnalyticSourceOperatorX::get_block() before each executor step.
+    Status init_result_columns();
+
+    // Called at the end of AnalyticSourceOperatorX::get_block() with the output block.
+    Status output_current_block(vectorized::Block* block);
+
+    // Called from AnalyticSourceOperatorX::close().
+    void release_mem();
+
+    // Called by get_block() with the partition end found by the dependency; the
+    // result is stored in _next_partition.
+    bool init_next_partition(vectorized::BlockRowPos found_partition_end);
+
+private:
+    // Per-scope implementations of _executor.get_next (ROWS / RANGE / PARTITION);
+    // presumably selected from the parent's _fn_scope — TODO confirm.
+    Status _get_next_for_rows(size_t rows);
+    Status _get_next_for_range(size_t rows);
+    Status _get_next_for_partition(size_t rows);
+
+    void _execute_for_win_func(int64_t partition_start, int64_t partition_end, int64_t frame_start,
+                               int64_t frame_end);
+    void _insert_result_info(int64_t current_block_rows);
+
+    void _update_order_by_range();
+
+    // Lifecycle of the aggregate states; _destroy_agg_status() is called from
+    // AnalyticSourceOperatorX::close().
+    Status _reset_agg_status();
+    Status _create_agg_status();
+    Status _destroy_agg_status();
+
+    friend class AnalyticSourceOperatorX;
+
+    AnalyticDependency* _dependency = nullptr;
+    AnalyticSharedState* _shared_state = nullptr;
+
+    // Index into _shared_state->input_blocks of the block currently being output.
+    int64_t _output_block_index = 0;
+    int64_t _window_end_position = 0;
+    bool _next_partition = false;
+    std::vector<vectorized::MutableColumnPtr> _result_window_columns;
+
+    int64_t _rows_start_offset = 0;
+    int64_t _rows_end_offset = 0;
+    vectorized::AggregateDataPtr _fn_place_ptr = nullptr;
+    size_t _agg_functions_size = 0;
+    bool _agg_functions_created = false;
+    vectorized::BlockRowPos _order_by_start;
+    vectorized::BlockRowPos _order_by_end;
+    vectorized::BlockRowPos _partition_by_start;
+    std::unique_ptr<vectorized::Arena> _agg_arena_pool;
+    std::vector<vectorized::AggFnEvaluator*> _agg_functions;
+
+    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
+    RuntimeProfile::Counter* _evaluation_timer = nullptr;
+    RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
+
+    using vectorized_execute = std::function<void(int64_t peer_group_start, int64_t peer_group_end,
+                                                  int64_t frame_start, int64_t frame_end)>;
+    using vectorized_get_next = std::function<Status(size_t rows)>;
+    using vectorized_get_result = std::function<void(int64_t current_block_rows)>;
+
+    // Callbacks for the active window strategy. get_next is invoked by get_block()
+    // with the row count of the current input block.
+    struct executor {
+        vectorized_execute execute;
+        vectorized_get_next get_next;
+        vectorized_get_result insert_result;
+    };
+
+    executor _executor;
+};
+
+// PipelineX source operator for analytic (window) functions. The plan-level
+// configuration lives here: window, tuple ids, evaluators and aggregate-state layout.
+// Per-instance execution state lives in AnalyticLocalState.
+class AnalyticSourceOperatorX final : public OperatorXBase {
+public:
+    AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs,
+                            std::string op_name);
+    bool can_read(RuntimeState* state) override;
+
+    Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override;
+
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override;
+
+    Status close(RuntimeState* state) override;
+    bool is_source() const override { return true; }
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+
+private:
+    friend class AnalyticLocalState;
+
+    // Copied from tnode.analytic_node.window; _has_window records whether it was set.
+    TAnalyticWindow _window;
+
+    TupleId _intermediate_tuple_id;
+    TupleId _output_tuple_id;
+
+    bool _has_window;
+    bool _has_range_window;
+    bool _has_window_start;
+    bool _has_window_end;
+
+    // One evaluator per analytic function, created in init().
+    std::vector<vectorized::AggFnEvaluator*> _agg_functions;
+
+    // Evaluation scope, derived from the window clause in the constructor.
+    vectorized::AnalyticFnScope _fn_scope = vectorized::AnalyticFnScope::PARTITION;
+
+    // Resolved from the descriptor table in prepare(); null until then.
+    TupleDescriptor* _intermediate_tuple_desc = nullptr;
+    TupleDescriptor* _output_tuple_desc = nullptr;
+
+    /// The offset of the n-th functions.
+    std::vector<size_t> _offsets_of_aggregate_states;
+    /// The total size of the row from the functions.
+    size_t _total_size_of_aggregate_states = 0;
+    /// The max align size for functions
+    size_t _align_aggregate_states = 1;
+
+    // Per function (set in prepare()): true when the output slot is nullable but
+    // the function's result type is not.
+    std::vector<bool> _change_to_nullable_flags;
+};
+
 } // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index f58b9406bf..d3afe827c8 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -102,6 +102,7 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc() 
const {
 }
 
 Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
+    RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info));
     _sender_id = info.sender_id;
     _broadcast_pb_blocks.resize(config::num_broadcast_buffer);
     _broadcast_pb_block_idx = 0;
@@ -128,7 +129,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
     std::string title = "VDataStreamSender (dst_id={}, dst_fragments=[{}])";
     _profile = p._pool->add(new RuntimeProfile(title));
     SCOPED_TIMER(_profile->total_time_counter());
-    _mem_tracker = std::make_unique<MemTracker>("ExchangeSinkLocalState:");
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     int local_size = 0;
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index b240521913..efb716f7ac 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -29,7 +29,10 @@ class RuntimeState;
 namespace doris::pipeline {
 
 OperatorBase::OperatorBase(OperatorBuilderBase* operator_builder)
-        : _operator_builder(operator_builder), _is_closed(false) {}
+        : _operator_builder(operator_builder),
+          _child(nullptr),
+          _child_x(nullptr),
+          _is_closed(false) {}
 
 bool OperatorBase::is_sink() const {
     return _operator_builder->is_sink();
@@ -59,6 +62,11 @@ std::string OperatorBase::debug_string() const {
     return ss.str();
 }
 
+// Base initialization for pipelineX sink local states. It creates a MemTracker
+// labelled with the owning sink operator's name (_parent->get_name()). Overrides in
+// derived sink local states call this before their own setup.
+Status PipelineXSinkLocalState::init(RuntimeState* /*state*/, LocalSinkStateInfo& /*info*/) {
+    _mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
+    return Status::OK();
+}
+
 std::string OperatorXBase::debug_string() const {
     std::stringstream ss;
     ss << _op_name << ": is_source: " << is_source();
@@ -96,8 +104,8 @@ Status OperatorXBase::prepare(RuntimeState* state) {
 
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, 
intermediate_row_desc()));
 
-    if (_children) {
-        RETURN_IF_ERROR(_children->prepare(state));
+    if (_child_x && !is_source()) {
+        RETURN_IF_ERROR(_child_x->prepare(state));
     }
 
     return Status::OK();
@@ -124,7 +132,9 @@ Status OperatorXBase::close(RuntimeState* state) {
     }
     auto local_state = state->get_local_state(id());
     Status result;
-    _children->close(state);
+    if (_child_x && !is_source()) {
+        _child_x->close(state);
+    }
     if (local_state->_rows_returned_counter != nullptr) {
         COUNTER_SET(local_state->_rows_returned_counter, 
local_state->_num_rows_returned);
     }
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index a03297bcf2..6daf9407e7 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -203,9 +203,6 @@ public:
     }
 
     Status set_child(OperatorXPtr child) {
-        if (is_source()) {
-            return Status::OK();
-        }
         _child_x = std::move(child);
         return Status::OK();
     }
@@ -270,7 +267,7 @@ protected:
     // Used on pipeline X
     OperatorXPtr _child_x;
 
-    bool _is_closed = false;
+    bool _is_closed;
 };
 
 /**
@@ -577,7 +574,6 @@ public:
               _type(tnode.node_type),
               _pool(pool),
               _tuple_ids(tnode.row_tuples),
-              _children(nullptr),
               _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
               _resource_profile(tnode.resource_profile),
               _limit(tnode.limit),
@@ -683,7 +679,6 @@ protected:
 
     vectorized::VExprContextSPtrs _conjuncts;
 
-    OperatorXBase* _children;
     RowDescriptor _row_descriptor;
 
     std::unique_ptr<RowDescriptor> _output_row_descriptor;
@@ -709,7 +704,7 @@ public:
             : _parent(parent_), _state(state_) {}
     virtual ~PipelineXSinkLocalState() {}
 
-    virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) { 
return Status::OK(); }
+    virtual Status init(RuntimeState* state, LocalSinkStateInfo& info);
     template <class TARGET>
     TARGET& cast() {
         DCHECK(dynamic_cast<TARGET*>(this));
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index 182ce7bd71..af655b896c 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -83,7 +83,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, const 
TPlanNode& tnode,
           _limit(tnode.limit),
           _use_topn_opt(tnode.sort_node.use_topn_opt),
           _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
-          _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read) {}
+          _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read) {
+    _name = "SortSinkOperatorX";
+}
 
 Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.sort_node.sort_info, _pool));
@@ -134,6 +136,7 @@ Status SortSinkOperatorX::prepare(RuntimeState* state) {
     } else {
         _algorithm = SortAlgorithm::FULL_SORT;
     }
+    _profile = state->obj_pool()->add(new RuntimeProfile("SortSinkOperatorX"));
     return _vsort_exec_exprs.prepare(state, _child_x->row_desc(), 
_row_descriptor);
 }
 
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp 
b/be/src/pipeline/pipeline_x/dependency.cpp
index 238295e63b..49262ce0e5 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -99,4 +99,122 @@ void AggDependency::release_tracker() {
     mem_tracker()->release(_mem_usage_record.used_in_state + 
_mem_usage_record.used_in_arena);
 }
 
+// Returns the end position of the partition containing current_row_position.
+vectorized::BlockRowPos AnalyticDependency::get_partition_by_end() {
+    const auto& st = _analytic_state;
+    // Rows of the current partition are still pending: its end is already known.
+    if (st.current_row_position < st.partition_by_end.pos) {
+        return st.partition_by_end;
+    }
+    // With no PARTITION BY expressions, or no input rows, everything buffered is one partition.
+    if (st.partition_by_eq_expr_ctxs.empty() || st.input_total_rows == 0) {
+        return st.all_block_end;
+    }
+    // Refine the candidate end once per PARTITION BY column, searching from
+    // partition_by_end towards the end of all buffered blocks.
+    vectorized::BlockRowPos end = st.all_block_end;
+    for (size_t col = 0; col < st.partition_by_eq_expr_ctxs.size(); ++col) {
+        end = compare_row_to_find_end(st.partition_by_column_idxs[col], st.partition_by_end, end);
+    }
+    // Convert the (block, row) result into an absolute row position.
+    end.pos = st.input_block_first_row_positions[end.block_num] + end.row_num;
+    return end;
+}
+
+// The partition-by and order-by columns are stored across the buffered input blocks,
+// so finding a boundary first locates the block that contains it, then the row within that block.
+vectorized::BlockRowPos AnalyticDependency::compare_row_to_find_end(int idx,
+                                                                    vectorized::BlockRowPos start,
+                                                                    vectorized::BlockRowPos end,
+                                                                    bool need_check_first) {
+    // Validate the positions before they are used to index input_blocks. DCHECK_LT
+    // against size() (rather than DCHECK_LE against size() - 1, which underflows)
+    // still catches an out-of-range block when input_blocks is empty.
+    DCHECK_LE(start.block_num, end.block_num);
+    DCHECK_LT(start.block_num, _analytic_state.input_blocks.size());
+
+    int64_t start_init_row_num = start.row_num;
+    vectorized::ColumnPtr start_column =
+            _analytic_state.input_blocks[start.block_num].get_by_position(idx).column;
+    vectorized::ColumnPtr start_next_block_column = start_column;
+
+    int64_t start_block_num = start.block_num;
+    int64_t end_block_num = end.block_num;
+    int64_t mid_block_num = end.block_num;
+    // Fix for https://github.com/apache/doris/issues/15951. When the partition-by value
+    // ends on the last row of a block, `end` points to row 0 of the next block, so the
+    // range is [left, right). From the order-by column's view the two values can be
+    // exactly equal; comparing with row 0 of that next block (compare_at == 0) would
+    // then produce a wrong range, so step `end` back to the end of the previous block.
+    if (need_check_first && end.block_num > 0 && end.row_num == 0) {
+        end.block_num--;
+        end_block_num--;
+        end.row_num = _analytic_state.input_blocks[end_block_num].rows();
+    }
+    // Binary search over blocks: keep the latest block whose row 0 equals
+    // start_column[start_init_row_num].
+    while (start_block_num < end_block_num) {
+        mid_block_num = (start_block_num + end_block_num + 1) >> 1;
+        start_next_block_column =
+                _analytic_state.input_blocks[mid_block_num].get_by_position(idx).column;
+        // Compares (*this)[n] and rhs[m]; this: start[init_row], rhs: mid[0].
+        if (start_column->compare_at(start_init_row_num, 0, *start_next_block_column, 1) == 0) {
+            start_block_num = mid_block_num;
+        } else {
+            end_block_num = mid_block_num - 1;
+        }
+    }
+
+    // The loop compared start_column[start_init_row_num] with row 0 of mid_block_num.
+    // If the search last moved end_block_num down to mid_block_num - 1, also compare
+    // with the last row of end_block_num. On a match the end is row 0 of the next block.
+    if (end_block_num == mid_block_num - 1) {
+        start_next_block_column =
+                _analytic_state.input_blocks[end_block_num].get_by_position(idx).column;
+        int64_t block_size = _analytic_state.input_blocks[end_block_num].rows();
+        if ((start_column->compare_at(start_init_row_num, block_size - 1, *start_next_block_column,
+                                      1) == 0)) {
+            start.block_num = end_block_num + 1;
+            start.row_num = 0;
+            return start;
+        }
+    }
+
+    // If the search moved forward to a later block, restart the row search at row 0
+    // of that block and compare against its own column.
+    if (start_block_num != start.block_num) {
+        start_init_row_num = 0;
+        start.block_num = start_block_num;
+        start_column = _analytic_state.input_blocks[start.block_num].get_by_position(idx).column;
+    }
+    // Binary search within the block for the first row that differs from
+    // start_column[start_init_row_num].
+    int64_t start_pos = start_init_row_num;
+    int64_t end_pos = _analytic_state.input_blocks[start.block_num].rows() - 1;
+    // When the search ended in end's own block, end.row_num bounds the row search.
+    if (start.block_num == end.block_num) {
+        end_pos = end.row_num;
+    }
+    while (start_pos < end_pos) {
+        int64_t mid_pos = (start_pos + end_pos) >> 1;
+        if (start_column->compare_at(start_init_row_num, mid_pos, *start_column, 1)) {
+            end_pos = mid_pos;
+        } else {
+            start_pos = mid_pos + 1;
+        }
+    }
+    start.row_num = start_pos; // update row num, return the found end
+    return start;
+}
+
+// Returns true when the source must wait for more input before it can evaluate
+// the current partition.
+bool AnalyticDependency::whether_need_next_partition(vectorized::BlockRowPos found_partition_end) {
+    const auto& st = _analytic_state;
+    // All input has arrived, or the current partition still has rows: nothing to wait for.
+    if (st.input_eos || st.current_row_position < st.partition_by_end.pos) {
+        return false;
+    }
+    // Past this point input_eos is false.
+    const bool has_partition_by = !st.partition_by_eq_expr_ctxs.empty();
+    // No PARTITION BY: keep reading until EOS. Also read more if the found end is position 0.
+    if (!has_partition_by || found_partition_end.pos == 0) {
+        return true;
+    }
+    // The found end coincides with the end of all buffered rows while input is not
+    // finished, so more input is requested.
+    return found_partition_end.pos == st.all_block_end.pos;
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 478623637e..152d0e69c1 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -20,6 +20,7 @@
 #include "pipeline/exec/data_queue.h"
 #include "vec/common/sort/sorter.h"
 #include "vec/exec/vaggregation_node.h"
+#include "vec/exec/vanalytic_eval_node.h"
 
 namespace doris {
 namespace pipeline {
@@ -141,5 +142,46 @@ private:
     SortSharedState _sort_state;
 };
 
+// State shared through AnalyticDependency::shared_state() between the analytic sink
+// and source operators. The source reads buffered input from it; presumably the
+// sink fills it — TODO confirm against AnalyticSinkOperatorX.
+struct AnalyticSharedState {
+    AnalyticSharedState() = default;
+
+    // Absolute row position; compared against partition_by_end.pos to tell whether
+    // the current partition still has rows.
+    int64_t current_row_position {0};
+    vectorized::BlockRowPos partition_by_end;
+    // PARTITION BY expressions; empty when the query has no PARTITION BY.
+    vectorized::VExprContextSPtrs partition_by_eq_expr_ctxs;
+    int64_t input_total_rows {0};
+    // Position just past all buffered rows.
+    vectorized::BlockRowPos all_block_end;
+    std::vector<vectorized::Block> input_blocks;
+    bool input_eos {false};
+    // Written by the source's get_block() and read by can_read().
+    std::atomic_bool need_more_input {true};
+    vectorized::BlockRowPos found_partition_end;
+    std::vector<int64_t> origin_cols;
+    vectorized::VExprContextSPtrs order_by_eq_expr_ctxs;
+    // Absolute position of the first row of each block in input_blocks.
+    std::vector<int64_t> input_block_first_row_positions;
+    std::vector<std::vector<vectorized::MutableColumnPtr>> agg_input_columns;
+
+    // TODO: maybe global?
+    // Column positions (within input_blocks) of the PARTITION BY / ORDER BY columns.
+    std::vector<int64_t> partition_by_column_idxs;
+    std::vector<int64_t> ordey_by_column_idxs; // (sic) name kept; may be referenced elsewhere
+};
+
+// Dependency for the analytic operator. It owns the AnalyticSharedState and provides
+// the partition-boundary search used by AnalyticSourceOperatorX::get_block().
+class AnalyticDependency final : public Dependency {
+public:
+    AnalyticDependency(int id) : Dependency(id) {}
+    ~AnalyticDependency() override = default;
+
+    // Type-erased pointer to the owned AnalyticSharedState.
+    void* shared_state() override { return static_cast<void*>(&_analytic_state); }
+
+    // End position of the partition containing the current row.
+    vectorized::BlockRowPos get_partition_by_end();
+
+    // True when more input is required before the current partition can be evaluated.
+    bool whether_need_next_partition(vectorized::BlockRowPos found_partition_end);
+    // Searches from `start` towards `end` for where the value of column `idx` stops
+    // equalling the value at `start`. need_check_first enables the adjustment for an
+    // `end` that points at row 0 of a block (apache/doris issue 15951).
+    vectorized::BlockRowPos compare_row_to_find_end(int idx, vectorized::BlockRowPos start,
+                                                    vectorized::BlockRowPos end,
+                                                    bool need_check_first = false);
+
+private:
+    AnalyticSharedState _analytic_state;
+};
+
 } // namespace pipeline
 } // namespace doris
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 13e89e8766..7e97696a62 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -42,6 +42,8 @@
 #include "io/fs/stream_load_pipe.h"
 #include "pipeline/exec/aggregation_sink_operator.h"
 #include "pipeline/exec/aggregation_source_operator.h"
+#include "pipeline/exec/analytic_sink_operator.h"
+#include "pipeline/exec/analytic_source_operator.h"
 #include "pipeline/exec/data_queue.h"
 #include "pipeline/exec/datagen_operator.h"
 #include "pipeline/exec/exchange_sink_operator.h"
@@ -471,7 +473,7 @@ Status 
PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
 
     RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
 
-    if (op->get_child()) {
+    if (op->get_child() && !op->is_source()) {
         
op->get_runtime_profile()->add_child(op->get_child()->get_runtime_profile(), 
true, nullptr);
     }
 
@@ -550,6 +552,23 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
         break;
     }
+    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
+        op.reset(new AnalyticSourceOperatorX(pool, tnode, descs, 
"AnalyticSourceXOperator"));
+        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+
+        const auto downstream_pipeline_id = cur_pipe->id();
+        if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+            _dag.insert({downstream_pipeline_id, {}});
+        }
+        cur_pipe = add_pipeline();
+        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
+
+        DataSinkOperatorXPtr sink;
+        sink.reset(new AnalyticSinkOperatorX(pool, tnode, descs));
+        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+        RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
+        break;
+    }
     default:
         return Status::InternalError("Unsupported exec type in pipelineX: {}",
                                      print_plan_node_type(tnode.node_type));
diff --git a/be/src/vec/exec/vanalytic_eval_node.h 
b/be/src/vec/exec/vanalytic_eval_node.h
index d232c28ff9..9e3c24fb81 100644
--- a/be/src/vec/exec/vanalytic_eval_node.h
+++ b/be/src/vec/exec/vanalytic_eval_node.h
@@ -67,6 +67,8 @@ struct BlockRowPos {
 
 class AggFnEvaluator;
 
+enum AnalyticFnScope { PARTITION, RANGE, ROWS };
+
 class VAnalyticEvalNode : public ExecNode {
 public:
     ~VAnalyticEvalNode() override = default;
@@ -134,7 +136,6 @@ private:
     void _release_mem();
 
 private:
-    enum AnalyticFnScope { PARTITION, RANGE, ROWS };
     std::vector<Block> _input_blocks;
     std::vector<int64_t> input_block_first_row_positions;
     std::vector<AggFnEvaluator*> _agg_functions;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to