codelipenghui commented on a change in pull request #9236:
URL: https://github.com/apache/pulsar/pull/9236#discussion_r580264385
##########
File path:
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
##########
@@ -1931,6 +1931,12 @@
)
private boolean transactionCoordinatorEnabled = false;
+ @FieldContext(
+ category = CATEGORY_TRANSACTION,
+ doc = "End transaction operation retry interval time. (unit:second)"
+ )
+ private long endTxnOpRetryIntervalTime = 5;
Review comment:
If this is internal retry logic in the TransactionMetadataStoreService, we
don't need to expose it as a broker configuration here.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -305,7 +320,18 @@ public long getLowWaterMark(TxnID txnID) {
resultFuture.completeExceptionally(e);
}
});
- return resultFuture;
+
+ return resultFuture.thenCompose((future) -> endTxnInTransactionMetadataStore(txnID, txnAction));
+ }
+
+ private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID txnID, int txnAction) {
+ if (TxnAction.COMMIT.getValue() == txnAction) {
+ return updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING);
+ } else if (TxnAction.ABORT.getValue() == txnAction) {
+ return updateTxnStatus(txnID, TxnStatus.ABORTED, TxnStatus.ABORTING);
+ } else {
+ return FutureUtil.failedFuture(new Throwable("Unsupported txnAction " + txnAction));
Review comment:
Why complete with Throwable? It should be a transaction exception.
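A minimal sketch of what failing the future with a typed exception could look like. `UnsupportedTxnActionException` and `unsupportedAction` are hypothetical names used only for illustration, and plain `CompletableFuture` is used instead of `FutureUtil` so that the example compiles on its own; the PR would use whatever transaction exception type the project already defines.

```java
import java.util.concurrent.CompletableFuture;

final class EndTxnFailureSketch {

    // Hypothetical exception type (an assumption, not Pulsar's API).
    static final class UnsupportedTxnActionException extends Exception {
        UnsupportedTxnActionException(int txnAction) {
            super("Unsupported txnAction " + txnAction);
        }
    }

    // Returns an already-failed future that carries a typed exception
    // instead of a bare Throwable, so callers can match on the type.
    static CompletableFuture<Void> unsupportedAction(int txnAction) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new UnsupportedTxnActionException(txnAction));
        return future;
    }
}
```

With a typed exception, downstream `exceptionally` handlers can use `instanceof` checks rather than parsing the message.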
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -224,15 +231,23 @@ public long getLowWaterMark(TxnID txnID) {
}
completableFuture = updateTxnStatus(txnID, newStatus, TxnStatus.OPEN)
- .thenCompose(ignored -> endTxnInTransactionBuffer(txnID, txnAction));
- if (TxnStatus.COMMITTING.equals(newStatus)) {
- completableFuture = completableFuture
- .thenCompose(ignored -> updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING));
- } else if (TxnStatus.ABORTING.equals(newStatus)) {
- completableFuture = completableFuture
- .thenCompose(ignored -> updateTxnStatus(txnID, TxnStatus.ABORTED, TxnStatus.ABORTING));
- }
- return completableFuture;
+ .thenCompose(ignored -> endTxnInTransactionBuffer(txnID, txnAction))
+ .exceptionally(e -> {
+ if (e.getCause() instanceof TransactionNotFoundException
+ || e.getCause() instanceof InvalidTxnStatusException) {
Review comment:
It would be better to add a method `isRetryableException()` that explicitly
lists the exceptions that should be retried. Any exception not on that list
then fails immediately, which avoids getting stuck in an infinite retry loop.
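A minimal sketch of such a helper, written as an allow-list. The two JDK exception types here are placeholders chosen only so the example is self-contained; which transaction exceptions actually belong on the list is for the PR to decide. Unwrapping `CompletionException` is likewise an assumption about how the failure reaches the handler.

```java
import java.net.ConnectException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

final class RetryableExceptionSketch {

    // Allow-list of failures worth retrying. Anything not listed here
    // is treated as permanent, so the retry loop cannot spin forever
    // on an error that will never succeed.
    static boolean isRetryableException(Throwable e) {
        // CompletableFuture stages wrap failures in CompletionException,
        // so look at the underlying cause when one is present.
        Throwable cause = (e instanceof CompletionException && e.getCause() != null)
                ? e.getCause()
                : e;
        return cause instanceof TimeoutException      // placeholder: transient timeout
                || cause instanceof ConnectException; // placeholder: transient network error
    }
}
```

The `exceptionally` handler would then schedule a retry only when `isRetryableException(e)` returns true and complete the result future exceptionally otherwise.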