This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5e6580abc8a [Fix][txn] Make transaction stats consistent at end txn
(#15472)
5e6580abc8a is described below
commit 5e6580abc8aea515581f0d23964b46bb58e493f4
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat May 7 20:24:15 2022 +0800
[Fix][txn] Make transaction stats consistent at end txn (#15472)
### Motivation
When the end transaction log is appended to the transaction log, the
transaction ended. The transaction should be removed from the `txnMetaMap`. If
transactionLog fails to delete the location, we only need to log it.
### Modification
Not complete exceptionally, but only give a warn log.
---
.../pulsar/broker/transaction/TransactionTest.java | 24 ++++++++++++++++++
.../impl/MLTransactionMetadataStore.java | 29 ++++++++++------------
2 files changed, 37 insertions(+), 16 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 60ecf04e602..8f2dacc434f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -102,6 +102,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
@@ -971,4 +972,27 @@ public class TransactionTest extends TransactionTestBase {
Integer.parseInt(lastConfirmedEntry[1]) - 2);
});
}
+
+ @Test
+ public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
+ TransactionMetadataStore transactionMetadataStore =
getPulsarServiceList().get(0)
+ .getTransactionMetadataStoreService()
+ .getStores()
+ .get(new TransactionCoordinatorID(0));
+
+ Field field =
MLTransactionMetadataStore.class.getDeclaredField("transactionLog");
+ field.setAccessible(true);
+ MLTransactionLogImpl transactionLog = (MLTransactionLogImpl)
field.get(transactionMetadataStore);
+ Field field1 = MLTransactionLogImpl.class.getDeclaredField("cursor");
+ field1.setAccessible(true);
+ ManagedCursorImpl managedCursor = (ManagedCursorImpl)
field1.get(transactionLog);
+ managedCursor.close();
+
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+
+ transaction.commit().get();
+ }
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index f93de8b0175..685d57e664e 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -372,23 +372,20 @@ public class MLTransactionMetadataStore
this.transactionTimeoutCount.increment();
}
if (newStatus == TxnStatus.COMMITTED ||
newStatus == TxnStatus.ABORTED) {
-
transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, ex)
-> {
- if (ex != null) {
- promise.completeExceptionally(ex);
- return;
- }
- this.transactionMetadataStoreStats
-
.addTransactionExecutionLatencySample(System.currentTimeMillis()
- -
txnMetaListPair.getLeft().getOpenTimestamp());
- if (newStatus == TxnStatus.COMMITTED) {
-
committedTransactionCount.increment();
- } else {
-
abortedTransactionCount.increment();
- }
-
txnMetaMap.remove(txnID.getLeastSigBits());
- promise.complete(null);
+ this.transactionMetadataStoreStats
+
.addTransactionExecutionLatencySample(System.currentTimeMillis()
+ -
txnMetaListPair.getLeft().getOpenTimestamp());
+ if (newStatus == TxnStatus.COMMITTED) {
+ committedTransactionCount.increment();
+ } else {
+ abortedTransactionCount.increment();
+ }
+ txnMetaMap.remove(txnID.getLeastSigBits());
+
transactionLog.deletePosition(txnMetaListPair.getRight()).exceptionally(ex -> {
+ log.warn("Failed to delete transaction
log position "
+ + "at end transaction [{}]",
txnID);
+ return null;
});
- return;
}
promise.complete(null);
} catch (InvalidTxnStatusException e) {