This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2dab3bae59a6a9298cd6d9cc520faf8a6e513ba1 Author: Qiang Zhao <74767115+mattisonc...@users.noreply.github.com> AuthorDate: Wed Apr 13 21:43:47 2022 +0800 [improve][txn] Avoid create multiple future and exception handler. (#15089) (cherry picked from commit 4aeeed5dab9dfe9493526f36d539b3ef29cf6fe5) --- .../broker/TransactionMetadataStoreService.java | 228 +++++++++------------ 1 file changed, 92 insertions(+), 136 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 679bcf99368..7297c334c4c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -25,12 +25,12 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; -import java.util.ArrayList; import java.util.Collections; import java.util.Deque; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; @@ -38,6 +38,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -338,12 +340,13 @@ public class TransactionMetadataStoreService { } public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolean isTimeout) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - return endTransaction(txnID, txnAction, isTimeout, completableFuture); + CompletableFuture<Void> future = new CompletableFuture<>(); + endTransaction(txnID, txnAction, isTimeout, future); + return future; } - public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolean isTimeout, - CompletableFuture<Void> completableFuture) { + public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout, + CompletableFuture<Void> future) { TxnStatus newStatus; switch (txnAction) { case TxnAction.COMMIT_VALUE: @@ -356,90 +359,60 @@ public class TransactionMetadataStoreService { TransactionCoordinatorException.UnsupportedTxnActionException exception = new TransactionCoordinatorException.UnsupportedTxnActionException(txnID, txnAction); LOG.error(exception.getMessage()); - completableFuture.completeExceptionally(exception); - return completableFuture; + future.completeExceptionally(exception); + return; } - - getTxnMeta(txnID).thenAccept(txnMeta -> { - TxnStatus txnStatus = txnMeta.status(); - if (txnStatus == TxnStatus.OPEN) { - updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout).thenAccept(v -> - endTxnInTransactionBuffer(txnID, txnAction).thenAccept(a -> - completableFuture.complete(null)).exceptionally(e -> { - if (!isRetryableException(e.getCause())) { - LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, " - + "TxnAction : {}", txnID, txnAction, e); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, " - + "TxnAction : {}", txnID, txnAction, e); - } - transactionOpRetryTimer.newTimeout(timeout -> - endTransaction(txnID, txnAction, isTimeout, completableFuture), - endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); - return null; - - } - completableFuture.completeExceptionally(e.getCause()); - return null; - })).exceptionally(e -> { - if (!isRetryableException(e.getCause())) { - LOG.error("EndTransaction UpdateTxnStatus fail! TxnId : {}, " - + "TxnAction : {}", txnID, txnAction, e); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("EndTransaction UpdateTxnStatus op retry! TxnId : {}, " - + "TxnAction : {}", txnID, txnAction, e); - } - transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, - isTimeout, completableFuture), endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); - return null; - + getTxnMeta(txnID) + .thenCompose(txnMeta -> { + if (txnMeta.status() == TxnStatus.OPEN) { + return updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout) + .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction)); + } + return fakeAsyncCheckTxnStatus(txnMeta.status(), txnAction, txnID, newStatus) + .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction)); + }).whenComplete((__, ex)-> { + if (ex == null) { + future.complete(null); + return; + } + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (!isRetryableException(realCause)) { + LOG.error("End transaction fail! TxnId : {}, " + + "TxnAction : {}", txnID, txnAction, realCause); + future.completeExceptionally(ex); + return; } - completableFuture.completeExceptionally(e.getCause()); - return null; - }); - } else { - if ((txnStatus == COMMITTING && txnAction == TxnAction.COMMIT.getValue()) - || (txnStatus == ABORTING && txnAction == TxnAction.ABORT.getValue())) { - endTxnInTransactionBuffer(txnID, txnAction).thenAccept(k -> - completableFuture.complete(null)).exceptionally(e -> { - if (isRetryableException(e.getCause())) { - if (LOG.isDebugEnabled()) { - LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, " - + "TxnAction : {}", txnID, txnAction, e); - } - transactionOpRetryTimer.newTimeout(timeout -> - endTransaction(txnID, txnAction, isTimeout, completableFuture), - endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); - return null; - } else { - LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, " - + "TxnAction : {}", txnID, txnAction, e); - } - completableFuture.completeExceptionally(e.getCause()); - return null; - }); - } else { if (LOG.isDebugEnabled()) { - LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", txnID, txnAction); + LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, " + + "TxnAction : {}", txnID, txnAction, realCause); } - completableFuture.completeExceptionally(new InvalidTxnStatusException(txnID, newStatus, txnStatus)); - } - } - }).exceptionally(e -> { - if (isRetryableException(e.getCause())) { - if (LOG.isDebugEnabled()) { - LOG.debug("End transaction op retry! TxnId : {}, TxnAction : {}", txnID, txnAction, e); - } - transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, isTimeout, - completableFuture), endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); - return null; + transactionOpRetryTimer.newTimeout(timeout -> + endTransaction(txnID, txnAction, isTimeout, future), + endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); + }); + } + + private CompletionStage<Void> fakeAsyncCheckTxnStatus(TxnStatus txnStatus, int txnAction, + TxnID txnID, TxnStatus expectStatus) { + boolean isLegal; + switch (txnStatus) { + case COMMITTING: + isLegal = (txnAction == TxnAction.COMMIT.getValue()); + break; + case ABORTING: + isLegal = (txnAction == TxnAction.ABORT.getValue()); + break; + default: + isLegal = false; + } + if (!isLegal) { + if (LOG.isDebugEnabled()) { + LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", txnID, txnAction); } - completableFuture.completeExceptionally(e.getCause()); - return null; - }); - return completableFuture; + return FutureUtil.failedFuture( + new InvalidTxnStatusException(txnID, expectStatus, txnStatus)); + } + return CompletableFuture.completedFuture(null); } // when managedLedger fence will remove this tc and reload @@ -470,59 +443,42 @@ public class TransactionMetadataStoreService { } private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAction) { - CompletableFuture<Void> resultFuture = new CompletableFuture<>(); - List<CompletableFuture<TxnID>> completableFutureList = new ArrayList<>(); - this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> { - if (throwable != null) { - resultFuture.completeExceptionally(throwable); - return; - } - long lowWaterMark = getLowWaterMark(txnID); - - txnMeta.ackedPartitions().forEach(tbSub -> { - CompletableFuture<TxnID> actionFuture = new CompletableFuture<>(); - if (TxnAction.COMMIT_VALUE == txnAction) { - actionFuture = tbClient.commitTxnOnSubscription( - tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), - txnID.getLeastSigBits(), lowWaterMark); - } else if (TxnAction.ABORT_VALUE == txnAction) { - actionFuture = tbClient.abortTxnOnSubscription( - tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), - txnID.getLeastSigBits(), lowWaterMark); - } else { - actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction)); - } - completableFutureList.add(actionFuture); - }); - - txnMeta.producedPartitions().forEach(partition -> { - CompletableFuture<TxnID> actionFuture = new CompletableFuture<>(); - if (TxnAction.COMMIT_VALUE == txnAction) { - actionFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), - txnID.getLeastSigBits(), lowWaterMark); - } else if (TxnAction.ABORT_VALUE == txnAction) { - actionFuture = tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(), - txnID.getLeastSigBits(), lowWaterMark); - } else { - actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction)); - } - completableFutureList.add(actionFuture); - }); - - try { - FutureUtil.waitForAll(completableFutureList).whenComplete((ignored, waitThrowable) -> { - if (waitThrowable != null) { - resultFuture.completeExceptionally(waitThrowable); - return; - } - resultFuture.complete(null); + return getTxnMeta(txnID) + .thenCompose(txnMeta -> { + long lowWaterMark = getLowWaterMark(txnID); + Stream<CompletableFuture<?>> onSubFutureStream = txnMeta.ackedPartitions().stream().map(tbSub -> { + switch (txnAction) { + case TxnAction.COMMIT_VALUE: + return tbClient.commitTxnOnSubscription( + tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), + txnID.getLeastSigBits(), lowWaterMark); + case TxnAction.ABORT_VALUE: + return tbClient.abortTxnOnSubscription( + tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), + txnID.getLeastSigBits(), lowWaterMark); + default: + return FutureUtil.failedFuture( + new IllegalStateException("Unsupported txnAction " + txnAction)); + } + }); + Stream<CompletableFuture<?>> onTopicFutureStream = + txnMeta.producedPartitions().stream().map(partition -> { + switch (txnAction) { + case TxnAction.COMMIT_VALUE: + return tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), + txnID.getLeastSigBits(), lowWaterMark); + case TxnAction.ABORT_VALUE: + return tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(), + txnID.getLeastSigBits(), lowWaterMark); + default: + return FutureUtil.failedFuture( + new IllegalStateException("Unsupported txnAction " + txnAction)); + } + }); + return FutureUtil.waitForAll(Stream.concat(onSubFutureStream, onTopicFutureStream) + .collect(Collectors.toList())) + .thenCompose(__ -> endTxnInTransactionMetadataStore(txnID, txnAction)); }); - } catch (Exception e) { - resultFuture.completeExceptionally(e); - } - }); - - return resultFuture.thenCompose((future) -> endTxnInTransactionMetadataStore(txnID, txnAction)); } private static boolean isRetryableException(Throwable e) {