This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new acf07cab6fe [refactor](minor) Init counter in prepare phase (#39287)
(#39385)
acf07cab6fe is described below
commit acf07cab6fe8f175952958e46f7e9872d8a03d06
Author: Gabriel <[email protected]>
AuthorDate: Thu Aug 15 13:36:12 2024 +0800
[refactor](minor) Init counter in prepare phase (#39287) (#39385)
pick #39287
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 9 +++------
be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp | 2 ++
be/src/vec/common/sort/sorter.cpp | 2 ++
3 files changed, 7 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e457f090af5..e4150b4f7ac 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -154,7 +154,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
// Make sure brpc stub is ready before execution.
for (int i = 0; i < channels.size(); ++i) {
RETURN_IF_ERROR(channels[i]->init_stub(state));
+ _wait_channel_timer.push_back(_profile->add_nonzero_counter(
+ fmt::format("WaitForLocalExchangeBuffer{}", i), TUnit
::TIME_NS, timer_name, 1));
}
+ _wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile,
"WaitForBroadcastBuffer", timer_name);
return Status::OK();
}
@@ -201,9 +204,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
for (int i = 0; i < config::num_broadcast_buffer; ++i) {
_broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared());
}
-
- _wait_broadcast_buffer_timer =
- ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer",
timer_name);
} else if (local_size > 0) {
size_t dep_id = 0;
for (auto* channel : channels) {
@@ -211,9 +211,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
if (auto dep = channel->get_local_channel_dependency()) {
_local_channels_dependency.push_back(dep);
DCHECK(_local_channels_dependency[dep_id] != nullptr);
-
_wait_channel_timer.push_back(_profile->add_nonzero_counter(
- fmt::format("WaitForLocalExchangeBuffer{}",
dep_id), TUnit ::TIME_NS,
- timer_name, 1));
dep_id++;
} else {
LOG(WARNING) << "local recvr is null: query id = "
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 043a28a5d9b..83de348dbdb 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -32,6 +32,7 @@
PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase
std::make_shared<Dependency>(parent->operator_id(),
parent->node_id(),
parent->get_name() +
"_SPILL_DEPENDENCY", true);
}
+
Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
doris::pipeline::LocalSinkStateInfo&
info) {
RETURN_IF_ERROR(Base::init(state, info));
@@ -64,6 +65,7 @@ Status PartitionedAggSinkLocalState::open(RuntimeState*
state) {
SCOPED_TIMER(Base::_open_timer);
return Base::open(state);
}
+
Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status
exec_status) {
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_close_timer);
diff --git a/be/src/vec/common/sort/sorter.cpp
b/be/src/vec/common/sort/sorter.cpp
index cfbd3cb41c8..eca7e15626b 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -66,6 +66,7 @@ void MergeSorterState::reset() {
unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
in_mem_sorted_bocks_size_ = 0;
}
+
Status MergeSorterState::add_sorted_block(Block& block) {
auto rows = block.rows();
if (0 == rows) {
@@ -279,6 +280,7 @@ Status FullSorter::_do_sort() {
}
return Status::OK();
}
+
size_t FullSorter::data_size() const {
return _state->data_size();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]