This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_rebase
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4ff13be21baa228d65c4bfa79b3726171553d06a
Author: 924060929 <[email protected]>
AuthorDate: Thu May 21 16:48:23 2026 +0800

    [fix](local shuffle) Route to NereidsCoordinator based on distributedPlans
    
    Dictionary refresh hangs because FE-side planning inserted local exchange
    (LE) nodes (via the PhysicalDictionarySink special case in
    NereidsPlanner.distribute), but EnvFactory.createCoordinator dispatched to
    the legacy Coordinator, which forces enable_local_shuffle_planner=false in
    initQueryOptions. BE then runs its own _plan_local_exchange on a fragment
    that already has FE-planned LEs, raising pipeline 1's num_tasks from 1 to 7
    and adding a new pipeline. The FE-init exchanger was sized for
    sender_count=1, but 7 sink LocalStates close it. As a result,
    _running_sink_operators goes negative, the source side short-circuits,
    dependent pipeline tasks never wake up, the fragment never finishes, and
    the dict stays in LOADING forever.
    
    The mismatch is between two checks for "did FE run distribute planning?":
    - Plan time (NereidsPlanner.distribute, line 704):
      SessionVariable.canUseNereidsDistributePlanner() OR PhysicalDictionarySink
      -> true for dict refresh -> addLocalExchange runs -> LE inserted
    - Dispatch time (EnvFactory.createCoordinator, line 148/156):
      canUseNereidsDistributePlanner(context), which returns false when
      statementContext.parsedStatement is not a LogicalPlanAdapter — and dict
      refresh constructs StmtExecutor with a raw SQL string, leaving
      parsedStatement null -> dispatched to legacy Coordinator
    
    Replace the dispatch-side session-var check with a direct read of
    NereidsPlanner.getDistributedPlans(): non-empty iff FE actually ran
    distribute. This aligns dispatch with plan-time reality without depending
    on parsedStatement state, session var values, or sink type.
    
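    Apply the same fix to CloudEnvFactory, whose overrides previously called
    the no-argument SessionVariable.canUseNereidsDistributePlanner().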
    
    Verified locally: test_dict_load_and_get_ip_trie was hanging at the first
    refresh and reproducing the _running_sink_operators=-6 pattern (1 expected
    sender minus 7 closes). With this fix the test passes, and dict refresh
    correctly reports the expected duplicate-CIDR error.
---
 .../main/java/org/apache/doris/catalog/EnvFactory.java   | 16 ++++++++++++++--
 .../org/apache/doris/cloud/catalog/CloudEnvFactory.java  |  4 ++--
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
index 3ecdb6ddef3..9cfc70f98a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
@@ -145,7 +145,7 @@ public class EnvFactory {
 
     public Coordinator createCoordinator(ConnectContext context, Planner 
planner,
                                          StatsErrorEstimator 
statsErrorEstimator) {
-        if (planner instanceof NereidsPlanner && 
SessionVariable.canUseNereidsDistributePlanner(context)) {
+        if (planner instanceof NereidsPlanner && 
hasNereidsDistributedPlans((NereidsPlanner) planner)) {
             return new NereidsCoordinator(context, (NereidsPlanner) planner, 
statsErrorEstimator);
         }
         return new Coordinator(context, planner, statsErrorEstimator);
@@ -153,12 +153,24 @@ public class EnvFactory {
 
     public Coordinator createCoordinator(ConnectContext context, Planner 
planner,
                                          StatsErrorEstimator 
statsErrorEstimator, long jobId) {
-        if (planner instanceof NereidsPlanner && 
SessionVariable.canUseNereidsDistributePlanner(context)) {
+        if (planner instanceof NereidsPlanner && 
hasNereidsDistributedPlans((NereidsPlanner) planner)) {
             return new NereidsCoordinator(context, (NereidsPlanner) planner, 
statsErrorEstimator, jobId);
         }
         return new Coordinator(context, planner, statsErrorEstimator);
     }
 
+    // Dispatch decision must mirror what FE planning actually did. 
SessionVariable
+    // checks (parsedStatement state, session vars) can drift from plan-time 
reality —
+    // e.g. dict refresh runs distribute() unconditionally for 
PhysicalDictionarySink
+    // even though canUseNereidsDistributePlanner(context) returns false 
because
+    // parsedStatement is never set on that path, sending the query to legacy
+    // Coordinator and producing a hang. The distributedPlans field is the
+    // single source of truth: it is populated iff FE did distribute planning.
+    protected static boolean hasNereidsDistributedPlans(NereidsPlanner 
planner) {
+        FragmentIdMapping<DistributedPlan> distributedPlans = 
planner.getDistributedPlans();
+        return distributedPlans != null && !distributedPlans.isEmpty();
+    }
+
     // Used for broker load task/export task/update coordinator
     public Coordinator createCoordinator(Long jobId, TUniqueId queryId, 
DescriptorTable descTable,
                                          List<PlanFragment> fragments, 
List<ScanNode> scanNodes,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
index 5b240dcd8ef..83c84027d28 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
@@ -158,7 +158,7 @@ public class CloudEnvFactory extends EnvFactory {
     @Override
     public Coordinator createCoordinator(ConnectContext context, Planner 
planner,
                                          StatsErrorEstimator 
statsErrorEstimator) {
-        if (planner instanceof NereidsPlanner && 
SessionVariable.canUseNereidsDistributePlanner()) {
+        if (planner instanceof NereidsPlanner && 
hasNereidsDistributedPlans((NereidsPlanner) planner)) {
             return new NereidsCoordinator(context, (NereidsPlanner) planner, 
statsErrorEstimator);
         }
         return new CloudCoordinator(context, planner, statsErrorEstimator);
@@ -167,7 +167,7 @@ public class CloudEnvFactory extends EnvFactory {
     @Override
     public Coordinator createCoordinator(ConnectContext context, Planner 
planner,
                                          StatsErrorEstimator 
statsErrorEstimator, long jobId) {
-        if (planner instanceof NereidsPlanner && 
SessionVariable.canUseNereidsDistributePlanner()) {
+        if (planner instanceof NereidsPlanner && 
hasNereidsDistributedPlans((NereidsPlanner) planner)) {
             return new NereidsCoordinator(context, (NereidsPlanner) planner, 
statsErrorEstimator, jobId);
         }
         return new CloudCoordinator(context, planner, statsErrorEstimator);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to