This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c6b4887124bb5dee9edc7a85448dc341c3ff41a6
Author: 道君 <[email protected]>
AuthorDate: Thu Feb 29 21:22:03 2024 +0800

    [fix][txn]Fix TopicTransactionBuffer potential thread safety issue (#22149)
    
    (cherry picked from commit 74be3fd4917a2327f2da9b5b55cc572b3c1f4e84)
---
 .../buffer/impl/TopicTransactionBuffer.java        | 30 ++++++++++++----------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 5392e473947..a36216bd625 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -170,13 +170,15 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                         if (msgMetadata != null && 
msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) {
                             TxnID txnID = new 
TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits());
                             PositionImpl position = 
PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
-                            if (Markers.isTxnMarker(msgMetadata)) {
-                                if (Markers.isTxnAbortMarker(msgMetadata)) {
-                                    
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position);
+                            synchronized (TopicTransactionBuffer.this) {
+                                if (Markers.isTxnMarker(msgMetadata)) {
+                                    if (Markers.isTxnAbortMarker(msgMetadata)) 
{
+                                        
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position);
+                                    }
+                                    updateMaxReadPosition(txnID);
+                                } else {
+                                    handleTransactionMessage(txnID, position);
                                 }
-                                updateMaxReadPosition(txnID);
-                            } else {
-                                handleTransactionMessage(txnID, position);
                             }
                         }
                     }
@@ -362,10 +364,10 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                             updateMaxReadPosition(txnID);
                             
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
                             takeSnapshotByChangeTimes();
+                            txnAbortedCounter.increment();
+                            completableFuture.complete(null);
+                            handleLowWaterMark(txnID, lowWaterMark);
                         }
-                        txnAbortedCounter.increment();
-                        completableFuture.complete(null);
-                        handleLowWaterMark(txnID, lowWaterMark);
                     }
 
                     @Override
@@ -473,7 +475,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
     }
 
     @Override
-    public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
+    public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl 
readPosition) {
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
@@ -510,9 +512,11 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
     @Override
     public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
         TransactionInBufferStats transactionInBufferStats = new 
TransactionInBufferStats();
-        transactionInBufferStats.aborted = isTxnAborted(txnID, null);
-        if (ongoingTxns.containsKey(txnID)) {
-            transactionInBufferStats.startPosition = 
ongoingTxns.get(txnID).toString();
+        synchronized (this) {
+            transactionInBufferStats.aborted = isTxnAborted(txnID, null);
+            if (ongoingTxns.containsKey(txnID)) {
+                transactionInBufferStats.startPosition = 
ongoingTxns.get(txnID).toString();
+            }
         }
         return transactionInBufferStats;
     }

Reply via email to