This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dee63e  [Transaction] Fix recover of TransactionBuffer (#13739)
7dee63e is described below

commit 7dee63ed707c8784989692b764b058bc8ea7f4ac
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Jan 18 08:46:45 2022 +0800

    [Transaction] Fix recover of TransactionBuffer (#13739)
    
    ### Motivation
    Fix the concurrency issue caused by TransactionBuffer when recovering.
    ### Modification
    1. Run the thenAcceptAsync callback in recover::run on the TransactionReplayExecutor.
    2. delete repeated changeToInitializingState
    3. If cursor.hasMoreEntries() == false && entryQueue.size() == 0, return false and stop recovering.
        - If the cursor is cleared while the transaction is recovering, no entries can be read even though currentLoadPosition < lastConfirmedEntry.
---
 .../buffer/impl/TopicTransactionBuffer.java        | 17 +++++++----
 .../pulsar/broker/transaction/TransactionTest.java | 33 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index d6b053c..3c10738 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -96,7 +96,6 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
     public TopicTransactionBuffer(PersistentTopic topic) {
         super(State.None);
         this.topic = topic;
-        this.changeToInitializingState();
         this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
                 
.getTransactionBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
         this.timer = 
topic.getBrokerService().getPulsar().getTransactionTimer();
@@ -531,7 +530,11 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         @SneakyThrows
         @Override
         public void run() {
-            this.topicTransactionBuffer.changeToInitializingState();
+            if (!this.topicTransactionBuffer.changeToInitializingState()) {
+                log.warn("TransactionBuffer {} of topic {} can not change 
state to Initializing",
+                        this, topic.getName());
+                return;
+            }
             
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService()
                     
.createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> {
                 try {
@@ -579,8 +582,8 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                 }
                 PositionImpl lastConfirmedEntry = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
                 PositionImpl currentLoadPosition = (PositionImpl) 
this.startReadCursorPosition;
-                FillEntryQueueCallback fillEntryQueueCallback = new 
FillEntryQueueCallback(entryQueue, managedCursor,
-                        TopicTransactionBufferRecover.this);
+                FillEntryQueueCallback fillEntryQueueCallback = new 
FillEntryQueueCallback(entryQueue,
+                        managedCursor, TopicTransactionBufferRecover.this);
                 if (lastConfirmedEntry.getEntryId() != -1) {
                     while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0
                             && fillEntryQueueCallback.fillQueue()) {
@@ -604,7 +607,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
                 closeCursor(managedCursor);
                 callBack.recoverComplete();
-            }).exceptionally(e -> {
+            }, 
topic.getBrokerService().getPulsar().getTransactionReplayExecutor()).exceptionally(e
 -> {
                 callBack.recoverExceptionally(new Exception(e));
                 log.error("[{}]Transaction buffer new snapshot reader fail!", 
topic.getName(), e);
                 return null;
@@ -654,6 +657,10 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
                     cursor.asyncReadEntries(100, this, System.nanoTime(), 
PositionImpl.LATEST);
+                } else {
+                    if (entryQueue.size() == 0) {
+                        isReadable = false;
+                    }
                 }
             }
             return isReadable;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 0010a94..8af03b1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -34,6 +35,7 @@ import static org.testng.Assert.fail;
 
 import io.netty.buffer.Unpooled;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.List;
@@ -696,4 +698,35 @@ public class TransactionTest extends TransactionTestBase {
         commitTxn.commit();
     }
 
+    @Test
+    public void testNoEntryCanBeReadWhenRecovery() throws Exception {
+        String topic = NAMESPACE1 + "/test";
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsarServiceList.get(0).getBrokerService()
+                        .getTopic(TopicName.get(topic).toString(), true)
+                        .get()
+                        .get();
+
+        Class<PersistentTopic> persistentTopicClass = PersistentTopic.class;
+        Field filed1 = persistentTopicClass.getDeclaredField("ledger");
+        Field field2 = 
persistentTopicClass.getDeclaredField("transactionBuffer");
+        filed1.setAccessible(true);
+        field2.setAccessible(true);
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
spy(filed1.get(persistentTopic));
+        filed1.set(persistentTopic, managedLedger);
+
+        TopicTransactionBuffer topicTransactionBuffer = 
(TopicTransactionBuffer) field2.get(persistentTopic);
+        Method method = 
TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot");
+        method.setAccessible(true);
+        CompletableFuture<Void> completableFuture = (CompletableFuture<Void>) 
method.invoke(topicTransactionBuffer);
+        completableFuture.get();
+
+        
doReturn(PositionImpl.LATEST).when(managedLedger).getLastConfirmedEntry();
+        ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
+        doReturn(false).when(managedCursor).hasMoreEntries();
+        doReturn(managedCursor).when(managedLedger).newNonDurableCursor(any(), 
any());
+
+        TopicTransactionBuffer transactionBuffer = new 
TopicTransactionBuffer(persistentTopic);
+        Awaitility.await().untilAsserted(() -> 
Assert.assertTrue(transactionBuffer.checkIfReady()));
+    }
 }
\ No newline at end of file

Reply via email to