This is an automated email from the ASF dual-hosted git repository.
deardeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 345f6b978d0 [fix](cloud) CloudUpgradeMgr inspect and abort failed
conflict txns while waiting (#60830)
345f6b978d0 is described below
commit 345f6b978d02f7b828155889836eeed769d3eb1c
Author: deardeng <[email protected]>
AuthorDate: Thu May 28 20:30:42 2026 +0800
[fix](cloud) CloudUpgradeMgr inspect and abort failed conflict txns while
waiting (#60830)
When CloudUpgradeMgr waits for unfinished transactions after registering
watershed txn ids, it now proactively inspects the conflicting transactions
on the target db/table set and logs a sample of them (up to 20, each with
txn id, status and label) for diagnosis.
If enable_abort_txn_by_checking_conflict_txn is enabled, the manager also
calls GlobalTransactionMgr.checkFailedTxns() and aborts the txns it reports
as failed, reducing the chance that the upgrade is blocked by stale or
conflicting txns. A failure to abort one txn is logged and does not stop
the remaining aborts.
This commit also adds tests:
- FE UT CloudUpgradeMgrTest to verify enabled/disabled behavior and
continue-on-abort-error semantics.
- cloud multi_cluster docker regression case
test_unfinished_txn_2pc.groovy
to reproduce and validate long-running unfinished 2PC txn behavior.
---
.../main/java/org/apache/doris/common/Config.java | 3 +-
.../doris/cloud/catalog/CloudUpgradeMgr.java | 42 ++++
.../doris/cloud/catalog/CloudUpgradeMgrTest.java | 221 +++++++++++++++++++++
.../multi_cluster/test_unfinished_txn_2pc.groovy | 116 +++++++++++
4 files changed, 381 insertions(+), 1 deletion(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5c242bd25fa..bdd64284c9a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2973,7 +2973,8 @@ public class Config extends ConfigBase {
public static boolean enable_abort_txn_by_checking_coordinator_be = true;
@ConfField(mutable = true, description = {
- "Whether to abort transactions by checking conflict transactions
in schema change."})
+ "Whether to abort transactions by checking conflict transactions
in schema change "
+ + "or cloud upgrade checks."})
public static boolean enable_abort_txn_by_checking_conflict_txn = true;
@ConfField(mutable = true, description = {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
index 0a466e73f5f..848e528ddf7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
@@ -26,6 +26,8 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.system.Backend;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TransactionState;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -97,6 +99,7 @@ public class CloudUpgradeMgr extends MasterDaemon {
}
if (!isFinished) {
isBeInactive = false;
+ logAndAbortFailedConflictTxns(be, dbWithWaterTxn,
tableIdList);
LOG.info("BE {} is still active, waiting db {} txn {}",
be, dbWithWaterTxn.dbId, dbWithWaterTxn.txnId);
break;
@@ -111,6 +114,45 @@ public class CloudUpgradeMgr extends MasterDaemon {
LOG.info("finish cloud upgrade mgr");
}
+ private void logAndAbortFailedConflictTxns(long be, DbWithWaterTxn
dbWithWaterTxn, List<Long> tableIdList) {
+ try {
+ CloudGlobalTransactionMgr txnMgr = (CloudGlobalTransactionMgr)
Env.getCurrentGlobalTransactionMgr();
+ List<TransactionState> conflictTxns =
+ txnMgr.getUnFinishedPreviousLoad(dbWithWaterTxn.txnId,
dbWithWaterTxn.dbId, tableIdList);
+ if (conflictTxns.isEmpty()) {
+ LOG.info("BE {} waiting db {} txn {} but no conflict txn
details returned, tableCount={}",
+ be, dbWithWaterTxn.dbId, dbWithWaterTxn.txnId,
tableIdList.size());
+ return;
+ }
+
+ if (Config.enable_abort_txn_by_checking_conflict_txn) {
+ List<TransactionState> failedTxns =
GlobalTransactionMgr.checkFailedTxns(conflictTxns);
+ for (TransactionState txn : failedTxns) {
+ try {
+ txnMgr.abortTransaction(txn.getDbId(),
txn.getTransactionId(), "Cancel by cloud upgrade");
+ LOG.info("BE {} abort conflict txn {} while waiting db
{} txn {}",
+ be, txn.getTransactionId(),
dbWithWaterTxn.dbId, dbWithWaterTxn.txnId);
+ } catch (UserException e) {
+ LOG.warn("BE {} failed to abort conflict txn {} while
waiting db {} txn {}",
+ be, txn.getTransactionId(),
dbWithWaterTxn.dbId, dbWithWaterTxn.txnId, e);
+ }
+ }
+ }
+
+ String txnSamples = conflictTxns.stream()
+ .limit(20)
+ .map(txn -> String.format("(%d,%s,%s)",
+ txn.getTransactionId(),
txn.getTransactionStatus(), txn.getLabel()))
+ .collect(Collectors.joining(", "));
+ LOG.info("BE {} waiting db {} txn {}, conflictTxnCount={},
sampleTxns=[{}], tableCount={}",
+ be, dbWithWaterTxn.dbId, dbWithWaterTxn.txnId,
conflictTxns.size(), txnSamples,
+ tableIdList.size());
+ } catch (UserException e) {
+ LOG.warn("failed to get conflict txns for BE {}, db {}, txn {}",
+ be, dbWithWaterTxn.dbId, dbWithWaterTxn.txnId, e);
+ }
+ }
+
/* called after tablets migrating to new BE process complete */
public void registerWaterShedTxnId(long be) throws UserException {
LinkedBlockingQueue<DbWithWaterTxn> txnIds = new
LinkedBlockingQueue<>();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudUpgradeMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudUpgradeMgrTest.java
new file mode 100644
index 00000000000..938bc54f9cb
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudUpgradeMgrTest.java
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.UserException;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TransactionState;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CloudUpgradeMgrTest {
+
+    // Saved so each test can flip the global config flag without leaking it to other tests.
+    private boolean oldEnableAbortConflictTxn;
+
+    @Before
+    public void setUp() {
+        oldEnableAbortConflictTxn = Config.enable_abort_txn_by_checking_conflict_txn;
+    }
+
+    @After
+    public void tearDown() {
+        Config.enable_abort_txn_by_checking_conflict_txn = oldEnableAbortConflictTxn;
+    }
+
+    /** With the flag on, only the txns reported by checkFailedTxns are aborted, in order. */
+    @Test
+    public void testLogAndAbortFailedConflictTxnsWhenEnabled() throws Exception {
+        Config.enable_abort_txn_by_checking_conflict_txn = true;
+        CloudUpgradeMgr cloudUpgradeMgr = new CloudUpgradeMgr(null);
+        CloudGlobalTransactionMgr txnMgr = Mockito.mock(CloudGlobalTransactionMgr.class);
+
+        long dbId = 1000L;
+        long waterTxnId = 9000L;
+        long beId = 2000L;
+        List<Long> tableIdList = Lists.newArrayList(11L, 12L);
+
+        TransactionState conflictTxn1 = newTxn(dbId, 101L, "txn_101");
+        TransactionState conflictTxn2 = newTxn(dbId, 102L, "txn_102");
+        TransactionState conflictTxn3 = newTxn(dbId, 103L, "txn_103");
+        List<TransactionState> conflictTxns = Lists.newArrayList(conflictTxn1, conflictTxn2, conflictTxn3);
+        List<TransactionState> failedTxns = Lists.newArrayList(conflictTxn1, conflictTxn3);
+
+        List<Long> abortedTxnIds = new ArrayList<>();
+
+        Mockito.doAnswer(invocation -> {
+            long endTransactionId = invocation.getArgument(0);
+            long actualDbId = invocation.getArgument(1);
+            List<Long> actualTableIdList = invocation.getArgument(2);
+            Assert.assertEquals(waterTxnId, endTransactionId);
+            Assert.assertEquals(dbId, actualDbId);
+            Assert.assertEquals(tableIdList, actualTableIdList);
+            return conflictTxns;
+        }).when(txnMgr).getUnFinishedPreviousLoad(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList());
+
+        Mockito.doAnswer(invocation -> {
+            Long actualDbId = invocation.getArgument(0);
+            Long txnId = invocation.getArgument(1);
+            String reason = invocation.getArgument(2);
+            Assert.assertEquals(dbId, actualDbId.longValue());
+            Assert.assertEquals("Cancel by cloud upgrade", reason);
+            abortedTxnIds.add(txnId);
+            return null;
+        }).when(txnMgr).abortTransaction(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyString());
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class);
+                MockedStatic<GlobalTransactionMgr> mockedGlobalTxnMgr = Mockito.mockStatic(GlobalTransactionMgr.class)) {
+            mockedEnv.when(Env::getCurrentGlobalTransactionMgr).thenReturn(txnMgr);
+            mockedGlobalTxnMgr.when(() -> GlobalTransactionMgr.checkFailedTxns(Mockito.anyList()))
+                    .thenAnswer(invocation -> {
+                        List<TransactionState> txns = invocation.getArgument(0);
+                        Assert.assertEquals(conflictTxns, txns);
+                        return failedTxns;
+                    });
+
+            invokeLogAndAbortFailedConflictTxns(cloudUpgradeMgr, beId, createDbWithWaterTxn(dbId, waterTxnId),
+                    tableIdList);
+        }
+
+        Assert.assertEquals(Lists.newArrayList(101L, 103L), abortedTxnIds);
+    }
+
+    /** With the flag off, conflicts are only logged: no failure check and no abort. */
+    @Test
+    public void testLogAndAbortFailedConflictTxnsWhenDisabled() throws Exception {
+        Config.enable_abort_txn_by_checking_conflict_txn = false;
+        CloudUpgradeMgr cloudUpgradeMgr = new CloudUpgradeMgr(null);
+        CloudGlobalTransactionMgr txnMgr = Mockito.mock(CloudGlobalTransactionMgr.class);
+
+        long dbId = 1000L;
+        long waterTxnId = 9000L;
+        long beId = 2000L;
+        List<Long> tableIdList = Lists.newArrayList(11L, 12L);
+
+        List<TransactionState> conflictTxns = Lists.newArrayList(
+                newTxn(dbId, 201L, "txn_201"),
+                newTxn(dbId, 202L, "txn_202"));
+
+        AtomicInteger checkFailedCallCount = new AtomicInteger(0);
+        AtomicInteger abortCallCount = new AtomicInteger(0);
+
+        Mockito.doReturn(conflictTxns).when(txnMgr)
+                .getUnFinishedPreviousLoad(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList());
+        Mockito.doAnswer(invocation -> {
+            abortCallCount.incrementAndGet();
+            return null;
+        }).when(txnMgr).abortTransaction(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyString());
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class);
+                MockedStatic<GlobalTransactionMgr> mockedGlobalTxnMgr = Mockito.mockStatic(GlobalTransactionMgr.class)) {
+            mockedEnv.when(Env::getCurrentGlobalTransactionMgr).thenReturn(txnMgr);
+            mockedGlobalTxnMgr.when(() -> GlobalTransactionMgr.checkFailedTxns(Mockito.anyList()))
+                    .thenAnswer(invocation -> {
+                        checkFailedCallCount.incrementAndGet();
+                        return invocation.getArgument(0);
+                    });
+
+            invokeLogAndAbortFailedConflictTxns(cloudUpgradeMgr, beId, createDbWithWaterTxn(dbId, waterTxnId),
+                    tableIdList);
+        }
+
+        Assert.assertEquals(0, checkFailedCallCount.get());
+        Assert.assertEquals(0, abortCallCount.get());
+    }
+
+    /** No conflicting txns: the method returns early without checking or aborting anything. */
+    @Test
+    public void testLogAndAbortFailedConflictTxnsWhenNoConflictTxn() throws Exception {
+        Config.enable_abort_txn_by_checking_conflict_txn = true;
+        CloudUpgradeMgr cloudUpgradeMgr = new CloudUpgradeMgr(null);
+        CloudGlobalTransactionMgr txnMgr = Mockito.mock(CloudGlobalTransactionMgr.class);
+
+        long dbId = 1000L;
+        long waterTxnId = 9000L;
+        long beId = 2000L;
+        List<Long> tableIdList = Lists.newArrayList(11L, 12L);
+
+        AtomicInteger checkFailedCallCount = new AtomicInteger(0);
+        AtomicInteger abortCallCount = new AtomicInteger(0);
+
+        Mockito.doReturn(new ArrayList<TransactionState>()).when(txnMgr)
+                .getUnFinishedPreviousLoad(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList());
+        Mockito.doAnswer(invocation -> {
+            abortCallCount.incrementAndGet();
+            return null;
+        }).when(txnMgr).abortTransaction(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyString());
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class);
+                MockedStatic<GlobalTransactionMgr> mockedGlobalTxnMgr = Mockito.mockStatic(GlobalTransactionMgr.class)) {
+            mockedEnv.when(Env::getCurrentGlobalTransactionMgr).thenReturn(txnMgr);
+            mockedGlobalTxnMgr.when(() -> GlobalTransactionMgr.checkFailedTxns(Mockito.anyList()))
+                    .thenAnswer(invocation -> {
+                        checkFailedCallCount.incrementAndGet();
+                        return invocation.getArgument(0);
+                    });
+
+            invokeLogAndAbortFailedConflictTxns(cloudUpgradeMgr, beId, createDbWithWaterTxn(dbId, waterTxnId),
+                    tableIdList);
+        }
+
+        Assert.assertEquals(0, checkFailedCallCount.get());
+        Assert.assertEquals(0, abortCallCount.get());
+    }
+
+    /** A failing abort must not prevent the following failed txns from being aborted. */
+    @Test
+    public void testLogAndAbortFailedConflictTxnsContinueWhenAbortFailed() throws Exception {
+        Config.enable_abort_txn_by_checking_conflict_txn = true;
+        CloudUpgradeMgr cloudUpgradeMgr = new CloudUpgradeMgr(null);
+        CloudGlobalTransactionMgr txnMgr = Mockito.mock(CloudGlobalTransactionMgr.class);
+
+        long dbId = 1000L;
+        long waterTxnId = 9000L;
+        long beId = 2000L;
+        List<Long> tableIdList = Lists.newArrayList(11L, 12L);
+
+        TransactionState conflictTxn1 = newTxn(dbId, 301L, "txn_301");
+        TransactionState conflictTxn2 = newTxn(dbId, 302L, "txn_302");
+        List<TransactionState> conflictTxns = Lists.newArrayList(conflictTxn1, conflictTxn2);
+
+        // Record which txns were attempted (in order), not just how many, so the test proves
+        // that 302 is still attempted after 301 fails.
+        List<Long> attemptedTxnIds = new ArrayList<>();
+
+        Mockito.doReturn(conflictTxns).when(txnMgr)
+                .getUnFinishedPreviousLoad(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList());
+        Mockito.doAnswer(invocation -> {
+            Long txnId = invocation.getArgument(1);
+            attemptedTxnIds.add(txnId);
+            if (txnId == 301L) {
+                throw new UserException("mock abort failed");
+            }
+            return null;
+        }).when(txnMgr).abortTransaction(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyString());
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class);
+                MockedStatic<GlobalTransactionMgr> mockedGlobalTxnMgr = Mockito.mockStatic(GlobalTransactionMgr.class)) {
+            mockedEnv.when(Env::getCurrentGlobalTransactionMgr).thenReturn(txnMgr);
+            mockedGlobalTxnMgr.when(() -> GlobalTransactionMgr.checkFailedTxns(Mockito.anyList()))
+                    .thenAnswer(invocation -> invocation.getArgument(0));
+
+            invokeLogAndAbortFailedConflictTxns(cloudUpgradeMgr, beId, createDbWithWaterTxn(dbId, waterTxnId),
+                    tableIdList);
+        }
+
+        Assert.assertEquals(Lists.newArrayList(301L, 302L), attemptedTxnIds);
+    }
+
+    /** Builds a minimal FE-coordinated TransactionState touching table id 1. */
+    private static TransactionState newTxn(long dbId, long txnId, String label) {
+        return new TransactionState(
+                dbId,
+                Lists.newArrayList(1L),
+                txnId,
+                label,
+                null,
+                TransactionState.LoadJobSourceType.FRONTEND,
+                new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, "127.0.0.1", 0L),
+                -1,
+                1000L);
+    }
+
+    /** Instantiates the non-public nested DbWithWaterTxn reflectively. */
+    private static Object createDbWithWaterTxn(long dbId, long txnId) throws Exception {
+        Class<?> clazz = Class.forName("org.apache.doris.cloud.catalog.CloudUpgradeMgr$DbWithWaterTxn");
+        Constructor<?> constructor = clazz.getDeclaredConstructor(Long.class, Long.class);
+        constructor.setAccessible(true);
+        return constructor.newInstance(dbId, txnId);
+    }
+
+    /** Calls the private CloudUpgradeMgr#logAndAbortFailedConflictTxns reflectively. */
+    private static void invokeLogAndAbortFailedConflictTxns(CloudUpgradeMgr cloudUpgradeMgr, long beId,
+            Object dbWithWaterTxn, List<Long> tableIdList) throws Exception {
+        Method method = CloudUpgradeMgr.class.getDeclaredMethod("logAndAbortFailedConflictTxns",
+                long.class, dbWithWaterTxn.getClass(), List.class);
+        method.setAccessible(true);
+        method.invoke(cloudUpgradeMgr, beId, dbWithWaterTxn, tableIdList);
+    }
+}
diff --git
a/regression-test/suites/cloud_p0/multi_cluster/test_unfinished_txn_2pc.groovy
b/regression-test/suites/cloud_p0/multi_cluster/test_unfinished_txn_2pc.groovy
new file mode 100644
index 00000000000..fd12484c399
--- /dev/null
+++
b/regression-test/suites/cloud_p0/multi_cluster/test_unfinished_txn_2pc.groovy
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_unfinished_txn_2pc', 'multi_cluster,docker') {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+    ]
+
+    docker(options) {
+        def clusterInfo = sql_return_maparray """show clusters"""
+        def currentCluster = clusterInfo.find { it.is_current == 'TRUE' }
+        assertNotNull(currentCluster)
+        sql """use @${currentCluster.cluster}"""
+
+        def dbName = context.config.getDbNameByFile(context.file)
+        def tableName = 'test_unfinished_txn_2pc_tbl'
+        Long txnId = null
+
+        // Stream load responses report the status as either "status" or "Status"; normalize it.
+        def responseStatus = { json ->
+            ((json.status != null ? json.status : json.Status) as String).toLowerCase(Locale.ROOT)
+        }
+
+        // Commits or aborts a 2PC stream load txn through the FE HTTP API and asserts success.
+        def doStreamLoad2pcOperation = { long id, String operation ->
+            def url = "http://${context.config.feHttpAddress}/api/${dbName}/${tableName}/_stream_load_2pc".toString()
+            // Pass an argument list rather than one string so values containing spaces reach curl intact.
+            def command = [
+                'curl', '-sS', '-X', 'PUT', '--location-trusted',
+                '-u', "${context.config.feHttpUser}:${context.config.feHttpPassword}".toString(),
+                '-H', "txn_id:${id}".toString(),
+                '-H', "txn_operation:${operation}".toString(),
+                url,
+            ]
+            // Credentials are deliberately kept out of the log.
+            logger.info("execute stream load 2pc operation: {} txn_id={} url={}", operation, id, url)
+
+            def process = command.execute()
+            def stdout = new StringBuilder()
+            def stderr = new StringBuilder()
+            // Drain stdout/stderr while waiting; waitFor() before reading can block on a full pipe.
+            process.waitForProcessOutput(stdout, stderr)
+            def code = process.exitValue()
+            def out = stdout.toString()
+            logger.info("stream load 2pc {} result: {}, stderr: {}", operation, out, stderr.toString())
+            assertEquals(0, code)
+            assertEquals('success', responseStatus(parseJson(out)))
+        }
+
+        try {
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    k1 INT,
+                    k2 INT
+                )
+                DUPLICATE KEY(k1)
+                DISTRIBUTED BY HASH(k1) BUCKETS 1
+                PROPERTIES ("replication_num" = "1")
+            """
+
+            // Real newlines are required: an escaped backslash-n inside a Groovy string would be a
+            // literal backslash followed by 'n', producing a single malformed row.
+            String content = "1,10\n2,20\n3,30\n"
+            streamLoad {
+                table "${tableName}"
+                set 'column_separator', ','
+                set 'two_phase_commit', 'true'
+                inputStream new ByteArrayInputStream(content.getBytes('UTF-8'))
+                time 10000
+
+                check { loadResult, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    logger.info("stream load result: {}", loadResult)
+                    def json = parseJson(loadResult)
+                    assertEquals('success', responseStatus(json))
+                    assertNotNull(json.TxnId)
+                    txnId = Long.valueOf(json.TxnId.toString())
+                    assertTrue(txnId > 0)
+                }
+            }
+
+            // The txn is only pre-committed, so none of its rows are visible yet.
+            def rowCount = sql "select count(*) from ${tableName}"
+            assertEquals(0, rowCount[0][0] as int)
+
+            // The pre-committed txn must stay unfinished (neither VISIBLE nor ABORTED) for a while.
+            def finalStatuses = ['VISIBLE', 'ABORTED'] as Set
+            for (int i = 0; i < 20; i++) {
+                def txns = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
+                assertEquals(1, txns.size())
+                def txnStatus = txns[0].TransactionStatus as String
+                logger.info("txn {} status after {}s: {}", txnId, i, txnStatus)
+                assertTrue(!finalStatuses.contains(txnStatus))
+                sleep(1000)
+            }
+        } finally {
+            if (txnId != null) {
+                try {
+                    doStreamLoad2pcOperation(txnId, 'abort')
+                    awaitUntil(30) {
+                        def txns = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
+                        txns.size() == 1 && (txns[0].TransactionStatus as String) == 'ABORTED'
+                    }
+                } catch (Exception e) {
+                    logger.info("abort unfinished txn {} failed in cleanup: {}", txnId, e.getMessage())
+                }
+            }
+            sql "drop table if exists ${tableName}"
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]