This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 982a11fe26be5adb8e1f368fb6736a95c3eadadf Author: Xiangying Meng <[email protected]> AuthorDate: Thu Feb 24 21:54:17 2022 +0800 [Transaction] delete changeMaxReadPositionAndAddAbortTimes when checkIfNoSnapshot (#14276) (cherry picked from commit 0a9fd913528181951fd6ad97d3ba07e11e77cd70) --- .../buffer/impl/TopicTransactionBuffer.java | 1 - .../pulsar/broker/transaction/TransactionTest.java | 25 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 89c77d6..e0a6695 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -474,7 +474,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen synchronized (TopicTransactionBuffer.this) { if (checkIfNoSnapshot()) { maxReadPosition = position; - changeMaxReadPositionAndAddAbortTimes.incrementAndGet(); } else if (checkIfReady()) { if (ongoingTxns.isEmpty()) { maxReadPosition = position; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index b882d26..28d4fcf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -47,6 +47,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -800,4 +801,28 @@ public class TransactionTest extends TransactionTestBase { timeout = (Timeout) field.get(transaction); Assert.assertTrue(timeout.isCancelled()); } + + @Test + public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception { + PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0) + .getBrokerService() + .getTopic(NAMESPACE1 + "/test", true) + .get().get(); + TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); + Field field = TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes"); + field.setAccessible(true); + AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong) field.get(buffer); + Field field1 = TopicTransactionBufferState.class.getDeclaredField("state"); + field1.setAccessible(true); + + Awaitility.await().untilAsserted(() -> { + TopicTransactionBufferState.State state = (TopicTransactionBufferState.State) field1.get(buffer); + Assert.assertEquals(state, TopicTransactionBufferState.State.NoSnapshot); + }); + Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L); + + buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1)); + Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L); + + } } \ No newline at end of file
