This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4019f96648c [Improvement](exchange) avoid hash shuffle when partition
type is bucket shuffle and only one instance (#58068)
4019f96648c is described below
commit 4019f96648c9282b83130dfad5db828e4ce59c03
Author: Pxl <[email protected]>
AuthorDate: Wed Nov 19 12:32:14 2025 +0800
[Improvement](exchange) avoid hash shuffle when partition type is bucket
shuffle and only one instance (#58068)
This pull request updates the logic for partition type handling and
improves robustness in the `ExchangeSinkOperatorX` implementation. The
main changes include refining how partition types are set, especially
for single-destination cases, and ensuring that empty blocks are handled
efficiently to avoid unnecessary serialization and sending.
Partition type logic improvements:
* The condition for changing the partition type now only excludes
`UNPARTITIONED`. As a result, `BUCKET_SHFFULE_HASH_PARTITIONED` exchanges, which were
previously exempt, are also eligible for the single-destination optimization.
* When there is only one destination fragment, the partition type is set
to `RANDOM` instead of `UNPARTITIONED`. Single-destination exchanges (e.g. a
bucket shuffle with only one instance) therefore avoid hash shuffling.
Handling of empty blocks:
* Added an early return in the `sink` method when the input block is
empty and not at end-of-stream (`eos`), preventing unnecessary
processing.
* Serialization of remote blocks now only occurs if the block is
non-empty. Because of the early return above, an empty block reaches this point
only at end-of-stream. It is still sent so that `eos` is propagated.
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 38 +++++++++++++------------
1 file changed, 20 insertions(+), 18 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index bb8440e4ef2..3ddc3807e49 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -295,9 +295,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_output_tuple_id = sink.output_tuple_id;
}
- // Bucket shuffle may contain some same bucket so no need change the
BUCKET_SHFFULE_HASH_PARTITIONED
- if (_part_type != TPartitionType::UNPARTITIONED &&
- _part_type != TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+ if (_part_type != TPartitionType::UNPARTITIONED) {
// if the destinations only one dest, we need to use broadcast
std::unordered_set<UniqueId> dest_fragment_ids_set;
for (const auto& dest : _dests) {
@@ -306,7 +304,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
break;
}
}
- _part_type = dest_fragment_ids_set.size() == 1 ?
TPartitionType::UNPARTITIONED : _part_type;
+ _part_type = dest_fragment_ids_set.size() == 1 ?
TPartitionType::RANDOM : _part_type;
}
}
@@ -378,25 +376,27 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
set_low_memory_mode(state);
}
+ if (block->empty() && !eos) {
+ return Status::OK();
+ }
+
if (_part_type == TPartitionType::UNPARTITIONED) {
// 1. serialize depends on it is not local exchange
// 2. send block
// 3. rollover block
if (local_state._only_local_exchange) {
- if (!block->empty()) {
- Status status;
- size_t idx = 0;
- for (auto& channel : local_state.channels) {
- if (!channel->is_receiver_eof()) {
- // If this channel is the last, we can move this block
to downstream pipeline.
- // Otherwise, this block also need to be broadcasted
to other channels so should be copied.
- DCHECK_GE(local_state._last_local_channel_idx, 0);
- status = channel->send_local_block(
- block, eos, idx ==
local_state._last_local_channel_idx);
- HANDLE_CHANNEL_STATUS(state, channel, status);
- }
- idx++;
+ Status status;
+ size_t idx = 0;
+ for (auto& channel : local_state.channels) {
+ if (!channel->is_receiver_eof()) {
+ // If this channel is the last, we can move this block to
downstream pipeline.
+ // Otherwise, this block also need to be broadcasted to
other channels so should be copied.
+ DCHECK_GE(local_state._last_local_channel_idx, 0);
+ status = channel->send_local_block(block, eos,
+ idx ==
local_state._last_local_channel_idx);
+ HANDLE_CHANNEL_STATUS(state, channel, status);
}
+ idx++;
}
} else {
auto block_holder =
vectorized::BroadcastPBlockHolder::create_shared();
@@ -486,7 +486,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
auto pblock = std::make_unique<PBlock>();
- RETURN_IF_ERROR(local_state._serializer.serialize_block(block,
pblock.get()));
+ if (!block->empty()) {
+
RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
+ }
auto status =
current_channel->send_remote_block(std::move(pblock), eos);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]