This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 1785dac51b4 branch-4.0: [Improvement](exchange) avoid hash shuffle
when partition type is bucket shuffle and only one instance #58068 (#58206)
1785dac51b4 is described below
commit 1785dac51b4e6631eae4fd7d8709b16d9b7e7d87
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Nov 25 09:58:30 2025 +0800
branch-4.0: [Improvement](exchange) avoid hash shuffle when partition type
is bucket shuffle and only one instance #58068 (#58206)
Cherry-picked from #58068
Co-authored-by: Pxl <[email protected]>
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 38 +++++++++++++------------
1 file changed, 20 insertions(+), 18 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index c32860811af..0ec71918be6 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -294,9 +294,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_output_tuple_id = sink.output_tuple_id;
}
- // Bucket shuffle may contain some same bucket so no need change the BUCKET_SHFFULE_HASH_PARTITIONED
- if (_part_type != TPartitionType::UNPARTITIONED &&
- _part_type != TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+ if (_part_type != TPartitionType::UNPARTITIONED) {
// if the destinations only one dest, we need to use broadcast
std::unordered_set<UniqueId> dest_fragment_ids_set;
for (auto& dest : _dests) {
@@ -305,7 +303,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
break;
}
}
- _part_type = dest_fragment_ids_set.size() == 1 ? TPartitionType::UNPARTITIONED : _part_type;
+ _part_type = dest_fragment_ids_set.size() == 1 ? TPartitionType::RANDOM : _part_type;
}
}
@@ -377,25 +375,27 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
set_low_memory_mode(state);
}
+ if (block->empty() && !eos) {
+ return Status::OK();
+ }
+
if (_part_type == TPartitionType::UNPARTITIONED) {
// 1. serialize depends on it is not local exchange
// 2. send block
// 3. rollover block
if (local_state._only_local_exchange) {
- if (!block->empty()) {
- Status status;
- size_t idx = 0;
- for (auto& channel : local_state.channels) {
- if (!channel->is_receiver_eof()) {
- // If this channel is the last, we can move this block to downstream pipeline.
- // Otherwise, this block also need to be broadcasted to other channels so should be copied.
- DCHECK_GE(local_state._last_local_channel_idx, 0);
- status = channel->send_local_block(
- block, eos, idx == local_state._last_local_channel_idx);
- HANDLE_CHANNEL_STATUS(state, channel, status);
- }
- idx++;
+ Status status;
+ size_t idx = 0;
+ for (auto& channel : local_state.channels) {
+ if (!channel->is_receiver_eof()) {
+ // If this channel is the last, we can move this block to downstream pipeline.
+ // Otherwise, this block also need to be broadcasted to other channels so should be copied.
+ DCHECK_GE(local_state._last_local_channel_idx, 0);
+ status = channel->send_local_block(block, eos,
+ idx == local_state._last_local_channel_idx);
+ HANDLE_CHANNEL_STATUS(state, channel, status);
}
+ idx++;
}
} else {
auto block_holder = vectorized::BroadcastPBlockHolder::create_shared();
@@ -485,7 +485,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
auto pblock = std::make_unique<PBlock>();
- RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
+ if (!block->empty()) {
+ RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
+ }
auto status = current_channel->send_remote_block(std::move(pblock), eos);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]