This is an automated email from the ASF dual-hosted git repository.

deardeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 345f6b978d0 [fix](cloud) CloudUpgradeMgr inspect and abort failed 
conflict txns while waiting (#60830)
345f6b978d0 is described below

commit 345f6b978d02f7b828155889836eeed769d3eb1c
Author: deardeng <[email protected]>
AuthorDate: Thu May 28 20:30:42 2026 +0800

    [fix](cloud) CloudUpgradeMgr inspect and abort failed conflict txns while 
waiting (#60830)
    
    When CloudUpgradeMgr waits for unfinished transactions after registering
    watershed txn ids, it now proactively inspects conflict transactions for
    the target db/table set and logs sampled txn details for diagnosis.
    
    If enable_abort_txn_by_checking_conflict_txn is enabled, the manager
    invokes GlobalTransactionMgr.checkFailedTxns() and aborts the failed txns to
    reduce the chance of the upgrade being blocked by stale or conflicting txns.
    Abort failures are handled per txn and do not stop processing of the rest.
    
    This commit also adds tests:
    
    - FE UT CloudUpgradeMgrTest to verify enabled/disabled behavior and
      continue-on-abort-error semantics.
    - cloud multi_cluster docker regression case test_unfinished_txn_2pc.groovy
      to reproduce and validate long-running unfinished 2PC txn behavior.
---
 .../main/java/org/apache/doris/common/Config.java  |   3 +-
 .../doris/cloud/catalog/CloudUpgradeMgr.java       |  42 ++++
 .../doris/cloud/catalog/CloudUpgradeMgrTest.java   | 221 +++++++++++++++++++++
 .../multi_cluster/test_unfinished_txn_2pc.groovy   | 116 +++++++++++
 4 files changed, 381 insertions(+), 1 deletion(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5c242bd25fa..bdd64284c9a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2973,7 +2973,8 @@ public class Config extends ConfigBase {
     public static boolean enable_abort_txn_by_checking_coordinator_be = true;
 
     @ConfField(mutable = true, description = {
-            "Whether to abort transactions by checking conflict transactions 
in schema change."})
+            "Whether to abort transactions by checking conflict transactions 
in schema change "
+                    + "or cloud upgrade checks."})
     public static boolean enable_abort_txn_by_checking_conflict_txn = true;
 
     @ConfField(mutable = true, description = {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
index 0a466e73f5f..848e528ddf7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
@@ -26,6 +26,8 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.system.Backend;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TransactionState;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -97,6 +99,7 @@ public class CloudUpgradeMgr extends MasterDaemon {
                 }
                 if (!isFinished) {
                     isBeInactive = false;
+                    logAndAbortFailedConflictTxns(be, dbWithWaterTxn, 
tableIdList);
                     LOG.info("BE {} is still active, waiting db {} txn {}",
                             be, dbWithWaterTxn.dbId, dbWithWaterTxn.txnId);
                     break;
@@ -111,6 +114,45 @@ public class CloudUpgradeMgr extends MasterDaemon {
         LOG.info("finish cloud upgrade mgr");
     }
 
+    private void logAndAbortFailedConflictTxns(long be, DbWithWaterTxn 
dbWithWaterTxn, List<Long> tableIdList) {
+        try {
+            CloudGlobalTransactionMgr txnMgr = (CloudGlobalTransactionMgr) 
Env.getCurrentGlobalTransactionMgr();
+            List<TransactionState> conflictTxns =
+                    txnMgr.getUnFinishedPreviousLoad(dbWithWaterTxn.txnId, 
dbWithWaterTxn.dbId, tableIdList);
+            if (conflictTxns.isEmpty()) {
+                LOG.info("BE {} waiting db {} txn {} but no conflict txn 
details returned, tableCount={}",
+                        be, dbWithWaterTxn.dbId, dbWithWaterTxn.txnId, 
tableIdList.size());
+                return;
+            }
+
+            if (Config.enable_abort_txn_by_checking_conflict_txn) {
+                List<TransactionState> failedTxns = 
GlobalTransactionMgr.checkFailedTxns(conflictTxns);
+                for (TransactionState txn : failedTxns) {
+                    try {
+                        txnMgr.abortTransaction(txn.getDbId(), 
txn.getTransactionId(), "Cancel by cloud upgrade");
+                        LOG.info("BE {} abort conflict txn {} while waiting db 
{} txn {}",
+                                be, txn.getTransactionId(), 
dbWithWaterTxn.dbId, dbWithWaterTxn.txnId);
+                    } catch (UserException e) {
+                        LOG.warn("BE {} failed to abort conflict txn {} while 
waiting db {} txn {}",
+                                be, txn.getTransactionId(), 
dbWithWaterTxn.dbId, dbWithWaterTxn.txnId, e);
+                    }
+                }
+            }
+
+            String txnSamples = conflictTxns.stream()
+                    .limit(20)
+                    .map(txn -> String.format("(%d,%s,%s)",
+                            txn.getTransactionId(), 
txn.getTransactionStatus(), txn.getLabel()))
+                    .collect(Collectors.joining(", "));
+            LOG.info("BE {} waiting db {} txn {}, conflictTxnCount={}, 
sampleTxns=[{}], tableCount={}",
+                    be, dbWithWaterTxn.dbId, dbWithWaterTxn.txnId, 
conflictTxns.size(), txnSamples,
+                    tableIdList.size());
+        } catch (UserException e) {
+            LOG.warn("failed to get conflict txns for BE {}, db {}, txn {}",
+                    be, dbWithWaterTxn.dbId, dbWithWaterTxn.txnId, e);
+        }
+    }
+
     /* called after tablets migrating to new BE process complete */
     public void registerWaterShedTxnId(long be) throws UserException {
         LinkedBlockingQueue<DbWithWaterTxn> txnIds = new 
LinkedBlockingQueue<>();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudUpgradeMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudUpgradeMgrTest.java
new file mode 100644
index 00000000000..938bc54f9cb
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudUpgradeMgrTest.java
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.UserException;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TransactionState;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CloudUpgradeMgrTest {
+
+    // Value of the global flag before each test; restored afterwards so tests do not leak config state.
+    private boolean oldEnableAbortConflictTxn;
+
+    @Before
+    public void setUp() {
+        oldEnableAbortConflictTxn = Config.enable_abort_txn_by_checking_conflict_txn;
+    }
+
+    @After
+    public void tearDown() {
+        Config.enable_abort_txn_by_checking_conflict_txn = oldEnableAbortConflictTxn;
+    }
+
+    /**
+     * With the flag enabled, the conflict txns fetched for the watershed txn are passed to
+     * {@code GlobalTransactionMgr.checkFailedTxns}, and exactly the txns it returns are aborted
+     * with the "Cancel by cloud upgrade" reason.
+     */
+    @Test
+    public void testLogAndAbortFailedConflictTxnsWhenEnabled() throws Exception {
+        Config.enable_abort_txn_by_checking_conflict_txn = true;
+        CloudUpgradeMgr cloudUpgradeMgr = new CloudUpgradeMgr(null);
+        CloudGlobalTransactionMgr txnMgr = Mockito.mock(CloudGlobalTransactionMgr.class);
+
+        long dbId = 1000L;
+        long waterTxnId = 9000L;
+        long beId = 2000L;
+        List<Long> tableIdList = Lists.newArrayList(11L, 12L);
+
+        TransactionState conflictTxn1 = newTxn(dbId, 101L, "txn_101");
+        TransactionState conflictTxn2 = newTxn(dbId, 102L, "txn_102");
+        TransactionState conflictTxn3 = newTxn(dbId, 103L, "txn_103");
+        List<TransactionState> conflictTxns = Lists.newArrayList(conflictTxn1, conflictTxn2, conflictTxn3);
+        List<TransactionState> failedTxns = Lists.newArrayList(conflictTxn1, conflictTxn3);
+
+        List<Long> abortedTxnIds = new ArrayList<>();
+
+        Mockito.doAnswer(invocation -> {
+            long endTransactionId = invocation.getArgument(0);
+            long actualDbId = invocation.getArgument(1);
+            List<Long> actualTableIdList = invocation.getArgument(2);
+            Assert.assertEquals(waterTxnId, endTransactionId);
+            Assert.assertEquals(dbId, actualDbId);
+            Assert.assertEquals(tableIdList, actualTableIdList);
+            return conflictTxns;
+        }).when(txnMgr).getUnFinishedPreviousLoad(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList());
+
+        Mockito.doAnswer(invocation -> {
+            Long actualDbId = invocation.getArgument(0);
+            Long txnId = invocation.getArgument(1);
+            String reason = invocation.getArgument(2);
+            Assert.assertEquals(dbId, actualDbId.longValue());
+            Assert.assertEquals("Cancel by cloud upgrade", reason);
+            abortedTxnIds.add(txnId);
+            return null;
+        }).when(txnMgr).abortTransaction(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyString());
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class);
+                MockedStatic<GlobalTransactionMgr> mockedGlobalTxnMgr = Mockito.mockStatic(GlobalTransactionMgr.class)) {
+            mockedEnv.when(Env::getCurrentGlobalTransactionMgr).thenReturn(txnMgr);
+            mockedGlobalTxnMgr.when(() -> GlobalTransactionMgr.checkFailedTxns(Mockito.anyList()))
+                    .thenAnswer(invocation -> {
+                        List<TransactionState> txns = invocation.getArgument(0);
+                        Assert.assertEquals(conflictTxns, txns);
+                        return failedTxns;
+                    });
+
+            invokeLogAndAbortFailedConflictTxns(cloudUpgradeMgr, beId, createDbWithWaterTxn(dbId, waterTxnId),
+                    tableIdList);
+        }
+
+        Assert.assertEquals(Lists.newArrayList(101L, 103L), abortedTxnIds);
+    }
+
+    /**
+     * With the flag disabled, conflict txns are only inspected and logged: neither
+     * {@code checkFailedTxns} nor {@code abortTransaction} may be called.
+     */
+    @Test
+    public void testLogAndAbortFailedConflictTxnsWhenDisabled() throws Exception {
+        Config.enable_abort_txn_by_checking_conflict_txn = false;
+        CloudUpgradeMgr cloudUpgradeMgr = new CloudUpgradeMgr(null);
+        CloudGlobalTransactionMgr txnMgr = Mockito.mock(CloudGlobalTransactionMgr.class);
+
+        long dbId = 1000L;
+        long waterTxnId = 9000L;
+        long beId = 2000L;
+        List<Long> tableIdList = Lists.newArrayList(11L, 12L);
+
+        List<TransactionState> conflictTxns = Lists.newArrayList(
+                newTxn(dbId, 201L, "txn_201"),
+                newTxn(dbId, 202L, "txn_202"));
+
+        AtomicInteger checkFailedCallCount = new AtomicInteger(0);
+        AtomicInteger abortCallCount = new AtomicInteger(0);
+
+        Mockito.doReturn(conflictTxns).when(txnMgr)
+                .getUnFinishedPreviousLoad(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList());
+        Mockito.doAnswer(invocation -> {
+            abortCallCount.incrementAndGet();
+            return null;
+        }).when(txnMgr).abortTransaction(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyString());
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class);
+                MockedStatic<GlobalTransactionMgr> mockedGlobalTxnMgr = Mockito.mockStatic(GlobalTransactionMgr.class)) {
+            mockedEnv.when(Env::getCurrentGlobalTransactionMgr).thenReturn(txnMgr);
+            mockedGlobalTxnMgr.when(() -> GlobalTransactionMgr.checkFailedTxns(Mockito.anyList()))
+                    .thenAnswer(invocation -> {
+                        checkFailedCallCount.incrementAndGet();
+                        return invocation.getArgument(0);
+                    });
+
+            invokeLogAndAbortFailedConflictTxns(cloudUpgradeMgr, beId, createDbWithWaterTxn(dbId, waterTxnId),
+                    tableIdList);
+        }
+
+        Assert.assertEquals(0, checkFailedCallCount.get());
+        Assert.assertEquals(0, abortCallCount.get());
+    }
+
+    /**
+     * A {@link UserException} from aborting one txn must not stop the remaining aborts.
+     * Every failed txn is attempted, in order.
+     */
+    @Test
+    public void testLogAndAbortFailedConflictTxnsContinueWhenAbortFailed() throws Exception {
+        Config.enable_abort_txn_by_checking_conflict_txn = true;
+        CloudUpgradeMgr cloudUpgradeMgr = new CloudUpgradeMgr(null);
+        CloudGlobalTransactionMgr txnMgr = Mockito.mock(CloudGlobalTransactionMgr.class);
+
+        long dbId = 1000L;
+        long waterTxnId = 9000L;
+        long beId = 2000L;
+        List<Long> tableIdList = Lists.newArrayList(11L, 12L);
+
+        TransactionState conflictTxn1 = newTxn(dbId, 301L, "txn_301");
+        TransactionState conflictTxn2 = newTxn(dbId, 302L, "txn_302");
+        List<TransactionState> conflictTxns = Lists.newArrayList(conflictTxn1, conflictTxn2);
+
+        // Records every abort attempt, including the one that throws.
+        List<Long> attemptedTxnIds = new ArrayList<>();
+
+        Mockito.doReturn(conflictTxns).when(txnMgr)
+                .getUnFinishedPreviousLoad(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList());
+        Mockito.doAnswer(invocation -> {
+            Long txnId = invocation.getArgument(1);
+            attemptedTxnIds.add(txnId);
+            if (txnId == 301L) {
+                throw new UserException("mock abort failed");
+            }
+            return null;
+        }).when(txnMgr).abortTransaction(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyString());
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class);
+                MockedStatic<GlobalTransactionMgr> mockedGlobalTxnMgr = Mockito.mockStatic(GlobalTransactionMgr.class)) {
+            mockedEnv.when(Env::getCurrentGlobalTransactionMgr).thenReturn(txnMgr);
+            // Report every conflict txn as failed so both are abort candidates.
+            mockedGlobalTxnMgr.when(() -> GlobalTransactionMgr.checkFailedTxns(Mockito.anyList()))
+                    .thenAnswer(invocation -> invocation.getArgument(0));
+
+            invokeLogAndAbortFailedConflictTxns(cloudUpgradeMgr, beId, createDbWithWaterTxn(dbId, waterTxnId),
+                    tableIdList);
+        }
+
+        // 302 must still be attempted after 301's abort threw.
+        Assert.assertEquals(Lists.newArrayList(301L, 302L), attemptedTxnIds);
+    }
+
+    /**
+     * Builds a {@link TransactionState} in {@code dbId} with the given id and label.
+     * The remaining constructor arguments are fixed placeholder values.
+     */
+    private static TransactionState newTxn(long dbId, long txnId, String label) {
+        return new TransactionState(
+                dbId,
+                Lists.newArrayList(1L),
+                txnId,
+                label,
+                null,
+                TransactionState.LoadJobSourceType.FRONTEND,
+                new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, "127.0.0.1", 0L),
+                -1,
+                1000L);
+    }
+
+    /**
+     * Reflectively constructs a {@code CloudUpgradeMgr$DbWithWaterTxn}. The nested type is not
+     * directly usable from this test (presumably non-public), so its (Long, Long) constructor is
+     * opened via reflection.
+     */
+    private static Object createDbWithWaterTxn(long dbId, long txnId) throws Exception {
+        Class<?> clazz = Class.forName("org.apache.doris.cloud.catalog.CloudUpgradeMgr$DbWithWaterTxn");
+        Constructor<?> constructor = clazz.getDeclaredConstructor(Long.class, Long.class);
+        constructor.setAccessible(true);
+        return constructor.newInstance(dbId, txnId);
+    }
+
+    /**
+     * Reflectively calls the private {@code logAndAbortFailedConflictTxns}.
+     *
+     * <p>{@link Method#invoke} wraps anything the target throws in an InvocationTargetException.
+     * That includes AssertionErrors raised inside the mock answers above. The cause is rethrown
+     * so a failing assertion surfaces with its own message.
+     */
+    private static void invokeLogAndAbortFailedConflictTxns(CloudUpgradeMgr cloudUpgradeMgr, long beId,
+            Object dbWithWaterTxn, List<Long> tableIdList) throws Exception {
+        Method method = CloudUpgradeMgr.class.getDeclaredMethod("logAndAbortFailedConflictTxns",
+                long.class, dbWithWaterTxn.getClass(), List.class);
+        method.setAccessible(true);
+        try {
+            method.invoke(cloudUpgradeMgr, beId, dbWithWaterTxn, tableIdList);
+        } catch (java.lang.reflect.InvocationTargetException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof Error) {
+                throw (Error) cause;
+            }
+            if (cause instanceof Exception) {
+                throw (Exception) cause;
+            }
+            throw e;
+        }
+    }
+}
diff --git 
a/regression-test/suites/cloud_p0/multi_cluster/test_unfinished_txn_2pc.groovy 
b/regression-test/suites/cloud_p0/multi_cluster/test_unfinished_txn_2pc.groovy
new file mode 100644
index 00000000000..fd12484c399
--- /dev/null
+++ 
b/regression-test/suites/cloud_p0/multi_cluster/test_unfinished_txn_2pc.groovy
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+// Starts a two-phase-commit stream load and never commits it. For 20 seconds it checks that the
+// txn stays unfinished (neither VISIBLE nor ABORTED). Cleanup aborts the txn and drops the table.
+suite('test_unfinished_txn_2pc', 'multi_cluster,docker') {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.feConfigs += [
+            'cloud_cluster_check_interval_second=1',
+    ]
+
+    docker(options) {
+        def clusterInfo = sql_return_maparray """show clusters"""
+        def currentCluster = clusterInfo.find { it.is_current == 'TRUE' }
+        assertNotNull(currentCluster)
+        sql """use @${currentCluster.cluster}"""
+
+        def dbName = context.config.getDbNameByFile(context.file)
+        def tableName = 'test_unfinished_txn_2pc_tbl'
+        Long txnId = null
+
+        // Sends a 2PC operation ('commit' / 'abort') for txn `id` through the FE HTTP API
+        // and asserts that the response status is success.
+        def doStreamLoad2pcOperation = { long id, String operation ->
+            def command = "curl -sS -X PUT --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+                    " -H txn_id:${id}" +
+                    " -H txn_operation:${operation}" +
+                    " http://${context.config.feHttpAddress}/api/${dbName}/${tableName}/_stream_load_2pc"
+            logger.info("execute stream load 2pc operation: {}", command)
+
+            def process = command.execute()
+            // Drain stdout before waiting so a full pipe buffer cannot stall the child process.
+            def out = process.text
+            def code = process.waitFor()
+            logger.info("stream load 2pc {} result: {}", operation, out)
+            assertEquals(0, code)
+
+            def json = parseJson(out)
+            def status = ((json.status != null ? json.status : json.Status) as String).toLowerCase()
+            assertEquals('success', status)
+        }
+
+        try {
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    k1 INT,
+                    k2 INT
+                )
+                DUPLICATE KEY(k1)
+                DISTRIBUTED BY HASH(k1) BUCKETS 1
+                PROPERTIES ("replication_num" = "1")
+            """
+
+            // Real newlines: three CSV rows of two columns. A "\\n" escape here would put a
+            // literal backslash-n into one over-wide row.
+            String content = "1,10\n2,20\n3,30\n"
+            streamLoad {
+                table "${tableName}"
+                set 'column_separator', ','
+                set 'two_phase_commit', 'true'
+                inputStream new ByteArrayInputStream(content.getBytes())
+                time 10000
+
+                check { loadResult, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    logger.info("stream load result: {}", loadResult)
+                    def json = parseJson(loadResult)
+                    def status = ((json.status != null ? json.status : json.Status) as String).toLowerCase()
+                    assertEquals('success', status)
+                    txnId = Long.valueOf(json.TxnId.toString())
+                    assertTrue(txnId != null && txnId > 0)
+                }
+            }
+
+            // The 2PC txn is not committed yet, so no rows should be visible.
+            def rowCount = sql "select count(*) from ${tableName}"
+            assertEquals(0, rowCount[0][0] as int)
+
+            def finalStatuses = ['VISIBLE', 'ABORTED'] as Set
+            for (int i = 0; i < 20; i++) {
+                def txns = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
+                assertEquals(1, txns.size())
+                def txnStatus = txns[0].TransactionStatus as String
+                logger.info("txn {} status after {}s: {}", txnId, i, txnStatus)
+                assertTrue(!finalStatuses.contains(txnStatus))
+                sleep(1000)
+            }
+        } finally {
+            if (txnId != null) {
+                try {
+                    doStreamLoad2pcOperation(txnId, 'abort')
+                    awaitUntil(30) {
+                        def txns = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
+                        txns.size() == 1 && (txns[0].TransactionStatus as String) == 'ABORTED'
+                    }
+                } catch (Exception e) {
+                    logger.info("abort unfinished txn {} failed in cleanup: {}", txnId, e.getMessage())
+                }
+            }
+            sql "drop table if exists ${tableName}"
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to