This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 48402853178f2ba1cdc0fe1df7dcee223456a9a5 Author: Xiangying Meng <[email protected]> AuthorDate: Sat May 7 20:24:15 2022 +0800 [Fix][txn] Make transaction stats consistent at end txn (#15472) ### Motivation When the end transaction log is appended to the transaction log, the transaction ended. The transaction should be removed from the `txnMetaMap`. If transactionLog fails to delete the location, we only need to log it. ### Modification Not complete exceptionally, but only give a warn log. (cherry picked from commit 5e6580abc8aea515581f0d23964b46bb58e493f4) --- .../pulsar/broker/transaction/TransactionTest.java | 24 ++++++++++++++++++ .../impl/MLTransactionMetadataStore.java | 29 ++++++++++------------ 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 777d5c90363..7ab49481466 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -101,6 +101,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker; @@ -966,4 +967,27 @@ public class TransactionTest extends TransactionTestBase { Integer.parseInt(lastConfirmedEntry[1]) - 2); }); } + + @Test + public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception { + TransactionMetadataStore transactionMetadataStore = getPulsarServiceList().get(0) + .getTransactionMetadataStoreService() + .getStores() + .get(new TransactionCoordinatorID(0)); + + Field field = MLTransactionMetadataStore.class.getDeclaredField("transactionLog"); + field.setAccessible(true); + MLTransactionLogImpl transactionLog = (MLTransactionLogImpl) field.get(transactionMetadataStore); + Field field1 = MLTransactionLogImpl.class.getDeclaredField("cursor"); + field1.setAccessible(true); + ManagedCursorImpl managedCursor = (ManagedCursorImpl) field1.get(transactionLog); + managedCursor.close(); + + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build() + .get(); + + transaction.commit().get(); + } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index f93de8b0175..685d57e664e 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -372,23 +372,20 @@ public class MLTransactionMetadataStore this.transactionTimeoutCount.increment(); } if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { - transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, ex) -> { - if (ex != null) { - promise.completeExceptionally(ex); - return; - } - this.transactionMetadataStoreStats - .addTransactionExecutionLatencySample(System.currentTimeMillis() - - txnMetaListPair.getLeft().getOpenTimestamp()); - if (newStatus == TxnStatus.COMMITTED) { - committedTransactionCount.increment(); - } else { - abortedTransactionCount.increment(); - } - txnMetaMap.remove(txnID.getLeastSigBits()); - promise.complete(null); + this.transactionMetadataStoreStats + .addTransactionExecutionLatencySample(System.currentTimeMillis() + - txnMetaListPair.getLeft().getOpenTimestamp()); + if (newStatus == TxnStatus.COMMITTED) { + committedTransactionCount.increment(); + } else { + abortedTransactionCount.increment(); + } + txnMetaMap.remove(txnID.getLeastSigBits()); + transactionLog.deletePosition(txnMetaListPair.getRight()).exceptionally(ex -> { + log.warn("Failed to delete transaction log position " + + "at end transaction [{}]", txnID); + return null; }); - return; } promise.complete(null); } catch (InvalidTxnStatusException e) {
