This is an automated email from the ASF dual-hosted git repository.

liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b3d70c1407 [fix](transaction) select txn insert backend from current cluster (#63634)
0b3d70c1407 is described below

commit 0b3d70c14071fa7416bed2aface4d9aa94a40d5a
Author: hui lai <[email protected]>
AuthorDate: Mon Jun 1 16:31:45 2026 +0800

    [fix](transaction) select txn insert backend from current cluster (#63634)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    In cloud mode with multiple compute groups, a transactional
    `insert into values` statement may fail with:
    
    `Cannot invoke "org.apache.doris.system.Backend.getHost()" because "backend" is null`
    
    The root cause is that `InsertStreamTxnExecutor` selected a backend id
    from all clusters through `selectBackendIdsByPolicy(policy, 1)`, but
    then looked up the selected id from `getBackendsByCurrentCluster()`. If
    the selected backend belonged to another compute group, the lookup
    returned null and FE hit an NPE when calling `backend.getHost()`.
    
    This PR changes txn insert backend selection to use the current cluster
    backend snapshot as the candidate list, so the selected backend is
    always from the current compute group.
---
 .../apache/doris/qe/InsertStreamTxnExecutor.java   | 21 +++++---
 .../doris/qe/InsertStreamTxnExecutorTest.java      | 57 ++++++++++++++++++++++
 2 files changed, 71 insertions(+), 7 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index ff0716ecae5..df4fee5fbfb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -44,6 +44,7 @@ import org.apache.doris.transaction.TransactionState;
 
 import org.apache.thrift.TException;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -105,13 +106,7 @@ public class InsertStreamTxnExecutor {
             table.readUnlock();
         }
 
-        BeSelectionPolicy policy = new 
BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
-        List<Long> beIds = 
Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
-        if (beIds.isEmpty()) {
-            throw new UserException("No available backend to match the policy: 
" + policy);
-        }
-
-        Backend backend = 
Env.getCurrentSystemInfo().getBackendsByCurrentCluster().get(beIds.get(0));
+        Backend backend = selectBackendForTxnLoad();
         txnConf.setUserIp(backend.getHost());
         txnEntry.setBackend(backend);
         TNetworkAddress address = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
@@ -128,6 +123,18 @@ public class InsertStreamTxnExecutor {
         }
     }
 
+    static Backend selectBackendForTxnLoad() throws UserException {
+        BeSelectionPolicy policy = new 
BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
+        Map<Long, Backend> currentBackends = 
Env.getCurrentSystemInfo().getBackendsByCurrentCluster();
+        List<Long> beIds = Env.getCurrentSystemInfo()
+                .selectBackendIdsByPolicy(policy, 1, new 
ArrayList<>(currentBackends.values()));
+        if (beIds.isEmpty()) {
+            throw new UserException("No available backend to match the policy: 
" + policy);
+        }
+
+        return currentBackends.get(beIds.get(0));
+    }
+
     public void commitTransaction() throws TException, TimeoutException,
             InterruptedException, ExecutionException {
         TTxnParams txnConf = txnEntry.getTxnConf();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/InsertStreamTxnExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/InsertStreamTxnExecutorTest.java
new file mode 100644
index 00000000000..fb22b660bcf
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/InsertStreamTxnExecutorTest.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+public class InsertStreamTxnExecutorTest {
+
+    @Test
+    public void testSelectBackendForTxnLoadUsesCurrentClusterBackends() throws 
Exception {
+        SystemInfoService systemInfoService = Mockito.spy(new 
SystemInfoService());
+        Backend otherClusterBackend = createBackend(10001L, "127.0.0.1", 9060);
+        Backend currentClusterBackend = createBackend(10002L, "127.0.0.2", 
9061);
+        systemInfoService.addBackend(otherClusterBackend);
+        systemInfoService.addBackend(currentClusterBackend);
+        Mockito.doReturn(ImmutableMap.of(currentClusterBackend.getId(), 
currentClusterBackend))
+                .when(systemInfoService).getBackendsByCurrentCluster();
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            
mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+
+            Backend selectedBackend = 
InsertStreamTxnExecutor.selectBackendForTxnLoad();
+
+            Assert.assertEquals(currentClusterBackend.getId(), 
selectedBackend.getId());
+        }
+    }
+
+    private Backend createBackend(long id, String host, int brpcPort) {
+        Backend backend = new Backend(id, host, 9050);
+        backend.setAlive(true);
+        backend.setBrpcPort(brpcPort);
+        return backend;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to