This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0e9951f9cb4 [Bug](partition-topn) fix partition-topn calculate
partition input rows have error (#39100)
0e9951f9cb4 is described below
commit 0e9951f9cb4a609cb88ef47b50334a724dd17cb6
Author: zhangstar333 <[email protected]>
AuthorDate: Sat Aug 10 18:31:37 2024 +0800
[Bug](partition-topn) fix partition-topn calculate partition input rows
have error (#39100)
1. Fix the miscalculation of _sorted_partition_input_rows: it should
only count the rows that have been emplaced into the hash table, and must
not include the rows that are passed through.
2. Add counters to the profile that report how many input/output rows
have been processed by partition-topn.
---
be/src/pipeline/exec/partition_sort_sink_operator.cpp | 12 ++++++++----
be/src/pipeline/exec/partition_sort_sink_operator.h | 3 ++-
be/src/pipeline/exec/partition_sort_source_operator.cpp | 8 +++++---
be/src/pipeline/exec/partition_sort_source_operator.h | 6 +++---
4 files changed, 18 insertions(+), 11 deletions(-)
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 62dafd54849..404d9095f96 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -115,6 +115,8 @@ Status PartitionSortSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
_selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
_emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
_passthrough_rows_counter = ADD_COUNTER(_profile,
"PassThroughRowsCounter", TUnit::UNIT);
+ _sorted_partition_input_rows_counter =
+ ADD_COUNTER(_profile, "SortedPartitionInputRows", TUnit::UNIT);
_partition_sort_info = std::make_shared<PartitionSortInfo>(
&_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order,
p._nulls_first,
p._child_x->row_desc(), state, _profile, p._has_global_limit,
p._partition_inner_limit,
@@ -173,7 +175,6 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
SCOPED_TIMER(local_state.exec_time_counter());
if (current_rows > 0) {
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)input_block->rows());
- local_state.child_input_rows = local_state.child_input_rows +
current_rows;
if (UNLIKELY(_partition_exprs_num == 0)) {
if (UNLIKELY(local_state._value_places.empty())) {
local_state._value_places.push_back(_pool->add(new
PartitionBlocks(
@@ -185,10 +186,9 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
//if is TWO_PHASE_GLOBAL, must be sort all data thought partition
num threshold have been exceeded.
if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL &&
local_state._num_partition >
config::partition_topn_partition_threshold &&
- local_state.child_input_rows < 10000 *
local_state._num_partition) {
+ local_state._sorted_partition_input_rows < 10000 *
local_state._num_partition) {
{
- COUNTER_UPDATE(local_state._passthrough_rows_counter,
- (int64_t)input_block->rows());
+ COUNTER_UPDATE(local_state._passthrough_rows_counter,
(int64_t)current_rows);
std::lock_guard<std::mutex>
lock(local_state._shared_state->buffer_mutex);
local_state._shared_state->blocks_buffer.push(std::move(*input_block));
// buffer have data, source could read this.
@@ -198,6 +198,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
RETURN_IF_ERROR(_split_block_by_partition(input_block,
local_state, eos));
RETURN_IF_CANCELLED(state);
input_block->clear_column_data();
+ local_state._sorted_partition_input_rows =
+ local_state._sorted_partition_input_rows +
current_rows;
}
}
}
@@ -220,6 +222,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
}
COUNTER_SET(local_state._hash_table_size_counter,
int64_t(local_state._num_partition));
+ COUNTER_SET(local_state._sorted_partition_input_rows_counter,
+ local_state._sorted_partition_input_rows);
//so all data from child have sink completed
{
std::unique_lock<std::mutex>
lc(local_state._shared_state->sink_eos_lock);
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index b7e83763f1d..25ad0309bde 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -224,7 +224,7 @@ private:
// Expressions and parameters used for build _sort_description
vectorized::VSortExecExprs _vsort_exec_exprs;
vectorized::VExprContextSPtrs _partition_expr_ctxs;
- int64_t child_input_rows = 0;
+ int64_t _sorted_partition_input_rows = 0;
std::vector<PartitionDataPtr> _value_places;
int _num_partition = 0;
std::vector<const vectorized::IColumn*> _partition_columns;
@@ -238,6 +238,7 @@ private:
RuntimeProfile::Counter* _selector_block_timer = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::Counter* _passthrough_rows_counter = nullptr;
+ RuntimeProfile::Counter* _sorted_partition_input_rows_counter = nullptr;
Status _init_hash_method();
};
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 89cbbb7cf21..17400d108d0 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -29,6 +29,8 @@ Status PartitionSortSourceLocalState::init(RuntimeState*
state, LocalStateInfo&
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
+ _sorted_partition_output_rows_counter =
+ ADD_COUNTER(profile(), "SortedPartitionOutputRows", TUnit::UNIT);
return Status::OK();
}
@@ -57,7 +59,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::
}
if (!output_block->empty()) {
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
- COUNTER_UPDATE(local_state.rows_returned_counter(),
output_block->rows());
+ local_state._num_rows_returned += output_block->rows();
}
return Status::OK();
}
@@ -79,7 +81,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::
}
if (!output_block->empty()) {
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
- COUNTER_UPDATE(local_state.rows_returned_counter(),
output_block->rows());
+ local_state._num_rows_returned += output_block->rows();
}
return Status::OK();
}
@@ -98,7 +100,7 @@ Status
PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state,
//current sort have eos, so get next idx
auto rows =
local_state._shared_state->partition_sorts[local_state._sort_idx]
->get_output_rows();
- local_state._num_rows_returned += rows;
+ COUNTER_UPDATE(local_state._sorted_partition_output_rows_counter,
rows);
local_state._shared_state->partition_sorts[local_state._sort_idx].reset(nullptr);
local_state._sort_idx++;
}
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h
b/be/src/pipeline/exec/partition_sort_source_operator.h
index 4b5589c0e8f..1f75e1f49d4 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -34,14 +34,14 @@ public:
ENABLE_FACTORY_CREATOR(PartitionSortSourceLocalState);
using Base = PipelineXLocalState<PartitionSortNodeSharedState>;
PartitionSortSourceLocalState(RuntimeState* state, OperatorXBase* parent)
- : PipelineXLocalState<PartitionSortNodeSharedState>(state, parent),
- _get_sorted_timer(nullptr) {}
+ : PipelineXLocalState<PartitionSortNodeSharedState>(state, parent)
{}
Status init(RuntimeState* state, LocalStateInfo& info) override;
private:
friend class PartitionSortSourceOperatorX;
- RuntimeProfile::Counter* _get_sorted_timer;
+ RuntimeProfile::Counter* _get_sorted_timer = nullptr;
+ RuntimeProfile::Counter* _sorted_partition_output_rows_counter = nullptr;
std::atomic<int> _sort_idx = 0;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]