This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 08d8e95  [Transaction]Stop TB recovering with exception (#12636)
08d8e95 is described below

commit 08d8e95bef423cdce0dc2660377b5e2a8d371d88
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Nov 8 17:26:39 2021 +0800

    [Transaction]Stop TB recovering with exception (#12636)
    
    ### Motivation
    When the TransactionBuffer is recovering, if any ledger has been deleted 
from BookKeeper or the ManagedLedger has been fenced, the TransactionBuffer 
does not stop recovering and keeps reporting the exception.
    
    ### Modifications
    End recovery when there is no ledger left to read or the ManagedLedger is 
fenced.
---
 .../buffer/impl/TopicTransactionBuffer.java        | 14 +++--
 .../pulsar/broker/transaction/TransactionTest.java | 59 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index fa46e65..ac081f1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -580,8 +580,8 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                 FillEntryQueueCallback fillEntryQueueCallback = new 
FillEntryQueueCallback(entryQueue, managedCursor,
                         TopicTransactionBufferRecover.this);
                 if (lastConfirmedEntry.getEntryId() != -1) {
-                    while (lastConfirmedEntry.compareTo(currentLoadPosition) > 
0) {
-                        fillEntryQueueCallback.fillQueue();
+                    while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0
+                            && fillEntryQueueCallback.fillQueue()) {
                         Entry entry = entryQueue.poll();
                         if (entry != null) {
                             try {
@@ -639,19 +639,22 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
         private final TopicTransactionBufferRecover recover;
 
+        private volatile boolean isReadable = true;
+
         private FillEntryQueueCallback(SpscArrayQueue<Entry> entryQueue, 
ManagedCursor cursor,
                                        TopicTransactionBufferRecover recover) {
             this.entryQueue = entryQueue;
             this.cursor = cursor;
             this.recover = recover;
         }
-        void fillQueue() {
+        boolean fillQueue() {
             if (entryQueue.size() < entryQueue.capacity() && 
outstandingReadsRequests.get() == 0) {
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
                     cursor.asyncReadEntries(100, this, System.nanoTime(), 
PositionImpl.latest);
                 }
             }
+            return isReadable;
         }
 
         @Override
@@ -671,6 +674,11 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
         @Override
         public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
+            if 
(recover.topic.getManagedLedger().getConfig().isAutoSkipNonRecoverableData()
+                    && exception instanceof 
ManagedLedgerException.NonRecoverableLedgerException
+                    || exception instanceof 
ManagedLedgerException.ManagedLedgerFencedException) {
+                isReadable = false;
+            }
             recover.callBackException(exception);
             outstandingReadsRequests.decrementAndGet();
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index e852a4f..bc8bef9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -20,6 +20,12 @@ package org.apache.pulsar.broker.transaction;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Sets;
 import io.netty.buffer.Unpooled;
 import java.lang.reflect.Field;
@@ -30,8 +36,11 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -39,6 +48,9 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
@@ -423,4 +435,51 @@ public class TransactionTest extends TransactionTestBase {
         Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId());
 
         }
+
+    @Test
+    public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws 
Exception{
+        String topic = NAMESPACE1 + 
"/testEndTBRecoveringWhenManagerLedgerDisReadable";
+        admin.topics().createNonPartitionedTopic(topic);
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .producerName("test")
+                .enableBatching(false)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .topic(topic)
+                .create();
+        producer.newMessage().send();
+        PersistentTopic persistentTopic = (PersistentTopic) 
getPulsarServiceList().get(0).getBrokerService()
+                .getTopic(topic, false).get().get();
+        
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
+
+        ManagedCursor managedCursor = mock(ManagedCursor.class);
+        doReturn("transaction-buffer-sub").when(managedCursor).getName();
+        doReturn(true).when(managedCursor).hasMoreEntries();
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = 
invocation.getArgument(1);
+            callback.readEntriesFailed(new 
ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
+                    null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+        Class<ManagedLedgerImpl> managedLedgerClass = ManagedLedgerImpl.class;
+        Field field = managedLedgerClass.getDeclaredField("cursors");
+        field.setAccessible(true);
+        ManagedCursorContainer managedCursors = (ManagedCursorContainer) 
field.get(persistentTopic.getManagedLedger());
+        managedCursors.removeCursor("transaction-buffer-sub");
+        managedCursors.add(managedCursor);
+
+        TransactionBuffer buffer1 = new 
TopicTransactionBuffer(persistentTopic);
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+                assertEquals(buffer1.getStats().state, "Ready"));
+
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = 
invocation.getArgument(1);
+            callback.readEntriesFailed(new 
ManagedLedgerException.ManagedLedgerFencedException(), null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        TransactionBuffer buffer2 = new 
TopicTransactionBuffer(persistentTopic);
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+                assertEquals(buffer2.getStats().state, "Ready"));
+    }
 }
\ No newline at end of file

Reply via email to