This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 65e277e365726f54d937b7b69d7abe4eff69fbec
Author: zhangstar333 <[email protected]>
AuthorDate: Thu Feb 1 18:58:02 2024 +0800

    [refactor](node) refactor partition sort node to improve readability (#30511)
    
    * [refactor](node) refactor partition sort node to improve readability
    
    * update
---
 .../pipeline/exec/partition_sort_sink_operator.cpp |  37 ++++---
 .../pipeline/exec/partition_sort_sink_operator.h   |  10 +-
 .../exec/partition_sort_source_operator.cpp        |   2 -
 be/src/pipeline/pipeline_x/dependency.h            |   1 -
 be/src/vec/common/sort/partition_sorter.cpp        |   8 ++
 be/src/vec/common/sort/partition_sorter.h          |   1 +
 be/src/vec/exec/vpartition_sort_node.cpp           | 113 +++++++++++++--------
 be/src/vec/exec/vpartition_sort_node.h             |  57 +++--------
 8 files changed, 117 insertions(+), 112 deletions(-)

diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp 
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 1fe25ea1910..6cdfca18d8b 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -47,7 +47,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
     _partition_sort_info = std::make_shared<vectorized::PartitionSortInfo>(
             &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, 
p._nulls_first,
             p._child_x->row_desc(), state, _profile, p._has_global_limit, 
p._partition_inner_limit,
-            p._top_n_algorithm, _shared_state->previous_row.get(), 
p._topn_phase);
+            p._top_n_algorithm, p._topn_phase);
     _init_hash_method();
     return Status::OK();
 }
@@ -122,8 +122,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                     local_state._dependency->set_ready_to_read();
                 }
             } else {
-                RETURN_IF_ERROR(
-                        _split_block_by_partition(input_block, 
state->batch_size(), local_state));
+                RETURN_IF_ERROR(_split_block_by_partition(input_block, 
local_state,
+                                                          source_state == 
SourceState::FINISHED));
                 RETURN_IF_CANCELLED(state);
                 input_block->clear_column_data();
             }
@@ -135,19 +135,16 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
         local_state._agg_arena_pool.reset(nullptr);
         local_state._partitioned_data.reset(nullptr);
         for (int i = 0; i < local_state._value_places.size(); ++i) {
-            auto sorter = vectorized::PartitionSorter::create_unique(
-                    _vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, 
_nulls_first,
-                    _child_x->row_desc(), state, i == 0 ? local_state._profile 
: nullptr,
-                    _has_global_limit, _partition_inner_limit, 
_top_n_algorithm,
-                    local_state._shared_state->previous_row.get());
+            local_state._value_places[i]->create_or_reset_sorter_state();
+            auto sorter = 
std::move(local_state._value_places[i]->_partition_topn_sorter);
 
             DCHECK(_child_x->row_desc().num_materialized_slots() ==
-                   local_state._value_places[i]->blocks.back()->columns());
+                   local_state._value_places[i]->_blocks.back()->columns());
             //get blocks from every partition, and sorter get those data.
-            for (const auto& block : local_state._value_places[i]->blocks) {
+            for (const auto& block : local_state._value_places[i]->_blocks) {
                 RETURN_IF_ERROR(sorter->append_block(block.get()));
             }
-            sorter->init_profile(local_state._profile);
+            local_state._value_places[i]->_blocks.clear();
             RETURN_IF_ERROR(sorter->prepare_for_read());
             
local_state._shared_state->partition_sorts.push_back(std::move(sorter));
         }
@@ -165,7 +162,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
 }
 
 Status PartitionSortSinkOperatorX::_split_block_by_partition(
-        vectorized::Block* input_block, int batch_size, 
PartitionSortSinkLocalState& local_state) {
+        vectorized::Block* input_block, PartitionSortSinkLocalState& 
local_state, bool eos) {
     for (int i = 0; i < _partition_exprs_num; ++i) {
         int result_column_id = -1;
         RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, 
&result_column_id));
@@ -173,15 +170,16 @@ Status 
PartitionSortSinkOperatorX::_split_block_by_partition(
         local_state._partition_columns[i] =
                 input_block->get_by_position(result_column_id).column.get();
     }
-    _emplace_into_hash_table(local_state._partition_columns, input_block, 
batch_size, local_state);
+    RETURN_IF_ERROR(_emplace_into_hash_table(local_state._partition_columns, 
input_block,
+                                             local_state, eos));
     return Status::OK();
 }
 
-void PartitionSortSinkOperatorX::_emplace_into_hash_table(
+Status PartitionSortSinkOperatorX::_emplace_into_hash_table(
         const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* 
input_block,
-        int batch_size, PartitionSortSinkLocalState& local_state) {
-    std::visit(
-            [&](auto&& agg_method) -> void {
+        PartitionSortSinkLocalState& local_state, bool eos) {
+    return std::visit(
+            [&](auto&& agg_method) -> Status {
                 SCOPED_TIMER(local_state._build_timer);
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
@@ -213,10 +211,9 @@ void PartitionSortSinkOperatorX::_emplace_into_hash_table(
                 }
                 for (auto* place : local_state._value_places) {
                     SCOPED_TIMER(local_state._selector_block_timer);
-                    place->append_block_by_selector(input_block, 
_child_x->row_desc(),
-                                                    _has_global_limit, 
_partition_inner_limit,
-                                                    batch_size);
+                    
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
                 }
+                return Status::OK();
             },
             local_state._partitioned_data->method_variant);
 }
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h 
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 21145189fc7..7cb881e39ca 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -131,11 +131,11 @@ private:
     bool _has_global_limit = false;
     int64_t _partition_inner_limit = 0;
 
-    Status _split_block_by_partition(vectorized::Block* input_block, int 
batch_size,
-                                     PartitionSortSinkLocalState& local_state);
-    void _emplace_into_hash_table(const vectorized::ColumnRawPtrs& key_columns,
-                                  const vectorized::Block* input_block, int 
batch_size,
-                                  PartitionSortSinkLocalState& local_state);
+    Status _split_block_by_partition(vectorized::Block* input_block,
+                                     PartitionSortSinkLocalState& local_state, 
bool eos);
+    Status _emplace_into_hash_table(const vectorized::ColumnRawPtrs& 
key_columns,
+                                    const vectorized::Block* input_block,
+                                    PartitionSortSinkLocalState& local_state, 
bool eos);
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp 
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 83da2f84112..c66ad2fc30a 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -34,7 +34,6 @@ Status PartitionSortSourceLocalState::init(RuntimeState* 
state, LocalStateInfo&
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     _get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
-    _shared_state->previous_row = 
std::make_unique<vectorized::SortCursorCmp>();
     return Status::OK();
 }
 
@@ -95,7 +94,6 @@ Status 
PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state,
     }
     if (current_eos) {
         //current sort have eos, so get next idx
-        local_state._shared_state->previous_row->reset();
         auto rows = 
local_state._shared_state->partition_sorts[local_state._sort_idx]
                             ->get_output_rows();
         local_state._num_rows_returned += rows;
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 5ae3480cd4d..11a9cbf39d5 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -479,7 +479,6 @@ public:
     std::queue<vectorized::Block> blocks_buffer;
     std::mutex buffer_mutex;
     std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts;
-    std::unique_ptr<vectorized::SortCursorCmp> previous_row;
     bool sink_eos = false;
     std::mutex sink_eos_lock;
 };
diff --git a/be/src/vec/common/sort/partition_sorter.cpp 
b/be/src/vec/common/sort/partition_sorter.cpp
index a3954aa3f55..a03646a7e9a 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -75,6 +75,14 @@ Status PartitionSorter::prepare_for_read() {
     return Status::OK();
 }
 
+// have done sorter and get topn records, so could reset those state to init
+void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) {
+    std::priority_queue<MergeSortBlockCursor> empty_queue;
+    std::swap(_block_priority_queue, empty_queue);
+    _state = MergeSorterState::create_unique(_row_desc, _offset, _limit, 
runtime_state, nullptr);
+    _previous_row->reset();
+}
+
 Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) 
{
     if (_state->get_sorted_block().empty()) {
         *eos = true;
diff --git a/be/src/vec/common/sort/partition_sorter.h 
b/be/src/vec/common/sort/partition_sorter.h
index afe95e6208f..ab4b4dac775 100644
--- a/be/src/vec/common/sort/partition_sorter.h
+++ b/be/src/vec/common/sort/partition_sorter.h
@@ -94,6 +94,7 @@ public:
 
     Status partition_sort_read(Block* block, bool* eos, int batch_size);
     int64 get_output_rows() const { return _output_total_rows; }
+    void reset_sorter_state(RuntimeState* runtime_state);
 
 private:
     std::unique_ptr<MergeSorterState> _state;
diff --git a/be/src/vec/exec/vpartition_sort_node.cpp 
b/be/src/vec/exec/vpartition_sort_node.cpp
index 694aa99a6cd..772156b9880 100644
--- a/be/src/vec/exec/vpartition_sort_node.cpp
+++ b/be/src/vec/exec/vpartition_sort_node.cpp
@@ -28,6 +28,7 @@
 
 #include "common/logging.h"
 #include "common/object_pool.h"
+#include "common/status.h"
 #include "runtime/runtime_state.h"
 #include "vec/common/hash_table/hash.h"
 #include "vec/common/hash_table/hash_map_context_creator.h"
@@ -38,23 +39,59 @@
 #include "vec/exprs/vexpr_context.h"
 
 namespace doris::vectorized {
-Status PartitionBlocks::do_partition_topn_sort() {
+Status PartitionBlocks::append_block_by_selector(const vectorized::Block* 
input_block, bool eos) {
+    if (_blocks.empty() || reach_limit()) {
+        _init_rows = _partition_sort_info->_runtime_state->batch_size();
+        _blocks.push_back(Block::create_unique(
+                
VectorizedUtils::create_empty_block(_partition_sort_info->_row_desc)));
+    }
+    auto columns = input_block->get_columns();
+    auto mutable_columns = _blocks.back()->mutate_columns();
+    DCHECK(columns.size() == mutable_columns.size());
+    for (int i = 0; i < mutable_columns.size(); ++i) {
+        columns[i]->append_data_by_selector(mutable_columns[i], _selector);
+    }
+    _blocks.back()->set_columns(std::move(mutable_columns));
+    auto selector_rows = _selector.size();
+    _init_rows = _init_rows - selector_rows;
+    _total_rows = _total_rows + selector_rows;
+    _current_input_rows = _current_input_rows + selector_rows;
+    _selector.clear();
+    // maybe better could change by user PARTITION_SORT_ROWS_THRESHOLD
+    if (!eos && _partition_sort_info->_partition_inner_limit != -1 &&
+        _current_input_rows >= PARTITION_SORT_ROWS_THRESHOLD &&
+        _partition_sort_info->_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL) 
{
+        create_or_reset_sorter_state();
+        RETURN_IF_ERROR(do_partition_topn_sort());
+        _current_input_rows = 0; // reset record
+        _do_partition_topn_count++;
+    }
+    return Status::OK();
+}
+
+void PartitionBlocks::create_or_reset_sorter_state() {
     if (_partition_topn_sorter == nullptr) {
+        _previous_row = std::make_unique<SortCursorCmp>();
         _partition_topn_sorter = PartitionSorter::create_unique(
                 *_partition_sort_info->_vsort_exec_exprs, 
_partition_sort_info->_limit,
                 _partition_sort_info->_offset, _partition_sort_info->_pool,
                 _partition_sort_info->_is_asc_order, 
_partition_sort_info->_nulls_first,
-                _partition_sort_info->_row_desc, 
_partition_sort_info->_runtime_state, nullptr,
+                _partition_sort_info->_row_desc, 
_partition_sort_info->_runtime_state,
+                _is_first_sorter ? _partition_sort_info->_runtime_profile : 
nullptr,
                 _partition_sort_info->_has_global_limit,
                 _partition_sort_info->_partition_inner_limit,
-                _partition_sort_info->_top_n_algorithm, 
_partition_sort_info->_previous_row);
+                _partition_sort_info->_top_n_algorithm, _previous_row.get());
+        
_partition_topn_sorter->init_profile(_partition_sort_info->_runtime_profile);
+    } else {
+        
_partition_topn_sorter->reset_sorter_state(_partition_sort_info->_runtime_state);
     }
+}
 
-    for (const auto& block : blocks) {
+Status PartitionBlocks::do_partition_topn_sort() {
+    for (const auto& block : _blocks) {
         RETURN_IF_ERROR(_partition_topn_sorter->append_block(block.get()));
     }
-    blocks.clear();
-    
_partition_topn_sorter->init_profile(_partition_sort_info->_runtime_profile);
+    _blocks.clear();
     RETURN_IF_ERROR(_partition_topn_sorter->prepare_for_read());
     bool current_eos = false;
     size_t current_output_rows = 0;
@@ -67,14 +104,11 @@ Status PartitionBlocks::do_partition_topn_sort() {
         auto rows = output_block->rows();
         if (rows > 0) {
             current_output_rows += rows;
-            blocks.emplace_back(std::move(output_block));
+            _blocks.emplace_back(std::move(output_block));
         }
     }
 
     _topn_filter_rows += (_current_input_rows - current_output_rows);
-    _partition_sort_info->_previous_row->reset();
-    _partition_topn_sorter.reset(nullptr);
-
     return Status::OK();
 }
 
@@ -83,7 +117,6 @@ VPartitionSortNode::VPartitionSortNode(ObjectPool* pool, 
const TPlanNode& tnode,
         : ExecNode(pool, tnode, descs), _hash_table_size_counter(nullptr) {
     _partitioned_data = std::make_unique<PartitionedHashMapVariants>();
     _agg_arena_pool = std::make_unique<Arena>();
-    _previous_row = std::make_unique<SortCursorCmp>();
 }
 
 Status VPartitionSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
@@ -130,27 +163,26 @@ Status VPartitionSortNode::prepare(RuntimeState* state) {
     _partition_sort_info = std::make_shared<PartitionSortInfo>(
             &_vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first, 
child(0)->row_desc(),
             state, _runtime_profile.get(), _has_global_limit, 
_partition_inner_limit,
-            _top_n_algorithm, _previous_row.get(), _topn_phase);
+            _top_n_algorithm, _topn_phase);
     return Status::OK();
 }
 
-Status VPartitionSortNode::_split_block_by_partition(vectorized::Block* 
input_block,
-                                                     int batch_size) {
+Status VPartitionSortNode::_split_block_by_partition(vectorized::Block* 
input_block, bool eos) {
     for (int i = 0; i < _partition_exprs_num; ++i) {
         int result_column_id = -1;
         RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, 
&result_column_id));
         DCHECK(result_column_id != -1);
         _partition_columns[i] = 
input_block->get_by_position(result_column_id).column.get();
     }
-    _emplace_into_hash_table(_partition_columns, input_block, batch_size);
+    RETURN_IF_ERROR(_emplace_into_hash_table(_partition_columns, input_block, 
eos));
     return Status::OK();
 }
 
-void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& 
key_columns,
-                                                  const vectorized::Block* 
input_block,
-                                                  int batch_size) {
-    std::visit(
-            [&](auto&& agg_method) -> void {
+Status VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& 
key_columns,
+                                                    const vectorized::Block* 
input_block,
+                                                    bool eos) {
+    return std::visit(
+            [&](auto&& agg_method) -> Status {
                 SCOPED_TIMER(_build_timer);
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
@@ -183,10 +215,9 @@ void VPartitionSortNode::_emplace_into_hash_table(const 
ColumnRawPtrs& key_colum
 
                 SCOPED_TIMER(_selector_block_timer);
                 for (auto* place : _value_places) {
-                    place->append_block_by_selector(input_block, 
child(0)->row_desc(),
-                                                    _has_global_limit, 
_partition_inner_limit,
-                                                    batch_size);
+                    
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
                 }
+                return Status::OK();
             },
             _partitioned_data->method_variant);
 }
@@ -214,7 +245,7 @@ Status VPartitionSortNode::sink(RuntimeState* state, 
vectorized::Block* input_bl
                     _blocks_buffer.push(std::move(*input_block));
                 }
             } else {
-                RETURN_IF_ERROR(_split_block_by_partition(input_block, 
state->batch_size()));
+                RETURN_IF_ERROR(_split_block_by_partition(input_block, eos));
                 RETURN_IF_CANCELLED(state);
                 input_block->clear_column_data();
             }
@@ -227,21 +258,18 @@ Status VPartitionSortNode::sink(RuntimeState* state, 
vectorized::Block* input_bl
         _partitioned_data.reset(nullptr);
         SCOPED_TIMER(_partition_sort_timer);
         for (int i = 0; i < _value_places.size(); ++i) {
-            auto sorter = PartitionSorter::create_unique(
-                    _vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, 
_nulls_first,
-                    child(0)->row_desc(), state, i == 0 ? 
_runtime_profile.get() : nullptr,
-                    _has_global_limit, _partition_inner_limit, 
_top_n_algorithm,
-                    _previous_row.get());
+            _value_places[i]->create_or_reset_sorter_state();
+            auto sorter = std::ref(_value_places[i]->_partition_topn_sorter);
 
             DCHECK(child(0)->row_desc().num_materialized_slots() ==
-                   _value_places[i]->blocks.back()->columns());
+                   _value_places[i]->_blocks.back()->columns());
             //get blocks from every partition, and sorter get those data.
-            for (const auto& block : _value_places[i]->blocks) {
-                RETURN_IF_ERROR(sorter->append_block(block.get()));
+            for (const auto& block : _value_places[i]->_blocks) {
+                RETURN_IF_ERROR(sorter.get()->append_block(block.get()));
             }
-            sorter->init_profile(_runtime_profile.get());
-            RETURN_IF_ERROR(sorter->prepare_for_read());
-            _partition_sorts.push_back(std::move(sorter));
+            _value_places[i]->_blocks.clear();
+            RETURN_IF_ERROR(sorter.get()->prepare_for_read());
+            _partition_sorts.push_back(std::move(sorter.get()));
         }
 
         COUNTER_SET(_hash_table_size_counter, int64_t(_num_partition));
@@ -269,7 +297,7 @@ Status VPartitionSortNode::open(RuntimeState* state) {
         RETURN_IF_ERROR(sink(state, input_block.get(), eos));
     } while (!eos);
 
-    static_cast<void>(child(0)->close(state));
+    RETURN_IF_ERROR(child(0)->close(state));
 
     return Status::OK();
 }
@@ -342,7 +370,6 @@ Status VPartitionSortNode::get_sorted_block(RuntimeState* 
state, Block* output_b
     }
     if (*current_eos) {
         //current sort have eos, so get next idx
-        _previous_row->reset();
         auto rows = _partition_sorts[_sort_idx]->get_output_rows();
         partition_profile_output_rows.push_back(rows);
         _num_rows_returned += rows;
@@ -436,22 +463,22 @@ void VPartitionSortNode::_init_hash_method() {
 }
 
 void VPartitionSortNode::debug_profile() {
-    fmt::memory_buffer partition_rows_read, partition_blocks_read, 
partition_filter_rows;
+    fmt::memory_buffer partition_rows_read, partition_topn_count, 
partition_filter_rows;
     fmt::format_to(partition_rows_read, "[");
-    fmt::format_to(partition_blocks_read, "[");
+    fmt::format_to(partition_topn_count, "[");
     fmt::format_to(partition_filter_rows, "[");
 
     for (auto* place : _value_places) {
         fmt::format_to(partition_rows_read, "{}, ", place->get_total_rows());
         fmt::format_to(partition_filter_rows, "{}, ", 
place->get_topn_filter_rows());
-        fmt::format_to(partition_blocks_read, "{}, ", place->blocks.size());
+        fmt::format_to(partition_topn_count, "{}, ", 
place->get_do_topn_count());
     }
     fmt::format_to(partition_rows_read, "]");
-    fmt::format_to(partition_blocks_read, "]");
+    fmt::format_to(partition_topn_count, "]");
     fmt::format_to(partition_filter_rows, "]");
 
-    runtime_profile()->add_info_string("PerPartitionBlocksRead",
-                                       fmt::to_string(partition_blocks_read));
+    runtime_profile()->add_info_string("PerPartitionDoTopNCount",
+                                       fmt::to_string(partition_topn_count));
     runtime_profile()->add_info_string("PerPartitionRowsRead", 
fmt::to_string(partition_rows_read));
     runtime_profile()->add_info_string("PerPartitionFilterRows",
                                        fmt::to_string(partition_filter_rows));
diff --git a/be/src/vec/exec/vpartition_sort_node.h 
b/be/src/vec/exec/vpartition_sort_node.h
index 1ee7ffe26cb..a209e9b5962 100644
--- a/be/src/vec/exec/vpartition_sort_node.h
+++ b/be/src/vec/exec/vpartition_sort_node.h
@@ -48,8 +48,7 @@ struct PartitionSortInfo {
                       const std::vector<bool>& nulls_first, const 
RowDescriptor& row_desc,
                       RuntimeState* runtime_state, RuntimeProfile* 
runtime_profile,
                       bool has_global_limit, int64_t partition_inner_limit,
-                      TopNAlgorithm::type top_n_algorithm, SortCursorCmp* 
previous_row,
-                      TPartTopNPhase::type topn_phase)
+                      TopNAlgorithm::type top_n_algorithm, 
TPartTopNPhase::type topn_phase)
             : _vsort_exec_exprs(vsort_exec_exprs),
               _limit(limit),
               _offset(offset),
@@ -62,13 +61,12 @@ struct PartitionSortInfo {
               _has_global_limit(has_global_limit),
               _partition_inner_limit(partition_inner_limit),
               _top_n_algorithm(top_n_algorithm),
-              _previous_row(previous_row),
               _topn_phase(topn_phase) {}
 
 public:
     VSortExecExprs* _vsort_exec_exprs = nullptr;
     int64_t _limit = -1;
-    int64_t _offset = -1;
+    int64_t _offset = 0;
     ObjectPool* _pool = nullptr;
     std::vector<bool> _is_asc_order;
     std::vector<bool> _nulls_first;
@@ -78,69 +76,47 @@ public:
     bool _has_global_limit = false;
     int64_t _partition_inner_limit = 0;
     TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
-    SortCursorCmp* _previous_row = nullptr;
     TPartTopNPhase::type _topn_phase = TPartTopNPhase::TWO_PHASE_GLOBAL;
 };
 
 struct PartitionBlocks {
 public:
-    PartitionBlocks() = default; //should fixed in pipelineX
     PartitionBlocks(std::shared_ptr<PartitionSortInfo> partition_sort_info, 
bool is_first_sorter)
             : _is_first_sorter(is_first_sorter), 
_partition_sort_info(partition_sort_info) {}
     ~PartitionBlocks() = default;
 
-    void add_row_idx(size_t row) { selector.push_back(row); }
+    void add_row_idx(size_t row) { _selector.push_back(row); }
 
-    void append_block_by_selector(const vectorized::Block* input_block,
-                                  const RowDescriptor& row_desc, bool is_limit,
-                                  int64_t partition_inner_limit, int 
batch_size) {
-        if (blocks.empty() || reach_limit()) {
-            _init_rows = batch_size;
-            
blocks.push_back(Block::create_unique(VectorizedUtils::create_empty_block(row_desc)));
-        }
-        auto columns = input_block->get_columns();
-        auto mutable_columns = blocks.back()->mutate_columns();
-        DCHECK(columns.size() == mutable_columns.size());
-        for (int i = 0; i < mutable_columns.size(); ++i) {
-            columns[i]->append_data_by_selector(mutable_columns[i], selector);
-        }
-        blocks.back()->set_columns(std::move(mutable_columns));
-        auto selector_rows = selector.size();
-        _init_rows = _init_rows - selector_rows;
-        _total_rows = _total_rows + selector_rows;
-        _current_input_rows = _current_input_rows + selector_rows;
-        selector.clear();
-        // maybe better could change by user PARTITION_SORT_ROWS_THRESHOLD
-        if (_current_input_rows >= PARTITION_SORT_ROWS_THRESHOLD &&
-            _partition_sort_info->_topn_phase != 
TPartTopNPhase::TWO_PHASE_GLOBAL) {
-            static_cast<void>(do_partition_topn_sort()); // fixed : should 
return status
-            _current_input_rows = 0;                     // reset record
-        }
-    }
+    Status append_block_by_selector(const vectorized::Block* input_block, bool 
eos);
 
     Status do_partition_topn_sort();
 
+    void create_or_reset_sorter_state();
+
     void append_whole_block(vectorized::Block* input_block, const 
RowDescriptor& row_desc) {
         auto empty_block = 
Block::create_unique(VectorizedUtils::create_empty_block(row_desc));
         empty_block->swap(*input_block);
-        blocks.emplace_back(std::move(empty_block));
+        _blocks.emplace_back(std::move(empty_block));
     }
 
     bool reach_limit() {
-        return _init_rows <= 0 || blocks.back()->bytes() > 
INITIAL_BUFFERED_BLOCK_BYTES;
+        return _init_rows <= 0 || _blocks.back()->bytes() > 
INITIAL_BUFFERED_BLOCK_BYTES;
     }
 
     size_t get_total_rows() const { return _total_rows; }
     size_t get_topn_filter_rows() const { return _topn_filter_rows; }
+    size_t get_do_topn_count() const { return _do_partition_topn_count; }
 
-    IColumn::Selector selector;
-    std::vector<std::unique_ptr<Block>> blocks;
+    IColumn::Selector _selector;
+    std::vector<std::unique_ptr<Block>> _blocks;
     size_t _total_rows = 0;
     size_t _current_input_rows = 0;
     size_t _topn_filter_rows = 0;
+    size_t _do_partition_topn_count = 0;
     int _init_rows = 4096;
     bool _is_first_sorter = false;
 
+    std::unique_ptr<SortCursorCmp> _previous_row;
     std::unique_ptr<PartitionSorter> _partition_topn_sorter = nullptr;
     std::shared_ptr<PartitionSortInfo> _partition_sort_info = nullptr;
 };
@@ -261,9 +237,9 @@ public:
 
 private:
     void _init_hash_method();
-    Status _split_block_by_partition(vectorized::Block* input_block, int 
batch_size);
-    void _emplace_into_hash_table(const ColumnRawPtrs& key_columns,
-                                  const vectorized::Block* input_block, int 
batch_size);
+    Status _split_block_by_partition(vectorized::Block* input_block, bool eos);
+    Status _emplace_into_hash_table(const ColumnRawPtrs& key_columns,
+                                    const vectorized::Block* input_block, bool 
eos);
     Status get_sorted_block(RuntimeState* state, Block* output_block, bool* 
eos);
 
     // hash table
@@ -286,7 +262,6 @@ private:
     int _num_partition = 0;
     int64_t _partition_inner_limit = 0;
     int _sort_idx = 0;
-    std::unique_ptr<SortCursorCmp> _previous_row;
     std::queue<Block> _blocks_buffer;
     int64_t child_input_rows = 0;
     std::mutex _buffer_mutex;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to