This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e88da2a  Fix transaction log append sync problem. (#9238)
e88da2a is described below

commit e88da2ad26608f102845dece1dd84301c5fb6340
Author: congbo <[email protected]>
AuthorDate: Wed Mar 3 07:51:30 2021 +0800

    Fix transaction log append sync problem. (#9238)
    
    ## Motivation
    When the transaction status becomes aborted or committed, the cursor may
    fail to delete a log position if the transaction operation and the
    recording of the appended log position are not performed under a lock.
    
    If a TxnMetaImpl has been recycled and, at the same time, we update the
    txnStatus on that recycled instance, we get erroneous behavior.
---
 .../impl/InMemTransactionMetadataStore.java        |  2 +-
 .../impl/MLTransactionMetadataStore.java           | 30 +++++++++++---------
 .../transaction/coordinator/impl/TxnMetaImpl.java  | 33 ++--------------------
 3 files changed, 21 insertions(+), 44 deletions(-)

diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
index 68d1c0e..1b588c7 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
@@ -65,7 +65,7 @@ class InMemTransactionMetadataStore implements 
TransactionMetadataStore {
             tcID.getId(),
             localID.getAndIncrement()
         );
-        TxnMetaImpl txn = TxnMetaImpl.create(txnID);
+        TxnMetaImpl txn = new TxnMetaImpl(txnID);
         transactions.put(txnID, txn);
         return CompletableFuture.completedFuture(txnID);
     }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 227b4c4..f86e566 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -106,7 +106,7 @@ public class MLTransactionMetadataStore
                             } else {
                                 List<Position> positions = new ArrayList<>();
                                 positions.add(position);
-                                txnMetaMap.put(txnID, 
MutablePair.of(TxnMetaImpl.create(txnID), positions));
+                                txnMetaMap.put(txnID, MutablePair.of(new 
TxnMetaImpl(txnID), positions));
                                 
txnIdSortedSet.add(transactionMetadataEntry.getTxnidLeastBits());
                                 
timeoutTracker.replayAddTransaction(transactionMetadataEntry.getTxnidLeastBits(),
                                         
transactionMetadataEntry.getTimeoutMs());
@@ -140,7 +140,6 @@ public class MLTransactionMetadataStore
                                     
transactionLog.deletePosition(txnMetaMap.get(txnID).getRight()).thenAccept(v -> 
{
                                         TxnMeta txnMeta = 
txnMetaMap.remove(txnID).getLeft();
                                         
txnIdSortedSet.remove(transactionMetadataEntry.getTxnidLeastBits());
-                                        ((TxnMetaImpl) txnMeta).recycle();
                                     });
                                 } else {
                                     txnMetaMap.get(txnID).getLeft()
@@ -200,7 +199,7 @@ public class MLTransactionMetadataStore
                 .setLastModificationTime(currentTimeMillis);
         return transactionLog.append(transactionMetadataEntry)
                 .thenCompose(position -> {
-                    TxnMeta txn = TxnMetaImpl.create(txnID);
+                    TxnMeta txn = new TxnMetaImpl(txnID);
                     List<Position> positions = new ArrayList<>();
                     positions.add(position);
                     Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, 
positions);
@@ -229,11 +228,13 @@ public class MLTransactionMetadataStore
             return transactionLog.append(transactionMetadataEntry)
                     .thenCompose(position -> {
                         try {
-                            
txnMetaListPair.getLeft().addProducedPartitions(partitions);
-                            txnMetaMap.get(txnID).getRight().add(position);
+                            synchronized (txnMetaListPair.getLeft()) {
+                                
txnMetaListPair.getLeft().addProducedPartitions(partitions);
+                                txnMetaMap.get(txnID).getRight().add(position);
+                            }
                             return CompletableFuture.completedFuture(null);
                         } catch (InvalidTxnStatusException e) {
-                            txnMetaMap.get(txnID).getRight().add(position);
+                            
transactionLog.deletePosition(Collections.singletonList(position));
                             log.error("TxnID : " + 
txnMetaListPair.getLeft().id().toString()
                                     + " add produced partition error with 
TxnStatus : "
                                     + 
txnMetaListPair.getLeft().status().name(), e);
@@ -262,11 +263,13 @@ public class MLTransactionMetadataStore
             return transactionLog.append(transactionMetadataEntry)
                     .thenCompose(position -> {
                         try {
-                            
txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
-                            txnMetaMap.get(txnID).getRight().add(position);
+                            synchronized (txnMetaListPair.getLeft()) {
+                                
txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
+                                txnMetaMap.get(txnID).getRight().add(position);
+                            }
                             return CompletableFuture.completedFuture(null);
                         } catch (InvalidTxnStatusException e) {
-                            txnMetaMap.get(txnID).getRight().add(position);
+                            
transactionLog.deletePosition(Collections.singletonList(position));
                             log.error("TxnID : " + 
txnMetaListPair.getLeft().id().toString()
                                     + " add acked subscription error with 
TxnStatus : "
                                     + 
txnMetaListPair.getLeft().status().name(), e);
@@ -297,19 +300,20 @@ public class MLTransactionMetadataStore
 
             return 
transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
                 try {
-                    txnMetaListPair.getLeft().updateTxnStatus(newStatus, 
expectedStatus);
-                    txnMetaListPair.getRight().add(position);
+                    synchronized (txnMetaListPair.getLeft()) {
+                        txnMetaListPair.getLeft().updateTxnStatus(newStatus, 
expectedStatus);
+                        txnMetaListPair.getRight().add(position);
+                    }
                     if (newStatus == TxnStatus.COMMITTED || newStatus == 
TxnStatus.ABORTED) {
                         return 
transactionLog.deletePosition(txnMetaListPair.getRight()).thenCompose(v -> {
                             txnMetaMap.remove(txnID);
                             txnIdSortedSet.remove(txnID.getLeastSigBits());
-                            ((TxnMetaImpl) 
txnMetaListPair.getLeft()).recycle();
                             return CompletableFuture.completedFuture(null);
                         });
                     }
                     return CompletableFuture.completedFuture(null);
                 } catch (InvalidTxnStatusException e) {
-                    txnMetaListPair.getRight().add(position);
+                    
transactionLog.deletePosition(Collections.singletonList(position));
                     log.error("TxnID : " + 
txnMetaListPair.getLeft().id().toString()
                             + " add update txn status error with TxnStatus : "
                             + txnMetaListPair.getLeft().status().name(), e);
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
index 16c5a7b..4462ffc 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -38,38 +36,13 @@ import 
org.apache.pulsar.transaction.coordinator.util.TransactionUtil;
  */
 class TxnMetaImpl implements TxnMeta {
 
-    private TxnID txnID;
+    private final TxnID txnID;
     private final Set<String> producedPartitions = new HashSet<>();
     private final Set<TransactionSubscription> ackedPartitions = new 
HashSet<>();
     private volatile TxnStatus txnStatus = TxnStatus.OPEN;
-    private final Handle<TxnMetaImpl> recycleHandle;
 
-    private static final Recycler<TxnMetaImpl> RECYCLER = new 
Recycler<TxnMetaImpl>() {
-        protected TxnMetaImpl newObject(Recycler.Handle<TxnMetaImpl> handle) {
-            return new TxnMetaImpl(handle);
-        }
-    };
-
-    TxnMetaImpl(Handle<TxnMetaImpl> handle) {
-        this.recycleHandle = handle;
-    }
-
-    // Constructor for transaction metadata
-    static TxnMetaImpl create(TxnID txnID) {
-        @SuppressWarnings("unchecked")
-        TxnMetaImpl txnMeta = RECYCLER.get();
-        txnMeta.txnID = txnID;
-        return txnMeta;
-    }
-
-    public void recycle() {
-        this.producedPartitions.clear();
-        this.ackedPartitions.clear();
-        this.txnStatus = TxnStatus.OPEN;
-
-        if (recycleHandle != null) {
-            recycleHandle.recycle(this);
-        }
+    TxnMetaImpl(TxnID txnID) {
+        this.txnID = txnID;
     }
 
     @Override

Reply via email to