This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 126714ace4f [fix](pipelineX) fix nullptr in local exchange dependency (#27488)
126714ace4f is described below

commit 126714ace4f635f459b49479dd62d1394312df6e
Author: Mryange <[email protected]>
AuthorDate: Fri Nov 24 15:12:25 2023 +0800

    [fix](pipelineX) fix nullptr in local exchange dependency (#27488)
    
    Fix nullptr in local exchange dependency.
---
 be/src/pipeline/pipeline_x/operator.cpp            |  9 ++++++-
 be/src/pipeline/pipeline_x/operator.h              |  1 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 28 ++++++++++++++--------
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |  2 +-
 4 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 0eafada38a9..c0f3a6f029a 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -346,7 +346,11 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
     _dependency = (DependencyType*)info.dependency.get();
     if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
         auto& deps = info.upstream_dependencies;
-        _dependency->set_shared_state(deps.front()->shared_state());
+        if constexpr (std::is_same_v<LocalExchangeSourceDependency, DependencyType>) {
+            _dependency->set_shared_state(info.local_exchange_state);
+        } else {
+            _dependency->set_shared_state(deps.front()->shared_state());
+        }
         _shared_state = (typename DependencyType::SharedState*)_dependency->shared_state().get();
         _shared_state->ref();
         _wait_for_dependency_timer =
@@ -408,6 +412,9 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
     if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
         auto& deps = info.dependencys;
         _dependency = (DependencyType*)deps.front().get();
+        if constexpr (std::is_same_v<LocalExchangeSinkDependency, DependencyType>) {
+            _dependency->set_shared_state(info.local_exchange_state);
+        }
         if (_dependency) {
             _shared_state =
                     (typename DependencyType::SharedState*)_dependency->shared_state().get();
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 7d534c048b8..28f156a15a9 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -47,6 +47,7 @@ struct LocalSinkStateInfo {
     RuntimeProfile* parent_profile;
     const int sender_id;
     std::vector<DependencySPtr>& dependencys;
+    std::shared_ptr<LocalExchangeSharedState> local_exchange_state;
     const TDataSink& tsink;
 };
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index b26e9ead695..33587c31dfc 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -426,17 +426,25 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         _runtime_states[i]->set_total_load_streams(request.total_load_streams);
         _runtime_states[i]->set_num_local_sink(request.num_local_sink);
         std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
-        for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+        auto get_local_exchange_state =
+                [&](PipelinePtr pipeline) -> std::shared_ptr<LocalExchangeSharedState> {
+            auto source_id = pipeline->operator_xs().front()->operator_id();
+            if (auto iter = _op_id_to_le_state.find(source_id); iter != _op_id_to_le_state.end()) {
+                return iter->second;
+            }
+            for (auto sink_to_source_id : pipeline->sink_x()->dests_id()) {
+                if (auto iter = _op_id_to_le_state.find(sink_to_source_id);
+                    iter != _op_id_to_le_state.end()) {
+                    return iter->second;
+                }
+            }
+            return nullptr;
+        };
+        for (auto& pipeline : _pipelines) {
             auto task = std::make_unique<PipelineXTask>(
-                    _pipelines[pip_idx], _total_tasks++, _runtime_states[i].get(), this,
-                    _runtime_states[i]->runtime_profile(),
-                    _op_id_to_le_state.contains(
-                            _pipelines[pip_idx]->operator_xs().front()->operator_id())
-                            ? _op_id_to_le_state
-                                      [_pipelines[pip_idx]->operator_xs().front()->operator_id()]
-                            : nullptr,
-                    i);
-            pipeline_id_to_task.insert({_pipelines[pip_idx]->id(), task.get()});
+                    pipeline, _total_tasks++, _runtime_states[i].get(), this,
+                    _runtime_states[i]->runtime_profile(), get_local_exchange_state(pipeline), i);
+            pipeline_id_to_task.insert({pipeline->id(), task.get()});
             _tasks[i].emplace_back(std::move(task));
         }
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 24a090c6998..b3cc261f5b3 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -75,7 +75,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
     {
         // set sink local state
         LocalSinkStateInfo info {_parent_profile, local_params.sender_id,
-                                 get_downstream_dependency(), tsink};
+                                 get_downstream_dependency(), _local_exchange_state, tsink};
         RETURN_IF_ERROR(_sink->setup_local_state(state, info));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to