This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 405541b2de5 [fix](tvf) fix wrong backend assignment for insert into
local tvf (#62143)
405541b2de5 is described below
commit 405541b2de5356f1dac8151961f6d7df341ec33a
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Apr 6 20:34:18 2026 -0700
[fix](tvf) fix wrong backend assignment for insert into local tvf (#62143)
### What problem does this PR solve?
Follow-up to #61732.
Problem Summary: When using INSERT INTO local() TVF with a specific
backend_id, the Nereids DistributePlanner does not pin the sink fragment
to the designated backend, causing data to be written to the wrong
node's local disk.
---
.../worker/job/UnassignedJobBuilder.java | 16 ++++-
.../worker/job/UnassignedLocalTVFSinkJob.java | 68 ++++++++++++++++++++++
2 files changed, 83 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
index d7950022151..53f79ac72e5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
@@ -31,6 +31,7 @@ import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SchemaScanNode;
+import org.apache.doris.planner.TVFTableSink;
import org.apache.doris.thrift.TExplainLevel;
import com.google.common.collect.ArrayListMultimap;
@@ -92,7 +93,20 @@ public class UnassignedJobBuilder {
// this fragment already set its instances in
`visitPhysicalDistribute`.
// now assign to 1 BE 1 instance.
return new UnassignedAllBEJob(statementContext, planFragment,
inputJobs);
- } else if (scanNodes.isEmpty() && isTopFragment
+ }
+
+ // For INSERT INTO local() with a specific backend_id, the sink
fragment must
+ // execute on the designated backend so that data is written to the
correct
+ // node's local disk.
+ if (planFragment.getSink() instanceof TVFTableSink) {
+ TVFTableSink tvfSink = (TVFTableSink) planFragment.getSink();
+ if ("local".equals(tvfSink.getTvfName()) && tvfSink.getBackendId()
!= -1) {
+ return new UnassignedLocalTVFSinkJob(
+ statementContext, planFragment, inputJobs,
tvfSink.getBackendId());
+ }
+ }
+
+ if (scanNodes.isEmpty() && isTopFragment
&& statementContext.getGroupCommitMergeBackend() != null) {
return new UnassignedGroupCommitJob(statementContext,
planFragment, scanNodes, inputJobs);
} else if (!scanNodes.isEmpty() || isLeafFragment(planFragment)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
new file mode 100644
index 00000000000..abe804dc170
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalTVFSinkJob.java
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
+import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.system.Backend;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ListMultimap;
+
+import java.util.List;
+
+/**
+ * UnassignedLocalTVFSinkJob.
+ * For INSERT INTO local(...) with a specific backend_id, the sink fragment
must
+ * execute on the designated backend so that data is written to the correct
node's local disk.
+ */
+public class UnassignedLocalTVFSinkJob extends AbstractUnassignedJob {
+ private final long backendId;
+
+ public UnassignedLocalTVFSinkJob(
+ StatementContext statementContext, PlanFragment fragment,
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob,
+ long backendId) {
+ super(statementContext, fragment, ImmutableList.of(),
exchangeToChildJob);
+ this.backendId = backendId;
+ }
+
+ @Override
+ public List<AssignedJob> computeAssignedJobs(
+ DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs) {
+ Backend targetBackend =
Env.getCurrentSystemInfo().getBackend(backendId);
+ if (targetBackend == null || !targetBackend.isAlive()) {
+ throw new IllegalStateException("Backend " + backendId
+ + " is not available for local TVF sink");
+ }
+ DistributedPlanWorker worker = new BackendWorker(
+ Env.getCurrentInternalCatalog().getId(), targetBackend);
+ return ImmutableList.of(
+ assignWorkerAndDataSources(
+ 0,
statementContext.getConnectContext().nextInstanceId(),
+ worker, new DefaultScanSource(ImmutableMap.of())
+ )
+ );
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]