This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1e53635fb25de88c165a8b4f126759979a40dd4a
Author: Wenzhi Feng <[email protected]>
AuthorDate: Wed May 8 19:34:00 2024 +0800

    [fix] [broker] rename to changeMaxReadPositionCount (#22656)
    
    (cherry picked from commit 5ab05129514c1e71a09ec3f28b2b2dda9ce3e47f)
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java  | 16 ++++++++--------
 .../pulsar/broker/transaction/TransactionTest.java       | 12 ++++++------
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index a36216bd625..81c9ecfc728 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -76,8 +76,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
      */
     private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new LinkedMap<>();
 
-    // when add abort or change max read position, the count will +1. Take snapshot will set 0 into it.
-    private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong();
+    // when change max read position, the count will +1. Take snapshot will reset the count.
+    private final AtomicLong changeMaxReadPositionCount = new AtomicLong();
 
     private final LongAdder txnCommittedCounter = new LongAdder();
 
@@ -429,15 +429,15 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
     }
 
     private void takeSnapshotByChangeTimes() {
-        if (changeMaxReadPositionAndAddAbortTimes.get() >= takeSnapshotIntervalNumber) {
-            this.changeMaxReadPositionAndAddAbortTimes.set(0);
+        if (changeMaxReadPositionCount.get() >= takeSnapshotIntervalNumber) {
+            this.changeMaxReadPositionCount.set(0);
             this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition);
         }
     }
 
     private void takeSnapshotByTimeout() {
-        if (changeMaxReadPositionAndAddAbortTimes.get() > 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.set(0);
+        if (changeMaxReadPositionCount.get() > 0) {
+            this.changeMaxReadPositionCount.set(0);
             this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition);
         }
         this.timer.newTimeout(TopicTransactionBuffer.this,
@@ -454,7 +454,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
             maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
         }
         if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+            this.changeMaxReadPositionCount.getAndIncrement();
         }
     }
 
@@ -489,7 +489,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
                     maxReadPosition = position;
-                    changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+                    changeMaxReadPositionCount.incrementAndGet();
                 }
             }
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 28dc2f8972c..86def029186 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1062,10 +1062,10 @@ public class TransactionTest extends TransactionTestBase {
     }
 
     @Test
-    public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
+    public void testNotChangeMaxReadPositionCountWhenCheckIfNoSnapshot() throws Exception {
         PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService()
-                .getTopic(NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true)
+                .getTopic(NAMESPACE1 + "/changeMaxReadPositionCount" + UUID.randomUUID(), true)
                 .get().get();
         TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
         Field processorField = TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");
@@ -1073,9 +1073,9 @@ public class TransactionTest extends TransactionTestBase {
 
         AbortedTxnProcessor abortedTxnProcessor = (AbortedTxnProcessor) processorField.get(buffer);
         Field changeTimeField = TopicTransactionBuffer
-                .class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
+                .class.getDeclaredField("changeMaxReadPositionCount");
         changeTimeField.setAccessible(true);
-        AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong) changeTimeField.get(buffer);
+        AtomicLong changeMaxReadPositionCount = (AtomicLong) changeTimeField.get(buffer);
 
         Field field1 = TopicTransactionBufferState.class.getDeclaredField("state");
         field1.setAccessible(true);
@@ -1084,10 +1084,10 @@ public class TransactionTest extends TransactionTestBase {
                     TopicTransactionBufferState.State state = (TopicTransactionBufferState.State) field1.get(buffer);
                     Assert.assertEquals(state, TopicTransactionBufferState.State.NoSnapshot);
         });
-        Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
+        Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);
 
         buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1));
-        Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
+        Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);
 
     }
 

Reply via email to