This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new cae220ea578 branch-4.0: [fix](transaction) fix IllegalMonitorStateException in routine load afterAborted when coordinate BE restarts (#62892)
cae220ea578 is described below

commit cae220ea578012bc8e09019eaa08403430c6e66e
Author: hui lai <[email protected]>
AuthorDate: Thu May 7 14:43:56 2026 +0800

    branch-4.0: [fix](transaction) fix IllegalMonitorStateException in routine load afterAborted when coordinate BE restarts (#62892)
    
    pick https://github.com/apache/doris/pull/61881
---
 .../transaction/CloudGlobalTransactionMgr.java     | 80 +++++++++++++++-------
 .../transaction/CloudGlobalTransactionMgrTest.java | 34 ++++++++-
 .../test_routine_load_be_restart.groovy            | 21 +++++-
 3 files changed, 106 insertions(+), 29 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index d6a4648141a..42a97d5d94a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -116,6 +116,7 @@ import org.apache.doris.transaction.GlobalTransactionMgrIface;
 import org.apache.doris.transaction.SubTransactionState;
 import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionCommitFailedException;
+import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionIdGenerator;
 import org.apache.doris.transaction.TransactionNotFoundException;
 import org.apache.doris.transaction.TransactionState;
@@ -1816,29 +1817,62 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
     @Override
     public void abortTransaction(Long dbId, Long transactionId, String reason,
             TxnCommitAttachment txnCommitAttachment, List<Table> tableList) 
throws UserException {
-        if (txnCommitAttachment != null) {
-            if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
-                RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
-                TxnStateChangeCallback cb = 
callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId());
-                if (cb != null) {
-                    // use a temporary transaction state to do before commit 
check,
-                    // what actually works is the transactionId
-                    TransactionState tmpTxnState = new TransactionState();
-                    tmpTxnState.setTransactionId(transactionId);
-                    cb.beforeAborted(tmpTxnState);
-                }
-            }
-        }
+        Pair<Long, TxnStateChangeCallback> callbackInfo =
+                handleBeforeAbort(dbId, transactionId, txnCommitAttachment);
 
         AbortTxnResponse abortTxnResponse = null;
         try {
             abortTxnResponse = abortTransactionImpl(dbId, transactionId, 
reason, null);
         } finally {
-            handleAfterAbort(abortTxnResponse, txnCommitAttachment, 
transactionId);
+            handleAfterAbort(abortTxnResponse, txnCommitAttachment, 
transactionId,
+                    callbackInfo.first, callbackInfo.second);
             clearTxnLastSignature(dbId, transactionId);
         }
     }
 
+    /**
+     * Resolves the transaction callback and calls beforeAborted() before the abort RPC,
+     * so the lock-handoff pattern (beforeAborted acquires, afterAborted releases) wraps
+     * the correct scope.
+     *
+     * @return a Pair of (callbackId, callback); callback is null if no callback is registered
+     *         or if beforeAborted failed (in either case handleAfterAbort will skip afterAborted)
+     */
+    private Pair<Long, TxnStateChangeCallback> handleBeforeAbort(Long dbId, 
long transactionId,
+            TxnCommitAttachment txnCommitAttachment) throws UserException {
+        long callbackId = 0L;
+        if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
+            callbackId = ((RLTaskTxnCommitAttachment) 
txnCommitAttachment).getJobId();
+        } else if (txnCommitAttachment == null) {
+            // txnCommitAttachment is null (e.g. BE restart abort): the callbackId is only
+            // stored in the meta service, so do a pre-query to fetch it before the abort RPC,
+            // ensuring beforeAborted is called before the transaction is actually aborted.
+            TransactionState existingState = getTransactionState(dbId, 
transactionId);
+            if (existingState == null) {
+                throw new UserException("failed to get transaction state 
before abort, transactionId: "
+                        + transactionId);
+            }
+            callbackId = existingState.getCallbackId();
+        }
+        TxnStateChangeCallback cb = callbackFactory.getCallback(callbackId);
+        if (cb != null) {
+            // use a temporary transaction state to do before abort check,
+            // what actually works is the transactionId
+            TransactionState tmpTxnState = new TransactionState();
+            tmpTxnState.setTransactionId(transactionId);
+            try {
+                cb.beforeAborted(tmpTxnState);
+            } catch (TransactionException e) {
+                LOG.warn("beforeAborted failed for txn {}, callbackId {}, msg: 
{}",
+                        transactionId, callbackId, e.getMessage());
+                // beforeAborted failed so the lock was not acquired; pass null cb to
+                // handleAfterAbort so afterAborted is skipped and the lock is not released.
+                return Pair.of(callbackId, null);
+            }
+        }
+        return Pair.of(callbackId, cb);
+    }
+
     private AbortTxnResponse abortTransactionImpl(Long dbId, Long 
transactionId, String reason,
             TxnCommitAttachment txnCommitAttachment) throws UserException {
         LOG.info("try to abort transaction, dbId:{}, transactionId:{}, reason: 
{}", dbId, transactionId, reason);
@@ -1885,26 +1919,20 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
     }
 
     private void handleAfterAbort(AbortTxnResponse abortTxnResponse, 
TxnCommitAttachment txnCommitAttachment,
-                                long transactionId) throws UserException {
+                                long transactionId, long callbackId, 
TxnStateChangeCallback cb) throws UserException {
         TransactionState txnState = new TransactionState();
         boolean txnOperated = false;
-        long callbackId = 0L;
-        TxnStateChangeCallback cb = null;
         String abortReason = "";
 
         if (abortTxnResponse != null) {
             txnState = 
TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
             txnOperated = abortTxnResponse.getStatus().getCode() == 
MetaServiceCode.OK;
-            callbackId = txnState.getCallbackId();
             abortReason = txnState.getReason();
         }
-        if (txnCommitAttachment != null && txnCommitAttachment instanceof 
RLTaskTxnCommitAttachment) {
-            RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = 
(RLTaskTxnCommitAttachment) txnCommitAttachment;
-            callbackId = rlTaskTxnCommitAttachment.getJobId();
+        if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
             txnState.setTransactionId(transactionId);
         }
 
-        cb = callbackFactory.getCallback(callbackId);
         if (cb != null) {
             LOG.info("run txn callback, txnId:{} callbackId:{}, txnState:{}",
                     transactionId, callbackId, txnState);
@@ -2356,9 +2384,11 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
             return null;
         }
 
-        if (getTxnResponse.getStatus().getCode() != MetaServiceCode.OK || 
!getTxnResponse.hasTxnInfo()) {
-            LOG.info("getTransactionState exception: {}, {}", 
getTxnResponse.getStatus().getCode(),
-                    getTxnResponse.getStatus().getMsg());
+        if (getTxnResponse == null || getTxnResponse.getStatus().getCode() != 
MetaServiceCode.OK
+                || !getTxnResponse.hasTxnInfo()) {
+            LOG.info("getTransactionState exception: {}",
+                    getTxnResponse == null ? "null response" : 
getTxnResponse.getStatus().getCode()
+                            + " " + getTxnResponse.getStatus().getMsg());
             return null;
         }
         return TxnUtil.transactionStateFromPb(getTxnResponse.getTxnInfo());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
index 75648634e3b..9df6a5f3d3a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
@@ -284,7 +284,17 @@ public class CloudGlobalTransactionMgrTest {
 
     @Test
     public void testAbortTransaction() throws UserException {
+        long transactionId = 123533;
         new MockUp<MetaServiceProxy>(MetaServiceProxy.class) {
+            @Mock
+            public Cloud.GetTxnResponse getTxn(Cloud.GetTxnRequest request) {
+                return Cloud.GetTxnResponse.newBuilder()
+                        .setStatus(Cloud.MetaServiceResponseStatus.newBuilder()
+                                .setCode(MetaServiceCode.OK).setMsg("OK"))
+                        .setTxnInfo(buildTxnInfo(transactionId))
+                        .build();
+            }
+
             @Mock
             public Cloud.AbortTxnResponse abortTxn(Cloud.AbortTxnRequest 
request) {
                 AbortTxnResponse.Builder abortTxnResponseBuilder = 
AbortTxnResponse.newBuilder();
@@ -293,7 +303,6 @@ public class CloudGlobalTransactionMgrTest {
                 return abortTxnResponseBuilder.build();
             }
         };
-        long transactionId = 123533;
         masterTransMgr.abortTransaction(CatalogTestUtil.testDbId1, 
transactionId, "User Cancelled");
     }
 
@@ -313,8 +322,19 @@ public class CloudGlobalTransactionMgrTest {
 
     @Test
     public void testAbortTransactionConflict() throws UserException {
+        long transactionId = 123533;
         new MockUp<MetaServiceProxy>(MetaServiceProxy.class) {
             int times = 1;
+
+            @Mock
+            public Cloud.GetTxnResponse getTxn(Cloud.GetTxnRequest request) {
+                return Cloud.GetTxnResponse.newBuilder()
+                        .setStatus(Cloud.MetaServiceResponseStatus.newBuilder()
+                                .setCode(MetaServiceCode.OK).setMsg("OK"))
+                        .setTxnInfo(buildTxnInfo(transactionId))
+                        .build();
+            }
+
             @Mock
             public Cloud.AbortTxnResponse abortTxn(Cloud.AbortTxnRequest 
request) {
                 AbortTxnResponse.Builder abortTxnResponseBuilder = 
AbortTxnResponse.newBuilder();
@@ -330,7 +350,6 @@ public class CloudGlobalTransactionMgrTest {
                 return abortTxnResponseBuilder.build();
             }
         };
-        long transactionId = 123533;
         masterTransMgr.abortTransaction(CatalogTestUtil.testDbId1, 
transactionId, "User Cancelled");
     }
 
@@ -405,4 +424,15 @@ public class CloudGlobalTransactionMgrTest {
         long result = masterTransMgr.getNextTransactionId();
         Assert.assertEquals(1000, result);
     }
+
+    private TxnInfoPB buildTxnInfo(long transactionId) {
+        return TxnInfoPB.newBuilder()
+                .setDbId(CatalogTestUtil.testDbId1)
+                
.addAllTableIds(Lists.newArrayList(CatalogTestUtil.testTableId1))
+                .setTxnId(transactionId)
+                .setLabel(CatalogTestUtil.testTxnLabel1)
+                .setListenerId(0L)
+                .setStatus(Cloud.TxnStatusPB.TXN_STATUS_PREPARED)
+                .build();
+    }
 }
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy
index 74e93dc22bd..ef5f1c10cf5 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy
@@ -78,6 +78,23 @@ suite("test_routine_load_be_restart","nonConcurrent") {
                 GetDebugPoint().disableDebugPointForAllFEs(injection_load_hang)
             }
 
+            // After the simulated BE restart, the routine load must recover: new tasks are
+            // scheduled (afterAborted callback ran without exception) and data is loaded.
+            // If handleAfterAbort threw IllegalMonitorStateException the job would be stuck.
would be stuck.
+            def count = RoutineLoadTestUtils.waitForTaskFinish(runSql, job, 
tableName, 0)
+            logger.info("rows loaded after be-restart abort, wait iterations: 
" + count)
+
+            // Verify job is still RUNNING (not PAUSED due to an exception in afterAborted).
+            def res = sql "show routine load for ${job}"
+            assertEquals("RUNNING", res[0][8].toString())
+
+            // Send additional data and confirm it is loaded to prove ongoing task scheduling
+            // works correctly after the abort — this would time-out if the replacement task
+            // was never enqueued due to the missing beforeAborted() call.
+            def rowsAfterFirstLoad = sql("select count(*) from 
${tableName}")[0][0] as int
+            RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTopics)
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 
rowsAfterFirstLoad)
+
             // test coordinator be down
             def injection_abort_txn_be_down = 
"HeartbeatMgr.abortTxnWhenCoordinateBeDown"
             try {
@@ -89,8 +106,8 @@ suite("test_routine_load_be_restart","nonConcurrent") {
                 
GetDebugPoint().disableDebugPointForAllFEs(injection_abort_txn_be_down)
                 GetDebugPoint().disableDebugPointForAllFEs(injection_load_hang)
             }
-            def count = RoutineLoadTestUtils.waitForTaskFinish(runSql, job, 
tableName, 0)
-            logger.info("wait count: " + count)
+            def finalCount = RoutineLoadTestUtils.waitForTaskFinish(runSql, 
job, tableName, 0)
+            logger.info("wait count: " + finalCount)
         } finally {
             sql "stop routine load for ${job}"
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to