This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new ade1841a01d [fix](shuffle) Do not return error if local recvr is null
(#35399)
ade1841a01d is described below
commit ade1841a01d39c5c114d205549d58c9353b09a35
Author: Gabriel <[email protected]>
AuthorDate: Sun May 26 20:20:50 2024 +0800
[fix](shuffle) Do not return error if local recvr is null (#35399)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 17 +++++++++++------
be/src/vec/sink/vdata_stream_sender.cpp | 4 +---
2 files changed, 12 insertions(+), 9 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 2e0972ca5ee..dfcb3e8e120 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -210,12 +210,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
size_t dep_id = 0;
for (auto* channel : channels) {
if (channel->is_local()) {
-
_local_channels_dependency.push_back(channel->get_local_channel_dependency());
- DCHECK(_local_channels_dependency[dep_id] != nullptr);
- _wait_channel_timer.push_back(_profile->add_nonzero_counter(
- fmt::format("WaitForLocalExchangeBuffer{}", dep_id),
TUnit ::TIME_NS,
- timer_name, 1));
- dep_id++;
+ if (auto dep = channel->get_local_channel_dependency()) {
+ _local_channels_dependency.push_back(dep);
+ DCHECK(_local_channels_dependency[dep_id] != nullptr);
+
_wait_channel_timer.push_back(_profile->add_nonzero_counter(
+ fmt::format("WaitForLocalExchangeBuffer{}",
dep_id), TUnit ::TIME_NS,
+ timer_name, 1));
+ dep_id++;
+ } else {
+ LOG(WARNING) << "local recvr is null: query id = "
+ << print_id(state->query_id()) << " node id =
" << p.node_id();
+ }
}
}
}
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 24f92bf2aae..ca731b48b37 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -180,9 +180,7 @@ template <typename Parent>
std::shared_ptr<pipeline::Dependency>
PipChannel<Parent>::get_local_channel_dependency() {
if (!Channel<Parent>::_local_recvr) {
if constexpr (std::is_same_v<pipeline::ExchangeSinkLocalState,
Parent>) {
- throw Exception(ErrorCode::INTERNAL_ERROR,
- "_local_recvr is null: " +
-
std::to_string(Channel<Parent>::_parent->parent()->node_id()));
+ return nullptr;
} else {
throw Exception(ErrorCode::INTERNAL_ERROR, "_local_recvr is null");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]