This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1253ef3dc67 [refactor](exchange) remove duplicate code (#45466)
1253ef3dc67 is described below
commit 1253ef3dc674f4c2b7ac9824dedb708d7c1bb6d0
Author: Gabriel <[email protected]>
AuthorDate: Wed Dec 18 10:32:32 2024 +0800
[refactor](exchange) remove duplicate code (#45466)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 8 ++------
1 file changed, 2 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index aa893fc0a26..e7fed76be8f 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -112,6 +112,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
}
}
only_local_exchange = local_size == channels.size();
+ _rpc_channels_num = channels.size() - local_size;
if (!only_local_exchange) {
_sink_buffer = p.get_sink_buffer(state->fragment_instance_id().lo);
@@ -206,17 +207,12 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
std::mt19937 g(rd());
shuffle(channels.begin(), channels.end(), g);
}
- size_t local_size = 0;
for (int i = 0; i < channels.size(); ++i) {
RETURN_IF_ERROR(channels[i]->open(state));
if (channels[i]->is_local()) {
- local_size++;
_last_local_channel_idx = i;
}
}
- only_local_exchange = local_size == channels.size();
-
- _rpc_channels_num = channels.size() - local_size;
PUniqueId id;
id.set_hi(_state->query_id().hi);
@@ -228,7 +224,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
_parent->operator_id(), _parent->node_id(),
"BroadcastDependency", true);
_broadcast_pb_mem_limiter =
vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency);
- } else if (local_size > 0) {
+ } else if (!only_local_exchange) {
size_t dep_id = 0;
for (auto& channel : channels) {
if (channel->is_local()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]