This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 65e277e365726f54d937b7b69d7abe4eff69fbec Author: zhangstar333 <[email protected]> AuthorDate: Thu Feb 1 18:58:02 2024 +0800 [refacotr](node) refactor partition sort node to improve readability (#30511) * [refacotr](node) refactor partition sort node to improve readability * update --- .../pipeline/exec/partition_sort_sink_operator.cpp | 37 ++++--- .../pipeline/exec/partition_sort_sink_operator.h | 10 +- .../exec/partition_sort_source_operator.cpp | 2 - be/src/pipeline/pipeline_x/dependency.h | 1 - be/src/vec/common/sort/partition_sorter.cpp | 8 ++ be/src/vec/common/sort/partition_sorter.h | 1 + be/src/vec/exec/vpartition_sort_node.cpp | 113 +++++++++++++-------- be/src/vec/exec/vpartition_sort_node.h | 57 +++-------- 8 files changed, 117 insertions(+), 112 deletions(-) diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 1fe25ea1910..6cdfca18d8b 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -47,7 +47,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _partition_sort_info = std::make_shared<vectorized::PartitionSortInfo>( &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first, p._child_x->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit, - p._top_n_algorithm, _shared_state->previous_row.get(), p._topn_phase); + p._top_n_algorithm, p._topn_phase); _init_hash_method(); return Status::OK(); } @@ -122,8 +122,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._dependency->set_ready_to_read(); } } else { - RETURN_IF_ERROR( - _split_block_by_partition(input_block, state->batch_size(), local_state)); + RETURN_IF_ERROR(_split_block_by_partition(input_block, local_state, + source_state == SourceState::FINISHED)); RETURN_IF_CANCELLED(state); input_block->clear_column_data(); } @@ -135,19 +135,16 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._agg_arena_pool.reset(nullptr); local_state._partitioned_data.reset(nullptr); for (int i = 0; i < local_state._value_places.size(); ++i) { - auto sorter = vectorized::PartitionSorter::create_unique( - _vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first, - _child_x->row_desc(), state, i == 0 ? local_state._profile : nullptr, - _has_global_limit, _partition_inner_limit, _top_n_algorithm, - local_state._shared_state->previous_row.get()); + local_state._value_places[i]->create_or_reset_sorter_state(); + auto sorter = std::move(local_state._value_places[i]->_partition_topn_sorter); DCHECK(_child_x->row_desc().num_materialized_slots() == - local_state._value_places[i]->blocks.back()->columns()); + local_state._value_places[i]->_blocks.back()->columns()); //get blocks from every partition, and sorter get those data. - for (const auto& block : local_state._value_places[i]->blocks) { + for (const auto& block : local_state._value_places[i]->_blocks) { RETURN_IF_ERROR(sorter->append_block(block.get())); } - sorter->init_profile(local_state._profile); + local_state._value_places[i]->_blocks.clear(); RETURN_IF_ERROR(sorter->prepare_for_read()); local_state._shared_state->partition_sorts.push_back(std::move(sorter)); } @@ -165,7 +162,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } Status PartitionSortSinkOperatorX::_split_block_by_partition( - vectorized::Block* input_block, int batch_size, PartitionSortSinkLocalState& local_state) { + vectorized::Block* input_block, PartitionSortSinkLocalState& local_state, bool eos) { for (int i = 0; i < _partition_exprs_num; ++i) { int result_column_id = -1; RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, &result_column_id)); @@ -173,15 +170,16 @@ Status PartitionSortSinkOperatorX::_split_block_by_partition( local_state._partition_columns[i] = input_block->get_by_position(result_column_id).column.get(); } - _emplace_into_hash_table(local_state._partition_columns, input_block, batch_size, local_state); + RETURN_IF_ERROR(_emplace_into_hash_table(local_state._partition_columns, input_block, + local_state, eos)); return Status::OK(); } -void PartitionSortSinkOperatorX::_emplace_into_hash_table( +Status PartitionSortSinkOperatorX::_emplace_into_hash_table( const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* input_block, - int batch_size, PartitionSortSinkLocalState& local_state) { - std::visit( - [&](auto&& agg_method) -> void { + PartitionSortSinkLocalState& local_state, bool eos) { + return std::visit( + [&](auto&& agg_method) -> Status { SCOPED_TIMER(local_state._build_timer); using HashMethodType = std::decay_t<decltype(agg_method)>; using AggState = typename HashMethodType::State; @@ -213,10 +211,9 @@ void PartitionSortSinkOperatorX::_emplace_into_hash_table( } for (auto* place : local_state._value_places) { SCOPED_TIMER(local_state._selector_block_timer); - place->append_block_by_selector(input_block, _child_x->row_desc(), - _has_global_limit, _partition_inner_limit, - batch_size); + RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos)); } + return Status::OK(); }, local_state._partitioned_data->method_variant); } diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index 21145189fc7..7cb881e39ca 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -131,11 +131,11 @@ private: bool _has_global_limit = false; int64_t _partition_inner_limit = 0; - Status _split_block_by_partition(vectorized::Block* input_block, int batch_size, - PartitionSortSinkLocalState& local_state); - void _emplace_into_hash_table(const vectorized::ColumnRawPtrs& key_columns, - const vectorized::Block* input_block, int batch_size, - PartitionSortSinkLocalState& local_state); + Status _split_block_by_partition(vectorized::Block* input_block, + PartitionSortSinkLocalState& local_state, bool eos); + Status _emplace_into_hash_table(const vectorized::ColumnRawPtrs& key_columns, + const vectorized::Block* input_block, + PartitionSortSinkLocalState& local_state, bool eos); }; } // namespace pipeline diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index 83da2f84112..c66ad2fc30a 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -34,7 +34,6 @@ Status PartitionSortSourceLocalState::init(RuntimeState* state, LocalStateInfo& SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime"); - _shared_state->previous_row = std::make_unique<vectorized::SortCursorCmp>(); return Status::OK(); } @@ -95,7 +94,6 @@ Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state, } if (current_eos) { //current sort have eos, so get next idx - local_state._shared_state->previous_row->reset(); auto rows = local_state._shared_state->partition_sorts[local_state._sort_idx] ->get_output_rows(); local_state._num_rows_returned += rows; diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 5ae3480cd4d..11a9cbf39d5 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -479,7 +479,6 @@ public: std::queue<vectorized::Block> blocks_buffer; std::mutex buffer_mutex; std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts; - std::unique_ptr<vectorized::SortCursorCmp> previous_row; bool sink_eos = false; std::mutex sink_eos_lock; }; diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index a3954aa3f55..a03646a7e9a 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -75,6 +75,14 @@ Status PartitionSorter::prepare_for_read() { return Status::OK(); } +// have done sorter and get topn records, so could reset those state to init +void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) { + std::priority_queue<MergeSortBlockCursor> empty_queue; + std::swap(_block_priority_queue, empty_queue); + _state = MergeSorterState::create_unique(_row_desc, _offset, _limit, runtime_state, nullptr); + _previous_row->reset(); +} + Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) { if (_state->get_sorted_block().empty()) { *eos = true; diff --git a/be/src/vec/common/sort/partition_sorter.h b/be/src/vec/common/sort/partition_sorter.h index afe95e6208f..ab4b4dac775 100644 --- a/be/src/vec/common/sort/partition_sorter.h +++ b/be/src/vec/common/sort/partition_sorter.h @@ -94,6 +94,7 @@ public: Status partition_sort_read(Block* block, bool* eos, int batch_size); int64 get_output_rows() const { return _output_total_rows; } + void reset_sorter_state(RuntimeState* runtime_state); private: std::unique_ptr<MergeSorterState> _state; diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp index 694aa99a6cd..772156b9880 100644 --- a/be/src/vec/exec/vpartition_sort_node.cpp +++ b/be/src/vec/exec/vpartition_sort_node.cpp @@ -28,6 +28,7 @@ #include "common/logging.h" #include "common/object_pool.h" +#include "common/status.h" #include "runtime/runtime_state.h" #include "vec/common/hash_table/hash.h" #include "vec/common/hash_table/hash_map_context_creator.h" @@ -38,23 +39,59 @@ #include "vec/exprs/vexpr_context.h" namespace doris::vectorized { -Status PartitionBlocks::do_partition_topn_sort() { +Status PartitionBlocks::append_block_by_selector(const vectorized::Block* input_block, bool eos) { + if (_blocks.empty() || reach_limit()) { + _init_rows = _partition_sort_info->_runtime_state->batch_size(); + _blocks.push_back(Block::create_unique( + VectorizedUtils::create_empty_block(_partition_sort_info->_row_desc))); + } + auto columns = input_block->get_columns(); + auto mutable_columns = _blocks.back()->mutate_columns(); + DCHECK(columns.size() == mutable_columns.size()); + for (int i = 0; i < mutable_columns.size(); ++i) { + columns[i]->append_data_by_selector(mutable_columns[i], _selector); + } + _blocks.back()->set_columns(std::move(mutable_columns)); + auto selector_rows = _selector.size(); + _init_rows = _init_rows - selector_rows; + _total_rows = _total_rows + selector_rows; + _current_input_rows = _current_input_rows + selector_rows; + _selector.clear(); + // maybe better could change by user PARTITION_SORT_ROWS_THRESHOLD + if (!eos && _partition_sort_info->_partition_inner_limit != -1 && + _current_input_rows >= PARTITION_SORT_ROWS_THRESHOLD && + _partition_sort_info->_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL) { + create_or_reset_sorter_state(); + RETURN_IF_ERROR(do_partition_topn_sort()); + _current_input_rows = 0; // reset record + _do_partition_topn_count++; + } + return Status::OK(); +} + +void PartitionBlocks::create_or_reset_sorter_state() { if (_partition_topn_sorter == nullptr) { + _previous_row = std::make_unique<SortCursorCmp>(); _partition_topn_sorter = PartitionSorter::create_unique( *_partition_sort_info->_vsort_exec_exprs, _partition_sort_info->_limit, _partition_sort_info->_offset, _partition_sort_info->_pool, _partition_sort_info->_is_asc_order, _partition_sort_info->_nulls_first, - _partition_sort_info->_row_desc, _partition_sort_info->_runtime_state, nullptr, + _partition_sort_info->_row_desc, _partition_sort_info->_runtime_state, + _is_first_sorter ? _partition_sort_info->_runtime_profile : nullptr, _partition_sort_info->_has_global_limit, _partition_sort_info->_partition_inner_limit, - _partition_sort_info->_top_n_algorithm, _partition_sort_info->_previous_row); + _partition_sort_info->_top_n_algorithm, _previous_row.get()); + _partition_topn_sorter->init_profile(_partition_sort_info->_runtime_profile); + } else { + _partition_topn_sorter->reset_sorter_state(_partition_sort_info->_runtime_state); } +} - for (const auto& block : blocks) { +Status PartitionBlocks::do_partition_topn_sort() { + for (const auto& block : _blocks) { RETURN_IF_ERROR(_partition_topn_sorter->append_block(block.get())); } - blocks.clear(); - _partition_topn_sorter->init_profile(_partition_sort_info->_runtime_profile); + _blocks.clear(); RETURN_IF_ERROR(_partition_topn_sorter->prepare_for_read()); bool current_eos = false; size_t current_output_rows = 0; @@ -67,14 +104,11 @@ Status PartitionBlocks::do_partition_topn_sort() { auto rows = output_block->rows(); if (rows > 0) { current_output_rows += rows; - blocks.emplace_back(std::move(output_block)); + _blocks.emplace_back(std::move(output_block)); } } _topn_filter_rows += (_current_input_rows - current_output_rows); - _partition_sort_info->_previous_row->reset(); - _partition_topn_sorter.reset(nullptr); - return Status::OK(); } @@ -83,7 +117,6 @@ VPartitionSortNode::VPartitionSortNode(ObjectPool* pool, const TPlanNode& tnode, : ExecNode(pool, tnode, descs), _hash_table_size_counter(nullptr) { _partitioned_data = std::make_unique<PartitionedHashMapVariants>(); _agg_arena_pool = std::make_unique<Arena>(); - _previous_row = std::make_unique<SortCursorCmp>(); } Status VPartitionSortNode::init(const TPlanNode& tnode, RuntimeState* state) { @@ -130,27 +163,26 @@ Status VPartitionSortNode::prepare(RuntimeState* state) { _partition_sort_info = std::make_shared<PartitionSortInfo>( &_vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first, child(0)->row_desc(), state, _runtime_profile.get(), _has_global_limit, _partition_inner_limit, - _top_n_algorithm, _previous_row.get(), _topn_phase); + _top_n_algorithm, _topn_phase); return Status::OK(); } -Status VPartitionSortNode::_split_block_by_partition(vectorized::Block* input_block, - int batch_size) { +Status VPartitionSortNode::_split_block_by_partition(vectorized::Block* input_block, bool eos) { for (int i = 0; i < _partition_exprs_num; ++i) { int result_column_id = -1; RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, &result_column_id)); DCHECK(result_column_id != -1); _partition_columns[i] = input_block->get_by_position(result_column_id).column.get(); } - _emplace_into_hash_table(_partition_columns, input_block, batch_size); + RETURN_IF_ERROR(_emplace_into_hash_table(_partition_columns, input_block, eos)); return Status::OK(); } -void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_columns, - const vectorized::Block* input_block, - int batch_size) { - std::visit( - [&](auto&& agg_method) -> void { +Status VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_columns, + const vectorized::Block* input_block, + bool eos) { + return std::visit( + [&](auto&& agg_method) -> Status { SCOPED_TIMER(_build_timer); using HashMethodType = std::decay_t<decltype(agg_method)>; using AggState = typename HashMethodType::State; @@ -183,10 +215,9 @@ void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_colum SCOPED_TIMER(_selector_block_timer); for (auto* place : _value_places) { - place->append_block_by_selector(input_block, child(0)->row_desc(), - _has_global_limit, _partition_inner_limit, - batch_size); + RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos)); } + return Status::OK(); }, _partitioned_data->method_variant); } @@ -214,7 +245,7 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl _blocks_buffer.push(std::move(*input_block)); } } else { - RETURN_IF_ERROR(_split_block_by_partition(input_block, state->batch_size())); + RETURN_IF_ERROR(_split_block_by_partition(input_block, eos)); RETURN_IF_CANCELLED(state); input_block->clear_column_data(); } @@ -227,21 +258,18 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl _partitioned_data.reset(nullptr); SCOPED_TIMER(_partition_sort_timer); for (int i = 0; i < _value_places.size(); ++i) { - auto sorter = PartitionSorter::create_unique( - _vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first, - child(0)->row_desc(), state, i == 0 ? _runtime_profile.get() : nullptr, - _has_global_limit, _partition_inner_limit, _top_n_algorithm, - _previous_row.get()); + _value_places[i]->create_or_reset_sorter_state(); + auto sorter = std::ref(_value_places[i]->_partition_topn_sorter); DCHECK(child(0)->row_desc().num_materialized_slots() == - _value_places[i]->blocks.back()->columns()); + _value_places[i]->_blocks.back()->columns()); //get blocks from every partition, and sorter get those data. - for (const auto& block : _value_places[i]->blocks) { - RETURN_IF_ERROR(sorter->append_block(block.get())); + for (const auto& block : _value_places[i]->_blocks) { + RETURN_IF_ERROR(sorter.get()->append_block(block.get())); } - sorter->init_profile(_runtime_profile.get()); - RETURN_IF_ERROR(sorter->prepare_for_read()); - _partition_sorts.push_back(std::move(sorter)); + _value_places[i]->_blocks.clear(); + RETURN_IF_ERROR(sorter.get()->prepare_for_read()); + _partition_sorts.push_back(std::move(sorter.get())); } COUNTER_SET(_hash_table_size_counter, int64_t(_num_partition)); @@ -269,7 +297,7 @@ Status VPartitionSortNode::open(RuntimeState* state) { RETURN_IF_ERROR(sink(state, input_block.get(), eos)); } while (!eos); - static_cast<void>(child(0)->close(state)); + RETURN_IF_ERROR(child(0)->close(state)); return Status::OK(); } @@ -342,7 +370,6 @@ Status VPartitionSortNode::get_sorted_block(RuntimeState* state, Block* output_b } if (*current_eos) { //current sort have eos, so get next idx - _previous_row->reset(); auto rows = _partition_sorts[_sort_idx]->get_output_rows(); partition_profile_output_rows.push_back(rows); _num_rows_returned += rows; @@ -436,22 +463,22 @@ void VPartitionSortNode::_init_hash_method() { } void VPartitionSortNode::debug_profile() { - fmt::memory_buffer partition_rows_read, partition_blocks_read, partition_filter_rows; + fmt::memory_buffer partition_rows_read, partition_topn_count, partition_filter_rows; fmt::format_to(partition_rows_read, "["); - fmt::format_to(partition_blocks_read, "["); + fmt::format_to(partition_topn_count, "["); fmt::format_to(partition_filter_rows, "["); for (auto* place : _value_places) { fmt::format_to(partition_rows_read, "{}, ", place->get_total_rows()); fmt::format_to(partition_filter_rows, "{}, ", place->get_topn_filter_rows()); - fmt::format_to(partition_blocks_read, "{}, ", place->blocks.size()); + fmt::format_to(partition_topn_count, "{}, ", place->get_do_topn_count()); } fmt::format_to(partition_rows_read, "]"); - fmt::format_to(partition_blocks_read, "]"); + fmt::format_to(partition_topn_count, "]"); fmt::format_to(partition_filter_rows, "]"); - runtime_profile()->add_info_string("PerPartitionBlocksRead", - fmt::to_string(partition_blocks_read)); + runtime_profile()->add_info_string("PerPartitionDoTopNCount", + fmt::to_string(partition_topn_count)); runtime_profile()->add_info_string("PerPartitionRowsRead", fmt::to_string(partition_rows_read)); runtime_profile()->add_info_string("PerPartitionFilterRows", fmt::to_string(partition_filter_rows)); diff --git a/be/src/vec/exec/vpartition_sort_node.h b/be/src/vec/exec/vpartition_sort_node.h index 1ee7ffe26cb..a209e9b5962 100644 --- a/be/src/vec/exec/vpartition_sort_node.h +++ b/be/src/vec/exec/vpartition_sort_node.h @@ -48,8 +48,7 @@ struct PartitionSortInfo { const std::vector<bool>& nulls_first, const RowDescriptor& row_desc, RuntimeState* runtime_state, RuntimeProfile* runtime_profile, bool has_global_limit, int64_t partition_inner_limit, - TopNAlgorithm::type top_n_algorithm, SortCursorCmp* previous_row, - TPartTopNPhase::type topn_phase) + TopNAlgorithm::type top_n_algorithm, TPartTopNPhase::type topn_phase) : _vsort_exec_exprs(vsort_exec_exprs), _limit(limit), _offset(offset), @@ -62,13 +61,12 @@ struct PartitionSortInfo { _has_global_limit(has_global_limit), _partition_inner_limit(partition_inner_limit), _top_n_algorithm(top_n_algorithm), - _previous_row(previous_row), _topn_phase(topn_phase) {} public: VSortExecExprs* _vsort_exec_exprs = nullptr; int64_t _limit = -1; - int64_t _offset = -1; + int64_t _offset = 0; ObjectPool* _pool = nullptr; std::vector<bool> _is_asc_order; std::vector<bool> _nulls_first; @@ -78,69 +76,47 @@ public: bool _has_global_limit = false; int64_t _partition_inner_limit = 0; TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER; - SortCursorCmp* _previous_row = nullptr; TPartTopNPhase::type _topn_phase = TPartTopNPhase::TWO_PHASE_GLOBAL; }; struct PartitionBlocks { public: - PartitionBlocks() = default; //should fixed in pipelineX PartitionBlocks(std::shared_ptr<PartitionSortInfo> partition_sort_info, bool is_first_sorter) : _is_first_sorter(is_first_sorter), _partition_sort_info(partition_sort_info) {} ~PartitionBlocks() = default; - void add_row_idx(size_t row) { selector.push_back(row); } + void add_row_idx(size_t row) { _selector.push_back(row); } - void append_block_by_selector(const vectorized::Block* input_block, - const RowDescriptor& row_desc, bool is_limit, - int64_t partition_inner_limit, int batch_size) { - if (blocks.empty() || reach_limit()) { - _init_rows = batch_size; - blocks.push_back(Block::create_unique(VectorizedUtils::create_empty_block(row_desc))); - } - auto columns = input_block->get_columns(); - auto mutable_columns = blocks.back()->mutate_columns(); - DCHECK(columns.size() == mutable_columns.size()); - for (int i = 0; i < mutable_columns.size(); ++i) { - columns[i]->append_data_by_selector(mutable_columns[i], selector); - } - blocks.back()->set_columns(std::move(mutable_columns)); - auto selector_rows = selector.size(); - _init_rows = _init_rows - selector_rows; - _total_rows = _total_rows + selector_rows; - _current_input_rows = _current_input_rows + selector_rows; - selector.clear(); - // maybe better could change by user PARTITION_SORT_ROWS_THRESHOLD - if (_current_input_rows >= PARTITION_SORT_ROWS_THRESHOLD && - _partition_sort_info->_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL) { - static_cast<void>(do_partition_topn_sort()); // fixed : should return status - _current_input_rows = 0; // reset record - } - } + Status append_block_by_selector(const vectorized::Block* input_block, bool eos); Status do_partition_topn_sort(); + void create_or_reset_sorter_state(); + void append_whole_block(vectorized::Block* input_block, const RowDescriptor& row_desc) { auto empty_block = Block::create_unique(VectorizedUtils::create_empty_block(row_desc)); empty_block->swap(*input_block); - blocks.emplace_back(std::move(empty_block)); + _blocks.emplace_back(std::move(empty_block)); } bool reach_limit() { - return _init_rows <= 0 || blocks.back()->bytes() > INITIAL_BUFFERED_BLOCK_BYTES; + return _init_rows <= 0 || _blocks.back()->bytes() > INITIAL_BUFFERED_BLOCK_BYTES; } size_t get_total_rows() const { return _total_rows; } size_t get_topn_filter_rows() const { return _topn_filter_rows; } + size_t get_do_topn_count() const { return _do_partition_topn_count; } - IColumn::Selector selector; - std::vector<std::unique_ptr<Block>> blocks; + IColumn::Selector _selector; + std::vector<std::unique_ptr<Block>> _blocks; size_t _total_rows = 0; size_t _current_input_rows = 0; size_t _topn_filter_rows = 0; + size_t _do_partition_topn_count = 0; int _init_rows = 4096; bool _is_first_sorter = false; + std::unique_ptr<SortCursorCmp> _previous_row; std::unique_ptr<PartitionSorter> _partition_topn_sorter = nullptr; std::shared_ptr<PartitionSortInfo> _partition_sort_info = nullptr; }; @@ -261,9 +237,9 @@ public: private: void _init_hash_method(); - Status _split_block_by_partition(vectorized::Block* input_block, int batch_size); - void _emplace_into_hash_table(const ColumnRawPtrs& key_columns, - const vectorized::Block* input_block, int batch_size); + Status _split_block_by_partition(vectorized::Block* input_block, bool eos); + Status _emplace_into_hash_table(const ColumnRawPtrs& key_columns, + const vectorized::Block* input_block, bool eos); Status get_sorted_block(RuntimeState* state, Block* output_block, bool* eos); // hash table @@ -286,7 +262,6 @@ private: int _num_partition = 0; int64_t _partition_inner_limit = 0; int _sort_idx = 0; - std::unique_ptr<SortCursorCmp> _previous_row; std::queue<Block> _blocks_buffer; int64_t child_input_rows = 0; std::mutex _buffer_mutex; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
