This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 126714ace4f [fix](pipelineX) fix nullptr in local exchange dependency
(#27488)
126714ace4f is described below
commit 126714ace4f635f459b49479dd62d1394312df6e
Author: Mryange <[email protected]>
AuthorDate: Fri Nov 24 15:12:25 2023 +0800
[fix](pipelineX) fix nullptr in local exchange dependency (#27488)
fix nullptr in local exchange dependency
---
be/src/pipeline/pipeline_x/operator.cpp | 9 ++++++-
be/src/pipeline/pipeline_x/operator.h | 1 +
.../pipeline_x/pipeline_x_fragment_context.cpp | 28 ++++++++++++++--------
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 2 +-
4 files changed, 28 insertions(+), 12 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 0eafada38a9..c0f3a6f029a 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -346,7 +346,11 @@ Status
PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
_dependency = (DependencyType*)info.dependency.get();
if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
auto& deps = info.upstream_dependencies;
- _dependency->set_shared_state(deps.front()->shared_state());
+ if constexpr (std::is_same_v<LocalExchangeSourceDependency,
DependencyType>) {
+ _dependency->set_shared_state(info.local_exchange_state);
+ } else {
+ _dependency->set_shared_state(deps.front()->shared_state());
+ }
_shared_state = (typename
DependencyType::SharedState*)_dependency->shared_state().get();
_shared_state->ref();
_wait_for_dependency_timer =
@@ -408,6 +412,9 @@ Status
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
auto& deps = info.dependencys;
_dependency = (DependencyType*)deps.front().get();
+ if constexpr (std::is_same_v<LocalExchangeSinkDependency,
DependencyType>) {
+ _dependency->set_shared_state(info.local_exchange_state);
+ }
if (_dependency) {
_shared_state =
(typename
DependencyType::SharedState*)_dependency->shared_state().get();
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 7d534c048b8..28f156a15a9 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -47,6 +47,7 @@ struct LocalSinkStateInfo {
RuntimeProfile* parent_profile;
const int sender_id;
std::vector<DependencySPtr>& dependencys;
+ std::shared_ptr<LocalExchangeSharedState> local_exchange_state;
const TDataSink& tsink;
};
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index b26e9ead695..33587c31dfc 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -426,17 +426,25 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i]->set_total_load_streams(request.total_load_streams);
_runtime_states[i]->set_num_local_sink(request.num_local_sink);
std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
- for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+ auto get_local_exchange_state =
+ [&](PipelinePtr pipeline) ->
std::shared_ptr<LocalExchangeSharedState> {
+ auto source_id = pipeline->operator_xs().front()->operator_id();
+ if (auto iter = _op_id_to_le_state.find(source_id); iter !=
_op_id_to_le_state.end()) {
+ return iter->second;
+ }
+ for (auto sink_to_source_id : pipeline->sink_x()->dests_id()) {
+ if (auto iter = _op_id_to_le_state.find(sink_to_source_id);
+ iter != _op_id_to_le_state.end()) {
+ return iter->second;
+ }
+ }
+ return nullptr;
+ };
+ for (auto& pipeline : _pipelines) {
auto task = std::make_unique<PipelineXTask>(
- _pipelines[pip_idx], _total_tasks++,
_runtime_states[i].get(), this,
- _runtime_states[i]->runtime_profile(),
- _op_id_to_le_state.contains(
-
_pipelines[pip_idx]->operator_xs().front()->operator_id())
- ? _op_id_to_le_state
-
[_pipelines[pip_idx]->operator_xs().front()->operator_id()]
- : nullptr,
- i);
- pipeline_id_to_task.insert({_pipelines[pip_idx]->id(),
task.get()});
+ pipeline, _total_tasks++, _runtime_states[i].get(), this,
+ _runtime_states[i]->runtime_profile(),
get_local_exchange_state(pipeline), i);
+ pipeline_id_to_task.insert({pipeline->id(), task.get()});
_tasks[i].emplace_back(std::move(task));
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 24a090c6998..b3cc261f5b3 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -75,7 +75,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const
TPipelineInstanceParams
{
// set sink local state
LocalSinkStateInfo info {_parent_profile, local_params.sender_id,
- get_downstream_dependency(), tsink};
+ get_downstream_dependency(),
_local_exchange_state, tsink};
RETURN_IF_ERROR(_sink->setup_local_state(state, info));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]