This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/fe_local_shuffle by this push:
     new 00560be57e5 [fix](local shuffle) pre-create NLJ shared state at tree-build time to fix COREDUMP
00560be57e5 is described below

commit 00560be57e565ccf086dc89fc4a87aa4a25f27f8
Author: 924060929 <[email protected]>
AuthorDate: Wed Apr 1 12:14:27 2026 +0800

    [fix](local shuffle) pre-create NLJ shared state at tree-build time to fix COREDUMP
    
    NestedLoopJoinBuildSinkOperatorX::sink() crashes with DCHECK(source_deps.size() > 0)
    because the NLJ probe's source dependency was never created. This happens when
    FE-planned local exchanges change pipeline num_tasks, causing some instances to
    skip creating probe tasks via the per-instance injection path.
    
    Fix: create NLJ shared state at tree-build time with _num_instances source/sink deps
    and register in _op_id_to_shared_state, same pattern as HASH_JOIN_NODE with shared
    hash table. This ensures all instances find the shared state via _shared_state_map
    with properly initialized dependencies.
---
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 24 ++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index abfe3048fff..2944682b2f1 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -1616,6 +1616,30 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
         _pipeline_parent_map.push(op->node_id(), cur_pipe);
         _pipeline_parent_map.push(op->node_id(), build_side_pipe);
+
+        // Pre-create shared state with _num_instances deps and register in
+        // _op_id_to_shared_state, same as HASH_JOIN_NODE with shared hash table.
+        // This ensures all instances find the shared state via _shared_state_map
+        // (Path 1) with properly initialized source_deps and sink_deps, even when
+        // FE-planned local exchanges change pipeline num_tasks causing some
+        // instances to skip creating probe/build tasks via the per-instance
+        // create_shared_state + inject_shared_state path (Path 2).
+        {
+            std::shared_ptr<NestedLoopJoinSharedState> shared_state =
+                    NestedLoopJoinSharedState::create_shared();
+            for (int i = 0; i < _num_instances; i++) {
+                auto sink_dep = std::make_shared<Dependency>(
+                        op->operator_id(), op->node_id(),
+                        "CROSS_JOIN_SINK_OPERATOR_DEPENDENCY");
+                sink_dep->set_shared_state(shared_state.get());
+                shared_state->sink_deps.push_back(sink_dep);
+            }
+            shared_state->create_source_dependencies(
+                    _num_instances, op->operator_id(), op->node_id(),
+                    "CROSS_JOIN_PROBE");
+            _op_id_to_shared_state.insert(
+                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
+        }
         break;
     }
     case TPlanNodeType::UNION_NODE: {
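
The pattern the patch applies can be illustrated outside of Doris with a minimal, self-contained sketch. Everything below is a simplified stand-in and not the real backend API: `Dependency`, `SharedState`, `SharedStateMap`, `pre_create_shared_state`, and `sink_can_run` are hypothetical names chosen for illustration. The sketch only shows the idea described in the commit message: build one shared state up front with one sink and one source dependency per instance, register it under the operator id, and let every instance look it up instead of relying on a per-instance creation path that some instances may skip.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-ins for the Doris types (not the real API).
struct SharedState;

struct Dependency {
    Dependency(int op_id, int n_id, std::string dep_name)
            : operator_id(op_id), node_id(n_id), name(std::move(dep_name)) {}
    int operator_id;
    int node_id;
    std::string name;
    SharedState* shared_state = nullptr; // non-owning back pointer
};

struct SharedState {
    std::vector<std::shared_ptr<Dependency>> source_deps;
    std::vector<std::shared_ptr<Dependency>> sink_deps;
};

using DependencyList = std::vector<std::shared_ptr<Dependency>>;
using SharedStateEntry = std::pair<std::shared_ptr<SharedState>, DependencyList>;
// Plays the role of _op_id_to_shared_state: operator id -> (state, sink deps).
using SharedStateMap = std::map<int, SharedStateEntry>;

// Create the shared state once, at tree-build time, with one sink and one
// source dependency per instance, then register it under the operator id.
std::shared_ptr<SharedState> pre_create_shared_state(SharedStateMap& registry,
                                                     int operator_id, int node_id,
                                                     int num_instances) {
    assert(num_instances > 0);
    auto state = std::make_shared<SharedState>();
    for (int i = 0; i < num_instances; i++) {
        auto sink_dep = std::make_shared<Dependency>(operator_id, node_id, "SINK_DEPENDENCY");
        sink_dep->shared_state = state.get();
        state->sink_deps.push_back(sink_dep);

        auto source_dep = std::make_shared<Dependency>(operator_id, node_id, "PROBE_DEPENDENCY");
        source_dep->shared_state = state.get();
        state->source_deps.push_back(source_dep);
    }
    registry.emplace(operator_id, SharedStateEntry{state, state->sink_deps});
    return state;
}

// Mirrors the condition guarded by DCHECK(source_deps.size() > 0): the sink
// can only proceed if a registered state with source dependencies exists.
bool sink_can_run(const SharedStateMap& registry, int operator_id) {
    auto it = registry.find(operator_id);
    return it != registry.end() && !it->second.first->source_deps.empty();
}
```

With this shape, an instance that never ran the per-instance creation path would still find a fully populated entry in the map, which is the property the commit message relies on. The real code additionally ties dependencies into the pipeline task scheduler, which this sketch leaves out.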

