This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_rebase in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4ff13be21baa228d65c4bfa79b3726171553d06a Author: 924060929 <[email protected]> AuthorDate: Thu May 21 16:48:23 2026 +0800 [fix](local shuffle) Route to NereidsCoordinator based on distributedPlans Dictionary refresh hangs because the FE-side planning inserted local exchange (LE) nodes (via PhysicalDictionarySink special case in NereidsPlanner.distribute), but EnvFactory.createCoordinator dispatched to legacy Coordinator, which forces enable_local_shuffle_planner=false at initQueryOptions. BE then runs its own _plan_local_exchange on a fragment that already has FE-planned LEs, inflating pipeline 1's num_tasks (1 -> 7) and adding a new pipeline. The FE-init exchanger was sized for sender_count=1, but 7 sink LocalStates close it; _running_sink_operators goes negative, source side short-circuits, dependent pipeline tasks never wake up, fragment never finishes, and the dict stays in LOADING forever. The mismatch is between two checks for "did FE plan distribute": - Plan time (NereidsPlanner.distribute, line 704): SessionVariable.canUseNereidsDistributePlanner() OR PhysicalDictionarySink -> true for dict refresh -> addLocalExchange runs -> LE inserted - Dispatch time (EnvFactory.createCoordinator, line 148/156): canUseNereidsDistributePlanner(context), which returns false when statementContext.parsedStatement is not a LogicalPlanAdapter — and dict refresh constructs StmtExecutor with a raw SQL string, leaving parsedStatement null -> dispatched to legacy Coordinator Replace the dispatch-side session-var check with a direct read of NereidsPlanner.getDistributedPlans(): non-empty iff FE actually ran distribute. This aligns dispatch with plan-time reality without depending on parsedStatement state, session var values, or sink type. Apply the same fix to CloudEnvFactory. 
Verified locally: test_dict_load_and_get_ip_trie was hanging at the first refresh and reproducing the _running_sink_operators=-6 pattern; with this fix the test passes and dict refresh correctly reports the expected duplicate-CIDR error. --- .../main/java/org/apache/doris/catalog/EnvFactory.java | 16 ++++++++++++++-- .../org/apache/doris/cloud/catalog/CloudEnvFactory.java | 4 ++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java index 3ecdb6ddef3..9cfc70f98a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java @@ -145,7 +145,7 @@ public class EnvFactory { public Coordinator createCoordinator(ConnectContext context, Planner planner, StatsErrorEstimator statsErrorEstimator) { - if (planner instanceof NereidsPlanner && SessionVariable.canUseNereidsDistributePlanner(context)) { + if (planner instanceof NereidsPlanner && hasNereidsDistributedPlans((NereidsPlanner) planner)) { return new NereidsCoordinator(context, (NereidsPlanner) planner, statsErrorEstimator); } return new Coordinator(context, planner, statsErrorEstimator); @@ -153,12 +153,24 @@ public class EnvFactory { public Coordinator createCoordinator(ConnectContext context, Planner planner, StatsErrorEstimator statsErrorEstimator, long jobId) { - if (planner instanceof NereidsPlanner && SessionVariable.canUseNereidsDistributePlanner(context)) { + if (planner instanceof NereidsPlanner && hasNereidsDistributedPlans((NereidsPlanner) planner)) { return new NereidsCoordinator(context, (NereidsPlanner) planner, statsErrorEstimator, jobId); } return new Coordinator(context, planner, statsErrorEstimator); } + // Dispatch decision must mirror what FE planning actually did. 
SessionVariable + // checks (parsedStatement state, session vars) can drift from plan-time reality — + // e.g. dict refresh runs distribute() unconditionally for PhysicalDictionarySink + // even though canUseNereidsDistributePlanner(context) returns false because + // parsedStatement is never set on that path, sending the query to legacy + // Coordinator and producing a hang. The distributedPlans field is the + // single source of truth: it is populated iff FE did distribute planning. + protected static boolean hasNereidsDistributedPlans(NereidsPlanner planner) { + FragmentIdMapping<DistributedPlan> distributedPlans = planner.getDistributedPlans(); + return distributedPlans != null && !distributedPlans.isEmpty(); + } + // Used for broker load task/export task/update coordinator public Coordinator createCoordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List<PlanFragment> fragments, List<ScanNode> scanNodes, diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java index 5b240dcd8ef..83c84027d28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java @@ -158,7 +158,7 @@ public class CloudEnvFactory extends EnvFactory { @Override public Coordinator createCoordinator(ConnectContext context, Planner planner, StatsErrorEstimator statsErrorEstimator) { - if (planner instanceof NereidsPlanner && SessionVariable.canUseNereidsDistributePlanner()) { + if (planner instanceof NereidsPlanner && hasNereidsDistributedPlans((NereidsPlanner) planner)) { return new NereidsCoordinator(context, (NereidsPlanner) planner, statsErrorEstimator); } return new CloudCoordinator(context, planner, statsErrorEstimator); @@ -167,7 +167,7 @@ public class CloudEnvFactory extends EnvFactory { @Override public Coordinator createCoordinator(ConnectContext 
context, Planner planner, StatsErrorEstimator statsErrorEstimator, long jobId) { - if (planner instanceof NereidsPlanner && SessionVariable.canUseNereidsDistributePlanner()) { + if (planner instanceof NereidsPlanner && hasNereidsDistributedPlans((NereidsPlanner) planner)) { return new NereidsCoordinator(context, (NereidsPlanner) planner, statsErrorEstimator, jobId); } return new CloudCoordinator(context, planner, statsErrorEstimator); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
