This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new cae220ea578 branch-4.0: [fix](transaction) fix IllegalMonitorStateException in routine load afterAborted when coordinate BE restarts (#62892)
cae220ea578 is described below
commit cae220ea578012bc8e09019eaa08403430c6e66e
Author: hui lai <[email protected]>
AuthorDate: Thu May 7 14:43:56 2026 +0800
branch-4.0: [fix](transaction) fix IllegalMonitorStateException in routine load afterAborted when coordinate BE restarts (#62892)
pick https://github.com/apache/doris/pull/61881
---
.../transaction/CloudGlobalTransactionMgr.java | 80 +++++++++++++++-------
.../transaction/CloudGlobalTransactionMgrTest.java | 34 ++++++++-
.../test_routine_load_be_restart.groovy | 21 +++++-
3 files changed, 106 insertions(+), 29 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index d6a4648141a..42a97d5d94a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -116,6 +116,7 @@ import org.apache.doris.transaction.GlobalTransactionMgrIface;
import org.apache.doris.transaction.SubTransactionState;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionCommitFailedException;
+import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionIdGenerator;
import org.apache.doris.transaction.TransactionNotFoundException;
import org.apache.doris.transaction.TransactionState;
@@ -1816,29 +1817,62 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
@Override
public void abortTransaction(Long dbId, Long transactionId, String reason,
TxnCommitAttachment txnCommitAttachment, List<Table> tableList)
throws UserException {
- if (txnCommitAttachment != null) {
- if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
- RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment =
(RLTaskTxnCommitAttachment) txnCommitAttachment;
- TxnStateChangeCallback cb =
callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId());
- if (cb != null) {
- // use a temporary transaction state to do before commit
check,
- // what actually works is the transactionId
- TransactionState tmpTxnState = new TransactionState();
- tmpTxnState.setTransactionId(transactionId);
- cb.beforeAborted(tmpTxnState);
- }
- }
- }
+ Pair<Long, TxnStateChangeCallback> callbackInfo =
+ handleBeforeAbort(dbId, transactionId, txnCommitAttachment);
AbortTxnResponse abortTxnResponse = null;
try {
abortTxnResponse = abortTransactionImpl(dbId, transactionId,
reason, null);
} finally {
- handleAfterAbort(abortTxnResponse, txnCommitAttachment,
transactionId);
+ handleAfterAbort(abortTxnResponse, txnCommitAttachment,
transactionId,
+ callbackInfo.first, callbackInfo.second);
clearTxnLastSignature(dbId, transactionId);
}
}
+ /**
+ * Resolves the transaction callback and calls beforeAborted() before the
abort RPC,
+ * so the lock-handoff pattern (beforeAborted acquires, afterAborted
releases) wraps
+ * the correct scope.
+ *
+ * @return a Pair of (callbackId, callback); callback is null if no
callback is registered
+ * or if beforeAborted failed (in either case handleAfterAbort
will skip afterAborted)
+ */
+ private Pair<Long, TxnStateChangeCallback> handleBeforeAbort(Long dbId,
long transactionId,
+ TxnCommitAttachment txnCommitAttachment) throws UserException {
+ long callbackId = 0L;
+ if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
+ callbackId = ((RLTaskTxnCommitAttachment)
txnCommitAttachment).getJobId();
+ } else if (txnCommitAttachment == null) {
+ // txnCommitAttachment is null (e.g. BE restart abort): the
callbackId is only
+ // stored in the meta service, so do a pre-query to fetch it
before the abort RPC,
+ // ensuring beforeAborted is called before the transaction is
actually aborted.
+ TransactionState existingState = getTransactionState(dbId,
transactionId);
+ if (existingState == null) {
+ throw new UserException("failed to get transaction state
before abort, transactionId: "
+ + transactionId);
+ }
+ callbackId = existingState.getCallbackId();
+ }
+ TxnStateChangeCallback cb = callbackFactory.getCallback(callbackId);
+ if (cb != null) {
+ // use a temporary transaction state to do before abort check,
+ // what actually works is the transactionId
+ TransactionState tmpTxnState = new TransactionState();
+ tmpTxnState.setTransactionId(transactionId);
+ try {
+ cb.beforeAborted(tmpTxnState);
+ } catch (TransactionException e) {
+ LOG.warn("beforeAborted failed for txn {}, callbackId {}, msg:
{}",
+ transactionId, callbackId, e.getMessage());
+ // beforeAborted failed so the lock was not acquired; pass
null cb to
+ // handleAfterAbort so afterAborted is skipped and the lock is
not released.
+ return Pair.of(callbackId, null);
+ }
+ }
+ return Pair.of(callbackId, cb);
+ }
+
private AbortTxnResponse abortTransactionImpl(Long dbId, Long
transactionId, String reason,
TxnCommitAttachment txnCommitAttachment) throws UserException {
LOG.info("try to abort transaction, dbId:{}, transactionId:{}, reason:
{}", dbId, transactionId, reason);
@@ -1885,26 +1919,20 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
}
private void handleAfterAbort(AbortTxnResponse abortTxnResponse,
TxnCommitAttachment txnCommitAttachment,
- long transactionId) throws UserException {
+ long transactionId, long callbackId,
TxnStateChangeCallback cb) throws UserException {
TransactionState txnState = new TransactionState();
boolean txnOperated = false;
- long callbackId = 0L;
- TxnStateChangeCallback cb = null;
String abortReason = "";
if (abortTxnResponse != null) {
txnState =
TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
txnOperated = abortTxnResponse.getStatus().getCode() ==
MetaServiceCode.OK;
- callbackId = txnState.getCallbackId();
abortReason = txnState.getReason();
}
- if (txnCommitAttachment != null && txnCommitAttachment instanceof
RLTaskTxnCommitAttachment) {
- RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment =
(RLTaskTxnCommitAttachment) txnCommitAttachment;
- callbackId = rlTaskTxnCommitAttachment.getJobId();
+ if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
txnState.setTransactionId(transactionId);
}
- cb = callbackFactory.getCallback(callbackId);
if (cb != null) {
LOG.info("run txn callback, txnId:{} callbackId:{}, txnState:{}",
transactionId, callbackId, txnState);
@@ -2356,9 +2384,11 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
return null;
}
- if (getTxnResponse.getStatus().getCode() != MetaServiceCode.OK ||
!getTxnResponse.hasTxnInfo()) {
- LOG.info("getTransactionState exception: {}, {}",
getTxnResponse.getStatus().getCode(),
- getTxnResponse.getStatus().getMsg());
+ if (getTxnResponse == null || getTxnResponse.getStatus().getCode() !=
MetaServiceCode.OK
+ || !getTxnResponse.hasTxnInfo()) {
+ LOG.info("getTransactionState exception: {}",
+ getTxnResponse == null ? "null response" :
getTxnResponse.getStatus().getCode()
+ + " " + getTxnResponse.getStatus().getMsg());
return null;
}
return TxnUtil.transactionStateFromPb(getTxnResponse.getTxnInfo());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
index 75648634e3b..9df6a5f3d3a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgrTest.java
@@ -284,7 +284,17 @@ public class CloudGlobalTransactionMgrTest {
@Test
public void testAbortTransaction() throws UserException {
+ long transactionId = 123533;
new MockUp<MetaServiceProxy>(MetaServiceProxy.class) {
+ @Mock
+ public Cloud.GetTxnResponse getTxn(Cloud.GetTxnRequest request) {
+ return Cloud.GetTxnResponse.newBuilder()
+ .setStatus(Cloud.MetaServiceResponseStatus.newBuilder()
+ .setCode(MetaServiceCode.OK).setMsg("OK"))
+ .setTxnInfo(buildTxnInfo(transactionId))
+ .build();
+ }
+
@Mock
public Cloud.AbortTxnResponse abortTxn(Cloud.AbortTxnRequest
request) {
AbortTxnResponse.Builder abortTxnResponseBuilder =
AbortTxnResponse.newBuilder();
@@ -293,7 +303,6 @@ public class CloudGlobalTransactionMgrTest {
return abortTxnResponseBuilder.build();
}
};
- long transactionId = 123533;
masterTransMgr.abortTransaction(CatalogTestUtil.testDbId1,
transactionId, "User Cancelled");
}
@@ -313,8 +322,19 @@ public class CloudGlobalTransactionMgrTest {
@Test
public void testAbortTransactionConflict() throws UserException {
+ long transactionId = 123533;
new MockUp<MetaServiceProxy>(MetaServiceProxy.class) {
int times = 1;
+
+ @Mock
+ public Cloud.GetTxnResponse getTxn(Cloud.GetTxnRequest request) {
+ return Cloud.GetTxnResponse.newBuilder()
+ .setStatus(Cloud.MetaServiceResponseStatus.newBuilder()
+ .setCode(MetaServiceCode.OK).setMsg("OK"))
+ .setTxnInfo(buildTxnInfo(transactionId))
+ .build();
+ }
+
@Mock
public Cloud.AbortTxnResponse abortTxn(Cloud.AbortTxnRequest
request) {
AbortTxnResponse.Builder abortTxnResponseBuilder =
AbortTxnResponse.newBuilder();
@@ -330,7 +350,6 @@ public class CloudGlobalTransactionMgrTest {
return abortTxnResponseBuilder.build();
}
};
- long transactionId = 123533;
masterTransMgr.abortTransaction(CatalogTestUtil.testDbId1,
transactionId, "User Cancelled");
}
@@ -405,4 +424,15 @@ public class CloudGlobalTransactionMgrTest {
long result = masterTransMgr.getNextTransactionId();
Assert.assertEquals(1000, result);
}
+
+ private TxnInfoPB buildTxnInfo(long transactionId) {
+ return TxnInfoPB.newBuilder()
+ .setDbId(CatalogTestUtil.testDbId1)
+
.addAllTableIds(Lists.newArrayList(CatalogTestUtil.testTableId1))
+ .setTxnId(transactionId)
+ .setLabel(CatalogTestUtil.testTxnLabel1)
+ .setListenerId(0L)
+ .setStatus(Cloud.TxnStatusPB.TXN_STATUS_PREPARED)
+ .build();
+ }
}
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy
index 74e93dc22bd..ef5f1c10cf5 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_be_restart.groovy
@@ -78,6 +78,23 @@ suite("test_routine_load_be_restart","nonConcurrent") {
GetDebugPoint().disableDebugPointForAllFEs(injection_load_hang)
}
+ // After the simulated BE restart, the routine load must recover:
new tasks are
+ // scheduled (afterAborted callback ran without exception) and
data is loaded.
+ // If handleAfterAbort threw IllegalMonitorStateException the job
would be stuck.
+ def count = RoutineLoadTestUtils.waitForTaskFinish(runSql, job,
tableName, 0)
+ logger.info("rows loaded after be-restart abort, wait iterations:
" + count)
+
+ // Verify job is still RUNNING (not PAUSED due to an exception in
afterAborted).
+ def res = sql "show routine load for ${job}"
+ assertEquals("RUNNING", res[0][8].toString())
+
+ // Send additional data and confirm it is loaded to prove ongoing
task scheduling
+ // works correctly after the abort — this would time-out if the
replacement task
+ // was never enqueued due to the missing beforeAborted() call.
+ def rowsAfterFirstLoad = sql("select count(*) from
${tableName}")[0][0] as int
+ RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTopics)
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName,
rowsAfterFirstLoad)
+
// test coordinator be down
def injection_abort_txn_be_down =
"HeartbeatMgr.abortTxnWhenCoordinateBeDown"
try {
@@ -89,8 +106,8 @@ suite("test_routine_load_be_restart","nonConcurrent") {
GetDebugPoint().disableDebugPointForAllFEs(injection_abort_txn_be_down)
GetDebugPoint().disableDebugPointForAllFEs(injection_load_hang)
}
- def count = RoutineLoadTestUtils.waitForTaskFinish(runSql, job,
tableName, 0)
- logger.info("wait count: " + count)
+ def finalCount = RoutineLoadTestUtils.waitForTaskFinish(runSql,
job, tableName, 0)
+ logger.info("wait count: " + finalCount)
} finally {
sql "stop routine load for ${job}"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]