This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6edc6f84bb0 [fix](shuffle) Fix remaining tasks if all tasks are running on single BE (#41350)
6edc6f84bb0 is described below
commit 6edc6f84bb086177ad4b34de1b02644894552a28
Author: Gabriel <[email protected]>
AuthorDate: Fri Sep 27 17:23:19 2024 +0800
[fix](shuffle) Fix remaining tasks if all tasks are running on single BE (#41350)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 35 +++++++++++++++----------
be/src/pipeline/exec/exchange_sink_operator.h | 4 ++-
2 files changed, 24 insertions(+), 15 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 4773c7a90bd..518620ba6b4 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -132,13 +132,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
PUniqueId id;
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
- _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id,
_sender_id,
- _state->be_number(),
state, this);
- register_channels(_sink_buffer.get());
- _queue_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
-
"ExchangeSinkQueueDependency", true);
- _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
+ if (!only_local_exchange) {
+ _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id,
p._dest_node_id, _sender_id,
+
_state->be_number(), state, this);
+ register_channels(_sink_buffer.get());
+ _queue_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+
"ExchangeSinkQueueDependency", true);
+ _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
+ _finish_dependency->block();
+ }
+
if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1)
&&
!only_local_exchange) {
_broadcast_dependency = Dependency::create_shared(
@@ -244,7 +248,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
fmt::format("Crc32HashPartitioner({})",
_partition_count));
}
- _finish_dependency->block();
if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
@@ -559,8 +562,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
final_st = st;
}
}
- local_state._sink_buffer->set_should_stop();
- return final_st;
+ if (local_state._sink_buffer) {
+ local_state._sink_buffer->set_should_stop();
+ }
}
return final_st;
}
@@ -631,11 +635,14 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
Base::debug_string(indentation_level));
- fmt::format_to(debug_string_buffer,
- ", Sink Buffer: (_should_stop = {}, _busy_channels = {},
_is_finishing = {}), "
- "_reach_limit: {}",
- _sink_buffer->_should_stop.load(),
_sink_buffer->_busy_channels.load(),
- _sink_buffer->_is_finishing.load(), _reach_limit.load());
+ if (_sink_buffer) {
+ fmt::format_to(
+ debug_string_buffer,
+ ", Sink Buffer: (_should_stop = {}, _busy_channels = {},
_is_finishing = {}), "
+ "_reach_limit: {}",
+ _sink_buffer->_should_stop.load(),
_sink_buffer->_busy_channels.load(),
+ _sink_buffer->_is_finishing.load(), _reach_limit.load());
+ }
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 300e2a5172f..adf8a342470 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -64,7 +64,9 @@ public:
std::vector<Dependency*> dependencies() const override {
std::vector<Dependency*> dep_vec;
- dep_vec.push_back(_queue_dependency.get());
+ if (_queue_dependency) {
+ dep_vec.push_back(_queue_dependency.get());
+ }
if (_broadcast_dependency) {
dep_vec.push_back(_broadcast_dependency.get());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]