This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 45c74fb6b46231acd8f584ca0d45c7f4d6992cc4 Author: Xiangying Meng <[email protected]> AuthorDate: Thu Feb 24 21:57:38 2022 +0800 [Transaction] Adopt single thread pool in TC (#14238) ### Motivation Optimize code and improve maintainability. ### Modification * Option 1 (the way I use) Create a thread pool per TC. * advantage Each TC has a single thread pool to perform its own tasks, and will not be blocked due to sharing a single thread with other TCs * disadvantage Too many thread pools may be created * Option 2 Create an ExecuteProvider in the TC service. It creates some single-threaded pools when the TC Service is created, and then assigns a single-threaded pool to a TC when the TC is created * The advantages and disadvantages are the opposite of those of option 1 (cherry picked from commit ced57866700aaeae163bcc6670d9a8eb1ffe8c50) --- .../impl/MLTransactionMetadataStore.java | 328 ++++++++++++--------- 1 file changed, 188 insertions(+), 140 deletions(-) diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index a71d203..f109ec4 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -18,12 +18,15 @@ */ package org.apache.pulsar.transaction.coordinator.impl; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import 
java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; @@ -69,6 +72,7 @@ public class MLTransactionMetadataStore private final LongAdder transactionTimeoutCount; private final LongAdder appendLogCount; private final MLTransactionSequenceIdGenerator sequenceIdGenerator; + private final ExecutorService internalPinnedExecutor; public MLTransactionMetadataStore(TransactionCoordinatorID tcID, MLTransactionLogImpl mlTransactionLog, @@ -87,12 +91,16 @@ public class MLTransactionMetadataStore this.abortedTransactionCount = new LongAdder(); this.transactionTimeoutCount = new LongAdder(); this.appendLogCount = new LongAdder(); + DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_" + + tcID.toString() + "thread_factory"); + this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); if (!changeToInitializingState()) { log.error("Managed ledger transaction metadata store change state error when init it"); return; } - new Thread(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() { + + internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() { @Override public void replayComplete() { @@ -125,7 +133,8 @@ public class MLTransactionMetadataStore long timeoutAt = transactionMetadataEntry.getTimeoutMs(); txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID, openTimestamp, timeoutAt), positions)); - recoverTracker.handleOpenStatusTransaction(txnSequenceId, timeoutAt + openTimestamp); + recoverTracker.handleOpenStatusTransaction(txnSequenceId, + timeoutAt + openTimestamp); } break; case ADD_PARTITION: @@ -174,7 +183,7 @@ public class MLTransactionMetadataStore log.error(e.getMessage(), e); } } - })).start(); + })); } @Override @@ -195,167 +204,206 @@ public class MLTransactionMetadataStore } @Override - public synchronized CompletableFuture<TxnID> newTransaction(long 
timeOut) { - if (!checkIfReady()) { - return FutureUtil.failedFuture( - new CoordinatorException - .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction")); - } + public CompletableFuture<TxnID> newTransaction(long timeOut) { + CompletableFuture<TxnID> completableFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + if (!checkIfReady()) { + completableFuture.completeExceptionally(new CoordinatorException + .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction")); + return; + } - long mostSigBits = tcID.getId(); - long leastSigBits = sequenceIdGenerator.generateSequenceId(); - TxnID txnID = new TxnID(mostSigBits, leastSigBits); - long currentTimeMillis = System.currentTimeMillis(); - TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() - .setTxnidMostBits(mostSigBits) - .setTxnidLeastBits(leastSigBits) - .setStartTime(currentTimeMillis) - .setTimeoutMs(timeOut) - .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW) - .setLastModificationTime(currentTimeMillis) - .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); - return transactionLog.append(transactionMetadataEntry) - .thenCompose(position -> { - appendLogCount.increment(); - TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut); - List<Position> positions = new ArrayList<>(); - positions.add(position); - Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions); - txnMetaMap.put(leastSigBits, pair); - this.timeoutTracker.addTransaction(leastSigBits, timeOut); - createdTransactionCount.increment(); - return CompletableFuture.completedFuture(txnID); - }); + long mostSigBits = tcID.getId(); + long leastSigBits = sequenceIdGenerator.generateSequenceId(); + TxnID txnID = new TxnID(mostSigBits, leastSigBits); + long currentTimeMillis = System.currentTimeMillis(); + TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() + 
.setTxnidMostBits(mostSigBits) + .setTxnidLeastBits(leastSigBits) + .setStartTime(currentTimeMillis) + .setTimeoutMs(timeOut) + .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW) + .setLastModificationTime(currentTimeMillis) + .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); + transactionLog.append(transactionMetadataEntry) + .whenComplete((position, throwable) -> { + if (throwable != null) { + completableFuture.completeExceptionally(throwable); + } else { + appendLogCount.increment(); + TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut); + List<Position> positions = new ArrayList<>(); + positions.add(position); + Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions); + txnMetaMap.put(leastSigBits, pair); + this.timeoutTracker.addTransaction(leastSigBits, timeOut); + createdTransactionCount.increment(); + completableFuture.complete(txnID); + } + }); + }); + return completableFuture; } @Override - public synchronized CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) { - if (!checkIfReady()) { - return FutureUtil.failedFuture( - new CoordinatorException.TransactionMetadataStoreStateException(tcID, - State.Ready, getState(), "add produced partition")); - } - return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> { - TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() - .setTxnidMostBits(txnID.getMostSigBits()) - .setTxnidLeastBits(txnID.getLeastSigBits()) - .setMetadataOp(TransactionMetadataOp.ADD_PARTITION) - .addAllPartitions(partitions) - .setLastModificationTime(System.currentTimeMillis()) - .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); + public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) { + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + if (!checkIfReady()) { + completableFuture + 
.completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(tcID, + State.Ready, getState(), "add produced partition")); + return; + } + getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> { + TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() + .setTxnidMostBits(txnID.getMostSigBits()) + .setTxnidLeastBits(txnID.getLeastSigBits()) + .setMetadataOp(TransactionMetadataOp.ADD_PARTITION) + .addAllPartitions(partitions) + .setLastModificationTime(System.currentTimeMillis()) + .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); - return transactionLog.append(transactionMetadataEntry) - .thenCompose(position -> { - appendLogCount.increment(); - try { - synchronized (txnMetaListPair.getLeft()) { - txnMetaListPair.getLeft().addProducedPartitions(partitions); - txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position); + transactionLog.append(transactionMetadataEntry) + .whenComplete((position, exception) -> { + if (exception != null) { + completableFuture.completeExceptionally(exception); + return; } - return CompletableFuture.completedFuture(null); - } catch (InvalidTxnStatusException e) { - transactionLog.deletePosition(Collections.singletonList(position)); - log.error("TxnID : " + txnMetaListPair.getLeft().id().toString() - + " add produced partition error with TxnStatus : " - + txnMetaListPair.getLeft().status().name(), e); - return FutureUtil.failedFuture(e); - } - }); + appendLogCount.increment(); + try { + synchronized (txnMetaListPair.getLeft()) { + txnMetaListPair.getLeft().addProducedPartitions(partitions); + txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position); + } + completableFuture.complete(null); + } catch (InvalidTxnStatusException e) { + transactionLog.deletePosition(Collections.singletonList(position)); + log.error("TxnID : " + txnMetaListPair.getLeft().id().toString() + + " add produced partition error with TxnStatus : " + + 
txnMetaListPair.getLeft().status().name(), e); + completableFuture.completeExceptionally(e); + } + }); + }); }); + return completableFuture; } @Override - public synchronized CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID, + public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID, List<TransactionSubscription> txnSubscriptions) { - if (!checkIfReady()) { - return FutureUtil.failedFuture( - new CoordinatorException.TransactionMetadataStoreStateException(tcID, - State.Ready, getState(), "add acked partition")); - } - return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> { - TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() - .setTxnidMostBits(txnID.getMostSigBits()) - .setTxnidLeastBits(txnID.getLeastSigBits()) - .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION) - .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions)) - .setLastModificationTime(System.currentTimeMillis()) - .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + if (!checkIfReady()) { + completableFuture.completeExceptionally(new CoordinatorException + .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "add acked partition")); + return; + } + getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> { + TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() + .setTxnidMostBits(txnID.getMostSigBits()) + .setTxnidLeastBits(txnID.getLeastSigBits()) + .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION) + .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions)) + .setLastModificationTime(System.currentTimeMillis()) + .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); - return transactionLog.append(transactionMetadataEntry) - .thenCompose(position -> { - appendLogCount.increment(); - try { - synchronized 
(txnMetaListPair.getLeft()) { - txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions); - txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position); + transactionLog.append(transactionMetadataEntry) + .whenComplete((position, exception) -> { + if (exception != null) { + completableFuture.completeExceptionally(exception); + return; } - return CompletableFuture.completedFuture(null); - } catch (InvalidTxnStatusException e) { - transactionLog.deletePosition(Collections.singletonList(position)); - log.error("TxnID : " + txnMetaListPair.getLeft().id().toString() - + " add acked subscription error with TxnStatus : " - + txnMetaListPair.getLeft().status().name(), e); - return FutureUtil.failedFuture(e); - } - }); + appendLogCount.increment(); + try { + synchronized (txnMetaListPair.getLeft()) { + txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions); + txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position); + } + completableFuture.complete(null); + } catch (InvalidTxnStatusException e) { + transactionLog.deletePosition(Collections.singletonList(position)); + log.error("TxnID : " + txnMetaListPair.getLeft().id().toString() + + " add acked subscription error with TxnStatus : " + + txnMetaListPair.getLeft().status().name(), e); + completableFuture.completeExceptionally(e); + } + }); + }); }); + return completableFuture; } @Override - public synchronized CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus, + public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus, TxnStatus expectedStatus, boolean isTimeout) { - if (!checkIfReady()) { - return FutureUtil.failedFuture( - new CoordinatorException.TransactionMetadataStoreStateException(tcID, - State.Ready, getState(), "update transaction status")); - } - return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> { - if (txnMetaListPair.getLeft().status() == newStatus) { - return CompletableFuture.completedFuture(null); + CompletableFuture<Void> 
completableFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + if (!checkIfReady()) { + completableFuture.completeExceptionally(new CoordinatorException + .TransactionMetadataStoreStateException(tcID, + State.Ready, getState(), "update transaction status")); + return; } - TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() - .setTxnidMostBits(txnID.getMostSigBits()) - .setTxnidLeastBits(txnID.getLeastSigBits()) - .setExpectedStatus(expectedStatus) - .setMetadataOp(TransactionMetadataOp.UPDATE) - .setLastModificationTime(System.currentTimeMillis()) - .setNewStatus(newStatus) - .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); + getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> { + if (txnMetaListPair.getLeft().status() == newStatus) { + completableFuture.complete(null); + return; + } + TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() + .setTxnidMostBits(txnID.getMostSigBits()) + .setTxnidLeastBits(txnID.getLeastSigBits()) + .setExpectedStatus(expectedStatus) + .setMetadataOp(TransactionMetadataOp.UPDATE) + .setLastModificationTime(System.currentTimeMillis()) + .setNewStatus(newStatus) + .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); - return transactionLog.append(transactionMetadataEntry).thenCompose(position -> { - appendLogCount.increment(); - try { - synchronized (txnMetaListPair.getLeft()) { - txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus); - txnMetaListPair.getRight().add(position); + transactionLog.append(transactionMetadataEntry).whenComplete((position, throwable) -> { + if (throwable != null) { + completableFuture.completeExceptionally(throwable); + return; } - if (newStatus == TxnStatus.ABORTING && isTimeout) { - this.transactionTimeoutCount.increment(); - } - if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { - return 
transactionLog.deletePosition(txnMetaListPair.getRight()).thenCompose(v -> { - this.transactionMetadataStoreStats - .addTransactionExecutionLatencySample(System.currentTimeMillis() - - txnMetaListPair.getLeft().getOpenTimestamp()); - if (newStatus == TxnStatus.COMMITTED) { - committedTransactionCount.increment(); - } else { - abortedTransactionCount.increment(); - } - txnMetaMap.remove(txnID.getLeastSigBits()); - return CompletableFuture.completedFuture(null); - }); + appendLogCount.increment(); + try { + synchronized (txnMetaListPair.getLeft()) { + txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus); + txnMetaListPair.getRight().add(position); + } + if (newStatus == TxnStatus.ABORTING && isTimeout) { + this.transactionTimeoutCount.increment(); + } + if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { + transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, exception) -> { + if (exception != null) { + completableFuture.completeExceptionally(exception); + return; + } + this.transactionMetadataStoreStats + .addTransactionExecutionLatencySample(System.currentTimeMillis() + - txnMetaListPair.getLeft().getOpenTimestamp()); + if (newStatus == TxnStatus.COMMITTED) { + committedTransactionCount.increment(); + } else { + abortedTransactionCount.increment(); + } + txnMetaMap.remove(txnID.getLeastSigBits()); + completableFuture.complete(null); + }); + } + completableFuture.complete(null); + } catch (InvalidTxnStatusException e) { + transactionLog.deletePosition(Collections.singletonList(position)); + log.error("TxnID : " + txnMetaListPair.getLeft().id().toString() + + " add update txn status error with TxnStatus : " + + txnMetaListPair.getLeft().status().name(), e); + completableFuture.completeExceptionally(e); } - return CompletableFuture.completedFuture(null); - } catch (InvalidTxnStatusException e) { - transactionLog.deletePosition(Collections.singletonList(position)); - log.error("TxnID : " + 
txnMetaListPair.getLeft().id().toString() - + " add update txn status error with TxnStatus : " - + txnMetaListPair.getLeft().status().name(), e); - return FutureUtil.failedFuture(e); - } + }); }); }); + return completableFuture; } @Override
