codelipenghui commented on a change in pull request #9236:
URL: https://github.com/apache/pulsar/pull/9236#discussion_r580264385



##########
File path: 
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
##########
@@ -1931,6 +1931,12 @@
     )
     private boolean transactionCoordinatorEnabled = false;
 
+    @FieldContext(
+            category = CATEGORY_TRANSACTION,
+            doc = "End transaction operation retry interval time. 
(unit:second)"
+    )
+    private long endTxnOpRetryIntervalTime = 5;

Review comment:
       If this is an internal retry logic in the TransactionMetadataService, we 
don't need to expose the configuration here.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -305,7 +320,18 @@ public long getLowWaterMark(TxnID txnID) {
                 resultFuture.completeExceptionally(e);
             }
         });
-        return resultFuture;
+
+        return resultFuture.thenCompose((future) -> 
endTxnInTransactionMetadataStore(txnID, txnAction));
+    }
+
+    private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID 
txnID, int txnAction) {
+        if (TxnAction.COMMIT.getValue() == txnAction) {
+            return updateTxnStatus(txnID, TxnStatus.COMMITTED, 
TxnStatus.COMMITTING);
+        } else if (TxnAction.ABORT.getValue() == txnAction) {
+            return updateTxnStatus(txnID, TxnStatus.ABORTED, 
TxnStatus.ABORTING);
+        } else {
+            return FutureUtil.failedFuture(new Throwable("Unsupported 
txnAction " + txnAction));

Review comment:
       Why complete with Throwable? It should be a transaction exception.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -224,15 +231,23 @@ public long getLowWaterMark(TxnID txnID) {
         }
 
         completableFuture = updateTxnStatus(txnID, newStatus, TxnStatus.OPEN)
-                .thenCompose(ignored -> endTxnInTransactionBuffer(txnID, 
txnAction));
-        if (TxnStatus.COMMITTING.equals(newStatus)) {
-            completableFuture = completableFuture
-                    .thenCompose(ignored -> updateTxnStatus(txnID, 
TxnStatus.COMMITTED, TxnStatus.COMMITTING));
-        } else if (TxnStatus.ABORTING.equals(newStatus)) {
-            completableFuture = completableFuture
-                    .thenCompose(ignored -> updateTxnStatus(txnID, 
TxnStatus.ABORTED, TxnStatus.ABORTING));
-        }
-        return completableFuture;
+                .thenCompose(ignored -> endTxnInTransactionBuffer(txnID, 
txnAction))
+                .exceptionally(e -> {
+                    if (e.getCause() instanceof TransactionNotFoundException
+                            || e.getCause() instanceof 
InvalidTxnStatusException) {

Review comment:
       It's better to create a method `isRetryableException()` to clearly list 
those exceptions that need to be retried. This will avoid stuck in an infinite 
retry situation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to