This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e6580abc8a [Fix][txn] Make transaction stats consistent at end txn 
(#15472)
5e6580abc8a is described below

commit 5e6580abc8aea515581f0d23964b46bb58e493f4
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat May 7 20:24:15 2022 +0800

    [Fix][txn] Make transaction stats consistent at end txn (#15472)
    
    ### Motivation
    When the end transaction log is appended to the transaction log, the 
transaction ended. The transaction should be removed from the `txnMetaMap`. If 
transactionLog fails to delete the location, we only need to log it.
    ### Modification
    Not complete exceptionally, but only give a warn log.
---
 .../pulsar/broker/transaction/TransactionTest.java | 24 ++++++++++++++++++
 .../impl/MLTransactionMetadataStore.java           | 29 ++++++++++------------
 2 files changed, 37 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 60ecf04e602..8f2dacc434f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -102,6 +102,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
@@ -971,4 +972,27 @@ public class TransactionTest extends TransactionTestBase {
                     Integer.parseInt(lastConfirmedEntry[1]) - 2);
         });
     }
+
+    @Test
+    public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
+        TransactionMetadataStore transactionMetadataStore = 
getPulsarServiceList().get(0)
+                .getTransactionMetadataStoreService()
+                .getStores()
+                .get(new TransactionCoordinatorID(0));
+
+        Field field = 
MLTransactionMetadataStore.class.getDeclaredField("transactionLog");
+        field.setAccessible(true);
+        MLTransactionLogImpl transactionLog = (MLTransactionLogImpl) 
field.get(transactionMetadataStore);
+        Field field1 = MLTransactionLogImpl.class.getDeclaredField("cursor");
+        field1.setAccessible(true);
+        ManagedCursorImpl managedCursor = (ManagedCursorImpl) 
field1.get(transactionLog);
+        managedCursor.close();
+
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        transaction.commit().get();
+    }
 }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index f93de8b0175..685d57e664e 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -372,23 +372,20 @@ public class MLTransactionMetadataStore
                                     this.transactionTimeoutCount.increment();
                                 }
                                 if (newStatus == TxnStatus.COMMITTED || 
newStatus == TxnStatus.ABORTED) {
-                                    
transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, ex) 
-> {
-                                        if (ex != null) {
-                                            promise.completeExceptionally(ex);
-                                            return;
-                                        }
-                                        this.transactionMetadataStoreStats
-                                                
.addTransactionExecutionLatencySample(System.currentTimeMillis()
-                                                        - 
txnMetaListPair.getLeft().getOpenTimestamp());
-                                        if (newStatus == TxnStatus.COMMITTED) {
-                                            
committedTransactionCount.increment();
-                                        } else {
-                                            
abortedTransactionCount.increment();
-                                        }
-                                        
txnMetaMap.remove(txnID.getLeastSigBits());
-                                        promise.complete(null);
+                                    this.transactionMetadataStoreStats
+                                            
.addTransactionExecutionLatencySample(System.currentTimeMillis()
+                                                    - 
txnMetaListPair.getLeft().getOpenTimestamp());
+                                    if (newStatus == TxnStatus.COMMITTED) {
+                                        committedTransactionCount.increment();
+                                    } else {
+                                        abortedTransactionCount.increment();
+                                    }
+                                    txnMetaMap.remove(txnID.getLeastSigBits());
+                                    
transactionLog.deletePosition(txnMetaListPair.getRight()).exceptionally(ex -> {
+                                        log.warn("Failed to delete transaction 
log position "
+                                                + "at end transaction [{}]", 
txnID);
+                                        return null;
                                     });
-                                    return;
                                 }
                                 promise.complete(null);
                             } catch (InvalidTxnStatusException e) {

Reply via email to