This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4019f96648c [Improvement](exchange) avoid hash shuffle when partition type is bucket shuffle and only one instance (#58068)
4019f96648c is described below

commit 4019f96648c9282b83130dfad5db828e4ce59c03
Author: Pxl <[email protected]>
AuthorDate: Wed Nov 19 12:32:14 2025 +0800

    [Improvement](exchange) avoid hash shuffle when partition type is bucket shuffle and only one instance (#58068)
    
    This pull request updates the logic for partition type handling and
    improves robustness in the `ExchangeSinkOperatorX` implementation. The
    main changes include refining how partition types are set, especially
    for single-destination cases, and ensuring that empty blocks are handled
    efficiently to avoid unnecessary serialization and sending.
    
    Partition type logic improvements:
    * The condition for changing the partition type now only excludes
    `UNPARTITIONED`; `BUCKET_SHFFULE_HASH_PARTITIONED` is no longer exempt,
    so bucket shuffle exchanges are also eligible for the
    single-destination rewrite described below.
    * When there is only one destination fragment, the partition type is set
    to `RANDOM` instead of `UNPARTITIONED`, clarifying the intended behavior
    for single-destination exchanges.
    
    Handling of empty blocks:
    * Added an early return in the `sink` method when the input block is
    empty and not at end-of-stream (`eos`), preventing unnecessary
    processing.
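    * Serialization of a remote block now only occurs if the block is
    non-empty; the `PBlock` (left unserialized for an empty block) is
    still passed to `send_remote_block` together with the `eos` flag,
    further optimizing resource usage.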
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 38 +++++++++++++------------
 1 file changed, 20 insertions(+), 18 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index bb8440e4ef2..3ddc3807e49 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -295,9 +295,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
         _output_tuple_id = sink.output_tuple_id;
     }
 
-    // Bucket shuffle may contain some same bucket so no need change the 
BUCKET_SHFFULE_HASH_PARTITIONED
-    if (_part_type != TPartitionType::UNPARTITIONED &&
-        _part_type != TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+    if (_part_type != TPartitionType::UNPARTITIONED) {
         // if the destinations only one dest, we need to use broadcast
         std::unordered_set<UniqueId> dest_fragment_ids_set;
         for (const auto& dest : _dests) {
@@ -306,7 +304,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
                 break;
             }
         }
-        _part_type = dest_fragment_ids_set.size() == 1 ? 
TPartitionType::UNPARTITIONED : _part_type;
+        _part_type = dest_fragment_ids_set.size() == 1 ? 
TPartitionType::RANDOM : _part_type;
     }
 }
 
@@ -378,25 +376,27 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         set_low_memory_mode(state);
     }
 
+    if (block->empty() && !eos) {
+        return Status::OK();
+    }
+
     if (_part_type == TPartitionType::UNPARTITIONED) {
         // 1. serialize depends on it is not local exchange
         // 2. send block
         // 3. rollover block
         if (local_state._only_local_exchange) {
-            if (!block->empty()) {
-                Status status;
-                size_t idx = 0;
-                for (auto& channel : local_state.channels) {
-                    if (!channel->is_receiver_eof()) {
-                        // If this channel is the last, we can move this block 
to downstream pipeline.
-                        // Otherwise, this block also need to be broadcasted 
to other channels so should be copied.
-                        DCHECK_GE(local_state._last_local_channel_idx, 0);
-                        status = channel->send_local_block(
-                                block, eos, idx == 
local_state._last_local_channel_idx);
-                        HANDLE_CHANNEL_STATUS(state, channel, status);
-                    }
-                    idx++;
+            Status status;
+            size_t idx = 0;
+            for (auto& channel : local_state.channels) {
+                if (!channel->is_receiver_eof()) {
+                    // If this channel is the last, we can move this block to 
downstream pipeline.
+                    // Otherwise, this block also need to be broadcasted to 
other channels so should be copied.
+                    DCHECK_GE(local_state._last_local_channel_idx, 0);
+                    status = channel->send_local_block(block, eos,
+                                                       idx == 
local_state._last_local_channel_idx);
+                    HANDLE_CHANNEL_STATUS(state, channel, status);
                 }
+                idx++;
             }
         } else {
             auto block_holder = 
vectorized::BroadcastPBlockHolder::create_shared();
@@ -486,7 +486,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
                 auto pblock = std::make_unique<PBlock>();
-                RETURN_IF_ERROR(local_state._serializer.serialize_block(block, 
pblock.get()));
+                if (!block->empty()) {
+                    
RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
+                }
                 auto status = 
current_channel->send_remote_block(std::move(pblock), eos);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to