This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2dab3bae59a6a9298cd6d9cc520faf8a6e513ba1
Author: Qiang Zhao <74767115+mattisonc...@users.noreply.github.com>
AuthorDate: Wed Apr 13 21:43:47 2022 +0800

    [improve][txn] Avoid creating multiple futures and exception handlers. (#15089)
    
    (cherry picked from commit 4aeeed5dab9dfe9493526f36d539b3ef29cf6fe5)
---
 .../broker/TransactionMetadataStoreService.java    | 228 +++++++++------------
 1 file changed, 92 insertions(+), 136 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 679bcf99368..7297c334c4c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -25,12 +25,12 @@ import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
@@ -38,6 +38,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -338,12 +340,13 @@ public class TransactionMetadataStoreService {
     }
 
     public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, 
boolean isTimeout) {
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-        return endTransaction(txnID, txnAction, isTimeout, completableFuture);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        endTransaction(txnID, txnAction, isTimeout, future);
+        return future;
     }
 
-    public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, 
boolean isTimeout,
-                                                  CompletableFuture<Void> 
completableFuture) {
+    public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout,
+                                                  CompletableFuture<Void> 
future) {
         TxnStatus newStatus;
         switch (txnAction) {
             case TxnAction.COMMIT_VALUE:
@@ -356,90 +359,60 @@ public class TransactionMetadataStoreService {
                 TransactionCoordinatorException.UnsupportedTxnActionException 
exception =
                         new 
TransactionCoordinatorException.UnsupportedTxnActionException(txnID, txnAction);
                 LOG.error(exception.getMessage());
-                completableFuture.completeExceptionally(exception);
-                return completableFuture;
+                future.completeExceptionally(exception);
+                return;
         }
-
-        getTxnMeta(txnID).thenAccept(txnMeta -> {
-            TxnStatus txnStatus = txnMeta.status();
-            if (txnStatus == TxnStatus.OPEN) {
-                updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, 
isTimeout).thenAccept(v ->
-                        endTxnInTransactionBuffer(txnID, 
txnAction).thenAccept(a ->
-                                
completableFuture.complete(null)).exceptionally(e -> {
-                            if (!isRetryableException(e.getCause())) {
-                                LOG.error("EndTxnInTransactionBuffer fail! 
TxnId : {}, "
-                                        + "TxnAction : {}", txnID, txnAction, 
e);
-                            } else {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("EndTxnInTransactionBuffer 
retry! TxnId : {}, "
-                                            + "TxnAction : {}", txnID, 
txnAction, e);
-                                }
-                                transactionOpRetryTimer.newTimeout(timeout ->
-                                        endTransaction(txnID, txnAction, 
isTimeout, completableFuture),
-                                        endTransactionRetryIntervalTime, 
TimeUnit.MILLISECONDS);
-                                return null;
-
-                            }
-                            
completableFuture.completeExceptionally(e.getCause());
-                            return null;
-                        })).exceptionally(e -> {
-                    if (!isRetryableException(e.getCause())) {
-                        LOG.error("EndTransaction UpdateTxnStatus fail! TxnId 
: {}, "
-                                + "TxnAction : {}", txnID, txnAction, e);
-                    } else {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("EndTransaction UpdateTxnStatus op 
retry! TxnId : {}, "
-                                    + "TxnAction : {}", txnID, txnAction, e);
-                        }
-                        transactionOpRetryTimer.newTimeout(timeout -> 
endTransaction(txnID, txnAction,
-                                isTimeout, completableFuture), 
endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
-                        return null;
-
+        getTxnMeta(txnID)
+                .thenCompose(txnMeta -> {
+                    if (txnMeta.status() == TxnStatus.OPEN) {
+                        return updateTxnStatus(txnID, newStatus, 
TxnStatus.OPEN, isTimeout)
+                                .thenCompose(__ -> 
endTxnInTransactionBuffer(txnID, txnAction));
+                    }
+                    return fakeAsyncCheckTxnStatus(txnMeta.status(), 
txnAction, txnID, newStatus)
+                            .thenCompose(__ -> 
endTxnInTransactionBuffer(txnID, txnAction));
+                }).whenComplete((__, ex)-> {
+                    if (ex == null) {
+                        future.complete(null);
+                        return;
+                    }
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    if (!isRetryableException(realCause)) {
+                        LOG.error("End transaction fail! TxnId : {}, "
+                                + "TxnAction : {}", txnID, txnAction, 
realCause);
+                        future.completeExceptionally(ex);
+                        return;
                     }
-                    completableFuture.completeExceptionally(e.getCause());
-                    return null;
-                });
-            } else {
-                if ((txnStatus == COMMITTING && txnAction == 
TxnAction.COMMIT.getValue())
-                        || (txnStatus == ABORTING && txnAction == 
TxnAction.ABORT.getValue())) {
-                    endTxnInTransactionBuffer(txnID, txnAction).thenAccept(k ->
-                            completableFuture.complete(null)).exceptionally(e 
-> {
-                        if (isRetryableException(e.getCause())) {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("EndTxnInTransactionBuffer retry! 
TxnId : {}, "
-                                        + "TxnAction : {}", txnID, txnAction, 
e);
-                            }
-                            transactionOpRetryTimer.newTimeout(timeout ->
-                                            endTransaction(txnID, txnAction, 
isTimeout, completableFuture),
-                                    endTransactionRetryIntervalTime, 
TimeUnit.MILLISECONDS);
-                            return null;
-                        } else {
-                            LOG.error("EndTxnInTransactionBuffer fail! TxnId : 
{}, "
-                                    + "TxnAction : {}", txnID, txnAction, e);
-                        }
-                        completableFuture.completeExceptionally(e.getCause());
-                        return null;
-                    });
-                } else {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : 
{}, TxnAction : {}", txnID, txnAction);
+                        LOG.debug("EndTxnInTransactionBuffer retry! TxnId : 
{}, "
+                                + "TxnAction : {}", txnID, txnAction, 
realCause);
                     }
-                    completableFuture.completeExceptionally(new 
InvalidTxnStatusException(txnID, newStatus, txnStatus));
-                }
-            }
-        }).exceptionally(e -> {
-            if (isRetryableException(e.getCause())) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("End transaction op retry! TxnId : {}, TxnAction 
: {}", txnID, txnAction, e);
-                }
-                transactionOpRetryTimer.newTimeout(timeout -> 
endTransaction(txnID, txnAction, isTimeout,
-                        completableFuture), endTransactionRetryIntervalTime, 
TimeUnit.MILLISECONDS);
-                return null;
+                    transactionOpRetryTimer.newTimeout(timeout ->
+                                    endTransaction(txnID, txnAction, 
isTimeout, future),
+                            endTransactionRetryIntervalTime, 
TimeUnit.MILLISECONDS);
+                });
+    }
+
+    private CompletionStage<Void> fakeAsyncCheckTxnStatus(TxnStatus txnStatus, 
int txnAction,
+                                                          TxnID txnID, 
TxnStatus expectStatus) {
+        boolean isLegal;
+        switch (txnStatus) {
+            case COMMITTING:
+                isLegal =  (txnAction == TxnAction.COMMIT.getValue());
+                break;
+            case ABORTING:
+                isLegal =  (txnAction == TxnAction.ABORT.getValue());
+                break;
+            default:
+                isLegal = false;
+        }
+        if (!isLegal) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, 
TxnAction : {}", txnID, txnAction);
             }
-            completableFuture.completeExceptionally(e.getCause());
-            return null;
-        });
-        return completableFuture;
+            return FutureUtil.failedFuture(
+                    new InvalidTxnStatusException(txnID, expectStatus, 
txnStatus));
+        }
+       return CompletableFuture.completedFuture(null);
     }
 
     // when managedLedger fence will remove this tc and reload
@@ -470,59 +443,42 @@ public class TransactionMetadataStoreService {
     }
 
     private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int 
txnAction) {
-        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
-        List<CompletableFuture<TxnID>> completableFutureList = new 
ArrayList<>();
-        this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
-            if (throwable != null) {
-                resultFuture.completeExceptionally(throwable);
-                return;
-            }
-            long lowWaterMark = getLowWaterMark(txnID);
-
-            txnMeta.ackedPartitions().forEach(tbSub -> {
-                CompletableFuture<TxnID> actionFuture = new 
CompletableFuture<>();
-                if (TxnAction.COMMIT_VALUE == txnAction) {
-                    actionFuture = tbClient.commitTxnOnSubscription(
-                            tbSub.getTopic(), tbSub.getSubscription(), 
txnID.getMostSigBits(),
-                            txnID.getLeastSigBits(), lowWaterMark);
-                } else if (TxnAction.ABORT_VALUE == txnAction) {
-                    actionFuture = tbClient.abortTxnOnSubscription(
-                            tbSub.getTopic(), tbSub.getSubscription(), 
txnID.getMostSigBits(),
-                            txnID.getLeastSigBits(), lowWaterMark);
-                } else {
-                    actionFuture.completeExceptionally(new 
Throwable("Unsupported txnAction " + txnAction));
-                }
-                completableFutureList.add(actionFuture);
-            });
-
-            txnMeta.producedPartitions().forEach(partition -> {
-                CompletableFuture<TxnID> actionFuture = new 
CompletableFuture<>();
-                if (TxnAction.COMMIT_VALUE == txnAction) {
-                    actionFuture = tbClient.commitTxnOnTopic(partition, 
txnID.getMostSigBits(),
-                            txnID.getLeastSigBits(), lowWaterMark);
-                } else if (TxnAction.ABORT_VALUE == txnAction) {
-                    actionFuture = tbClient.abortTxnOnTopic(partition, 
txnID.getMostSigBits(),
-                            txnID.getLeastSigBits(), lowWaterMark);
-                } else {
-                    actionFuture.completeExceptionally(new 
Throwable("Unsupported txnAction " + txnAction));
-                }
-                completableFutureList.add(actionFuture);
-            });
-
-            try {
-                
FutureUtil.waitForAll(completableFutureList).whenComplete((ignored, 
waitThrowable) -> {
-                    if (waitThrowable != null) {
-                        resultFuture.completeExceptionally(waitThrowable);
-                        return;
-                    }
-                    resultFuture.complete(null);
+        return getTxnMeta(txnID)
+                .thenCompose(txnMeta -> {
+                    long lowWaterMark = getLowWaterMark(txnID);
+                    Stream<CompletableFuture<?>> onSubFutureStream = 
txnMeta.ackedPartitions().stream().map(tbSub -> {
+                        switch (txnAction) {
+                            case TxnAction.COMMIT_VALUE:
+                                return tbClient.commitTxnOnSubscription(
+                                        tbSub.getTopic(), 
tbSub.getSubscription(), txnID.getMostSigBits(),
+                                        txnID.getLeastSigBits(), lowWaterMark);
+                            case TxnAction.ABORT_VALUE:
+                                return tbClient.abortTxnOnSubscription(
+                                        tbSub.getTopic(), 
tbSub.getSubscription(), txnID.getMostSigBits(),
+                                        txnID.getLeastSigBits(), lowWaterMark);
+                            default:
+                                return FutureUtil.failedFuture(
+                                        new IllegalStateException("Unsupported 
txnAction " + txnAction));
+                        }
+                    });
+                    Stream<CompletableFuture<?>> onTopicFutureStream =
+                            
txnMeta.producedPartitions().stream().map(partition -> {
+                                switch (txnAction) {
+                                    case TxnAction.COMMIT_VALUE:
+                                        return 
tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(),
+                                                txnID.getLeastSigBits(), 
lowWaterMark);
+                                    case TxnAction.ABORT_VALUE:
+                                        return 
tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(),
+                                            txnID.getLeastSigBits(), 
lowWaterMark);
+                                    default:
+                                        return FutureUtil.failedFuture(
+                                                new 
IllegalStateException("Unsupported txnAction " + txnAction));
+                        }
+                    });
+                    return 
FutureUtil.waitForAll(Stream.concat(onSubFutureStream, onTopicFutureStream)
+                                    .collect(Collectors.toList()))
+                            .thenCompose(__ -> 
endTxnInTransactionMetadataStore(txnID, txnAction));
                 });
-            } catch (Exception e) {
-                resultFuture.completeExceptionally(e);
-            }
-        });
-
-        return resultFuture.thenCompose((future) -> 
endTxnInTransactionMetadataStore(txnID, txnAction));
     }
 
     private static boolean isRetryableException(Throwable e) {

Reply via email to