liaoxin01 commented on code in PR #41267:
URL: https://github.com/apache/doris/pull/41267#discussion_r1776314891
##########
fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java:
##########
@@ -512,22 +512,33 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio
}
final CommitTxnRequest commitTxnRequest = builder.build();
+ boolean txnOperated = false;
+ TransactionState txnState = null;
+ TxnStateChangeCallback cb = null;
+ CommitTxnResponse commitTxnResponse = null;
try {
- commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
- } catch (UserException e) {
- // For routine load, it is necessary to release the write lock when commit transaction fails,
- // otherwise it will cause the lock added in beforeCommitted to not be released.
+ commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList, txnState, commitTxnResponse);
+ txnOperated = true;
+ } finally {
if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
- Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
+ cb = callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId());
+ } else {
+ cb = callbackFactory.getCallback(txnState.getCallbackId());
+ }
+
+ if (cb != null) {
+ LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}",
+ txnState.getTransactionId(), txnState.getCallbackId(), txnState);
+ cb.afterCommitted(txnState, txnOperated);
+ cb.afterVisible(txnState, txnOperated);
}
- throw e;
}
}
private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId,
- List<Table> tableList) throws UserException {
- CommitTxnResponse commitTxnResponse = null;
+ List<Table> tableList, TransactionState txnState,
+ CommitTxnResponse commitTxnResponse) throws UserException {
Review Comment:
```suggestion
private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId,
List<Table> tableList, TransactionState txnState) throws UserException
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]