This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 1785dac51b4 branch-4.0: [Improvement](exchange) avoid hash shuffle when partition type is bucket shuffle and only one instance #58068 (#58206)
1785dac51b4 is described below

commit 1785dac51b4e6631eae4fd7d8709b16d9b7e7d87
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Nov 25 09:58:30 2025 +0800

    branch-4.0: [Improvement](exchange) avoid hash shuffle when partition type is bucket shuffle and only one instance #58068 (#58206)
    
    Cherry-picked from #58068
    
    Co-authored-by: Pxl <[email protected]>
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 38 +++++++++++++------------
 1 file changed, 20 insertions(+), 18 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index c32860811af..0ec71918be6 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -294,9 +294,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
         _output_tuple_id = sink.output_tuple_id;
     }
 
-    // Bucket shuffle may contain some same bucket so no need change the 
BUCKET_SHFFULE_HASH_PARTITIONED
-    if (_part_type != TPartitionType::UNPARTITIONED &&
-        _part_type != TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+    if (_part_type != TPartitionType::UNPARTITIONED) {
         // if the destinations only one dest, we need to use broadcast
         std::unordered_set<UniqueId> dest_fragment_ids_set;
         for (auto& dest : _dests) {
@@ -305,7 +303,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
                 break;
             }
         }
-        _part_type = dest_fragment_ids_set.size() == 1 ? 
TPartitionType::UNPARTITIONED : _part_type;
+        _part_type = dest_fragment_ids_set.size() == 1 ? 
TPartitionType::RANDOM : _part_type;
     }
 }
 
@@ -377,25 +375,27 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         set_low_memory_mode(state);
     }
 
+    if (block->empty() && !eos) {
+        return Status::OK();
+    }
+
     if (_part_type == TPartitionType::UNPARTITIONED) {
         // 1. serialize depends on it is not local exchange
         // 2. send block
         // 3. rollover block
         if (local_state._only_local_exchange) {
-            if (!block->empty()) {
-                Status status;
-                size_t idx = 0;
-                for (auto& channel : local_state.channels) {
-                    if (!channel->is_receiver_eof()) {
-                        // If this channel is the last, we can move this block 
to downstream pipeline.
-                        // Otherwise, this block also need to be broadcasted 
to other channels so should be copied.
-                        DCHECK_GE(local_state._last_local_channel_idx, 0);
-                        status = channel->send_local_block(
-                                block, eos, idx == 
local_state._last_local_channel_idx);
-                        HANDLE_CHANNEL_STATUS(state, channel, status);
-                    }
-                    idx++;
+            Status status;
+            size_t idx = 0;
+            for (auto& channel : local_state.channels) {
+                if (!channel->is_receiver_eof()) {
+                    // If this channel is the last, we can move this block to 
downstream pipeline.
+                    // Otherwise, this block also need to be broadcasted to 
other channels so should be copied.
+                    DCHECK_GE(local_state._last_local_channel_idx, 0);
+                    status = channel->send_local_block(block, eos,
+                                                       idx == 
local_state._last_local_channel_idx);
+                    HANDLE_CHANNEL_STATUS(state, channel, status);
                 }
+                idx++;
             }
         } else {
             auto block_holder = 
vectorized::BroadcastPBlockHolder::create_shared();
@@ -485,7 +485,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
                 auto pblock = std::make_unique<PBlock>();
-                RETURN_IF_ERROR(local_state._serializer.serialize_block(block, 
pblock.get()));
+                if (!block->empty()) {
+                    
RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
+                }
                 auto status = 
current_channel->send_remote_block(std::move(pblock), eos);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to