This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/fe_local_shuffle by this push:
new 00560be57e5 [fix](local shuffle) pre-create NLJ shared state at tree-build time to fix COREDUMP
00560be57e5 is described below
commit 00560be57e565ccf086dc89fc4a87aa4a25f27f8
Author: 924060929 <[email protected]>
AuthorDate: Wed Apr 1 12:14:27 2026 +0800
[fix](local shuffle) pre-create NLJ shared state at tree-build time to fix COREDUMP
NestedLoopJoinBuildSinkOperatorX::sink() crashes with
DCHECK(source_deps.size() > 0) because the NLJ probe's source dependency
was never created. This happens when FE-planned local exchanges change a
pipeline's num_tasks, causing some instances to skip creating probe tasks
via the per-instance injection path.
Fix: create the NLJ shared state at tree-build time with _num_instances
source/sink deps and register it in _op_id_to_shared_state, following the
same pattern as HASH_JOIN_NODE with a shared hash table. This ensures all
instances find the shared state via _shared_state_map with properly
initialized dependencies.
---
be/src/exec/pipeline/pipeline_fragment_context.cpp | 24 ++++++++++++++++++++++
1 file changed, 24 insertions(+)
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index abfe3048fff..2944682b2f1 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -1616,6 +1616,30 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
         _pipeline_parent_map.push(op->node_id(), cur_pipe);
         _pipeline_parent_map.push(op->node_id(), build_side_pipe);
+
+        // Pre-create shared state with _num_instances deps and register in
+        // _op_id_to_shared_state, same as HASH_JOIN_NODE with shared hash table.
+        // This ensures all instances find the shared state via _shared_state_map
+        // (Path 1) with properly initialized source_deps and sink_deps, even when
+        // FE-planned local exchanges change pipeline num_tasks causing some
+        // instances to skip creating probe/build tasks via the per-instance
+        // create_shared_state + inject_shared_state path (Path 2).
+        {
+            std::shared_ptr<NestedLoopJoinSharedState> shared_state =
+                    NestedLoopJoinSharedState::create_shared();
+            for (int i = 0; i < _num_instances; i++) {
+                auto sink_dep = std::make_shared<Dependency>(
+                        op->operator_id(), op->node_id(),
+                        "CROSS_JOIN_SINK_OPERATOR_DEPENDENCY");
+                sink_dep->set_shared_state(shared_state.get());
+                shared_state->sink_deps.push_back(sink_dep);
+            }
+            shared_state->create_source_dependencies(
+                    _num_instances, op->operator_id(), op->node_id(),
+                    "CROSS_JOIN_PROBE");
+            _op_id_to_shared_state.insert(
+                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
+        }
         break;
     }
     case TPlanNodeType::UNION_NODE: {