This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6edc6f84bb0 [fix](shuffle) Fix remaining tasks if all tasks are running on single BE (#41350)
6edc6f84bb0 is described below

commit 6edc6f84bb086177ad4b34de1b02644894552a28
Author: Gabriel <[email protected]>
AuthorDate: Fri Sep 27 17:23:19 2024 +0800

    [fix](shuffle) Fix remaining tasks if all tasks are running on single BE (#41350)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 35 +++++++++++++++----------
 be/src/pipeline/exec/exchange_sink_operator.h   |  4 ++-
 2 files changed, 24 insertions(+), 15 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 4773c7a90bd..518620ba6b4 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -132,13 +132,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     PUniqueId id;
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
-    _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, 
_sender_id,
-                                                        _state->be_number(), 
state, this);
 
-    register_channels(_sink_buffer.get());
-    _queue_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
-                                                  
"ExchangeSinkQueueDependency", true);
-    _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
+    if (!only_local_exchange) {
+        _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, 
p._dest_node_id, _sender_id,
+                                                            
_state->be_number(), state, this);
+        register_channels(_sink_buffer.get());
+        _queue_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                      
"ExchangeSinkQueueDependency", true);
+        _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
+        _finish_dependency->block();
+    }
+
     if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) 
&&
         !only_local_exchange) {
         _broadcast_dependency = Dependency::create_shared(
@@ -244,7 +248,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
                                   fmt::format("Crc32HashPartitioner({})", 
_partition_count));
     }
 
-    _finish_dependency->block();
     if (_part_type == TPartitionType::HASH_PARTITIONED ||
         _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
         _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
@@ -559,8 +562,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                 final_st = st;
             }
         }
-        local_state._sink_buffer->set_should_stop();
-        return final_st;
+        if (local_state._sink_buffer) {
+            local_state._sink_buffer->set_should_stop();
+        }
     }
     return final_st;
 }
@@ -631,11 +635,14 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
 std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}", 
Base::debug_string(indentation_level));
-    fmt::format_to(debug_string_buffer,
-                   ", Sink Buffer: (_should_stop = {}, _busy_channels = {}, 
_is_finishing = {}), "
-                   "_reach_limit: {}",
-                   _sink_buffer->_should_stop.load(), 
_sink_buffer->_busy_channels.load(),
-                   _sink_buffer->_is_finishing.load(), _reach_limit.load());
+    if (_sink_buffer) {
+        fmt::format_to(
+                debug_string_buffer,
+                ", Sink Buffer: (_should_stop = {}, _busy_channels = {}, 
_is_finishing = {}), "
+                "_reach_limit: {}",
+                _sink_buffer->_should_stop.load(), 
_sink_buffer->_busy_channels.load(),
+                _sink_buffer->_is_finishing.load(), _reach_limit.load());
+    }
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 300e2a5172f..adf8a342470 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -64,7 +64,9 @@ public:
 
     std::vector<Dependency*> dependencies() const override {
         std::vector<Dependency*> dep_vec;
-        dep_vec.push_back(_queue_dependency.get());
+        if (_queue_dependency) {
+            dep_vec.push_back(_queue_dependency.get());
+        }
         if (_broadcast_dependency) {
             dep_vec.push_back(_broadcast_dependency.get());
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to