This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 405541b2de5 [fix](tvf) fix wrong backend assignment for insert into 
local tvf (#62143)
405541b2de5 is described below

commit 405541b2de5356f1dac8151961f6d7df341ec33a
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Apr 6 20:34:18 2026 -0700

    [fix](tvf) fix wrong backend assignment for insert into local tvf (#62143)
    
    ### What problem does this PR solve?
    
    Follow-up to #61732
    
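    Problem Summary: When using INSERT INTO local() TVF with a specific
    backend_id, the Nereids DistributePlanner does not pin the sink fragment
    to the designated backend, causing data to be written to the wrong
    node's local disk. This change adds UnassignedLocalTVFSinkJob, which
    UnassignedJobBuilder selects for such sinks so that the sink fragment
    runs as a single instance on the designated backend.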
---
 .../worker/job/UnassignedJobBuilder.java           | 16 ++++-
 .../worker/job/UnassignedLocalTVFSinkJob.java      | 68 ++++++++++++++++++++++
 2 files changed, 83 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
index d7950022151..53f79ac72e5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
@@ -31,6 +31,7 @@ import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SchemaScanNode;
+import org.apache.doris.planner.TVFTableSink;
 import org.apache.doris.thrift.TExplainLevel;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -92,7 +93,20 @@ public class UnassignedJobBuilder {
             // this fragment already set its instances in 
`visitPhysicalDistribute`.
             // now assign to 1 BE 1 instance.
             return new UnassignedAllBEJob(statementContext, planFragment, 
inputJobs);
-        } else if (scanNodes.isEmpty() && isTopFragment
+        }
+
+        // For INSERT INTO local() with a specific backend_id, the sink 
fragment must
+        // execute on the designated backend so that data is written to the 
correct
+        // node's local disk.
+        if (planFragment.getSink() instanceof TVFTableSink) {
+            TVFTableSink tvfSink = (TVFTableSink) planFragment.getSink();
+            if ("local".equals(tvfSink.getTvfName()) && tvfSink.getBackendId() 
!= -1) {
+                return new UnassignedLocalTVFSinkJob(
+                        statementContext, planFragment, inputJobs, 
tvfSink.getBackendId());
+            }
+        }
+
+        if (scanNodes.isEmpty() && isTopFragment
                 && statementContext.getGroupCommitMergeBackend() != null) {
             return new UnassignedGroupCommitJob(statementContext, 
planFragment, scanNodes, inputJobs);
         } else if (!scanNodes.isEmpty() || isLeafFragment(planFragment)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
new file mode 100644
index 00000000000..abe804dc170
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
+import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.system.Backend;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ListMultimap;
+
+import java.util.List;
+
+/**
+ * Unassigned job that pins the sink fragment of INSERT INTO local(...) to one designated backend.
+ * It produces a single instance on the backend with the given backend_id, so data is written to that
+ * node's local disk, and throws IllegalStateException if that backend is not found or not alive.
+ */
+public class UnassignedLocalTVFSinkJob extends AbstractUnassignedJob {
+    private final long backendId;
+
+    public UnassignedLocalTVFSinkJob(
+            StatementContext statementContext, PlanFragment fragment,
+            ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob,
+            long backendId) {
+        super(statementContext, fragment, ImmutableList.of(), 
exchangeToChildJob);
+        this.backendId = backendId;
+    }
+
+    @Override
+    public List<AssignedJob> computeAssignedJobs(
+            DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
+        Backend targetBackend = 
Env.getCurrentSystemInfo().getBackend(backendId);
+        if (targetBackend == null || !targetBackend.isAlive()) {
+            throw new IllegalStateException("Backend " + backendId
+                    + " is not available for local TVF sink");
+        }
+        DistributedPlanWorker worker = new BackendWorker(
+                Env.getCurrentInternalCatalog().getId(), targetBackend);
+        return ImmutableList.of(
+                assignWorkerAndDataSources(
+                        0, 
statementContext.getConnectContext().nextInstanceId(),
+                        worker, new DefaultScanSource(ImmutableMap.of())
+                )
+        );
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to