This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 08d8e95 [Transaction]Stop TB recovering with exception (#12636)
08d8e95 is described below
commit 08d8e95bef423cdce0dc2660377b5e2a8d371d88
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Nov 8 17:26:39 2021 +0800
[Transaction]Stop TB recovering with exception (#12636)
### Motivation
When TransactionBuffer recoversing, if any ledger was deleted from
bookkeeper, or ManagerLedger was fenced, TtransactionBuffer will not stop
recovering and continue to report the exception.
### Modifications
End recovering when there is no ledger to read or the managerLedger is
fenced.
---
.../buffer/impl/TopicTransactionBuffer.java | 14 +++--
.../pulsar/broker/transaction/TransactionTest.java | 59 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index fa46e65..ac081f1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -580,8 +580,8 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
FillEntryQueueCallback fillEntryQueueCallback = new
FillEntryQueueCallback(entryQueue, managedCursor,
TopicTransactionBufferRecover.this);
if (lastConfirmedEntry.getEntryId() != -1) {
- while (lastConfirmedEntry.compareTo(currentLoadPosition) >
0) {
- fillEntryQueueCallback.fillQueue();
+ while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0
+ && fillEntryQueueCallback.fillQueue()) {
Entry entry = entryQueue.poll();
if (entry != null) {
try {
@@ -639,19 +639,22 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private final TopicTransactionBufferRecover recover;
+ private volatile boolean isReadable = true;
+
private FillEntryQueueCallback(SpscArrayQueue<Entry> entryQueue,
ManagedCursor cursor,
TopicTransactionBufferRecover recover) {
this.entryQueue = entryQueue;
this.cursor = cursor;
this.recover = recover;
}
- void fillQueue() {
+ boolean fillQueue() {
if (entryQueue.size() < entryQueue.capacity() &&
outstandingReadsRequests.get() == 0) {
if (cursor.hasMoreEntries()) {
outstandingReadsRequests.incrementAndGet();
cursor.asyncReadEntries(100, this, System.nanoTime(),
PositionImpl.latest);
}
}
+ return isReadable;
}
@Override
@@ -671,6 +674,11 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object
ctx) {
+ if
(recover.topic.getManagedLedger().getConfig().isAutoSkipNonRecoverableData()
+ && exception instanceof
ManagedLedgerException.NonRecoverableLedgerException
+ || exception instanceof
ManagedLedgerException.ManagedLedgerFencedException) {
+ isReadable = false;
+ }
recover.callBackException(exception);
outstandingReadsRequests.decrementAndGet();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index e852a4f..bc8bef9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -20,6 +20,12 @@ package org.apache.pulsar.broker.transaction;
import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
import com.google.common.collect.Sets;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
@@ -30,8 +36,11 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -39,6 +48,9 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
@@ -423,4 +435,51 @@ public class TransactionTest extends TransactionTestBase {
Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId());
}
+
+ @Test
+ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws
Exception{
+ String topic = NAMESPACE1 +
"/testEndTBRecoveringWhenManagerLedgerDisReadable";
+ admin.topics().createNonPartitionedTopic(topic);
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .producerName("test")
+ .enableBatching(false)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .topic(topic)
+ .create();
+ producer.newMessage().send();
+ PersistentTopic persistentTopic = (PersistentTopic)
getPulsarServiceList().get(0).getBrokerService()
+ .getTopic(topic, false).get().get();
+
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
+
+ ManagedCursor managedCursor = mock(ManagedCursor.class);
+ doReturn("transaction-buffer-sub").when(managedCursor).getName();
+ doReturn(true).when(managedCursor).hasMoreEntries();
+ doAnswer(invocation -> {
+ AsyncCallbacks.ReadEntriesCallback callback =
invocation.getArgument(1);
+ callback.readEntriesFailed(new
ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
+ null);
+ return null;
+ }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+ Class<ManagedLedgerImpl> managedLedgerClass = ManagedLedgerImpl.class;
+ Field field = managedLedgerClass.getDeclaredField("cursors");
+ field.setAccessible(true);
+ ManagedCursorContainer managedCursors = (ManagedCursorContainer)
field.get(persistentTopic.getManagedLedger());
+ managedCursors.removeCursor("transaction-buffer-sub");
+ managedCursors.add(managedCursor);
+
+ TransactionBuffer buffer1 = new
TopicTransactionBuffer(persistentTopic);
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+ assertEquals(buffer1.getStats().state, "Ready"));
+
+ doAnswer(invocation -> {
+ AsyncCallbacks.ReadEntriesCallback callback =
invocation.getArgument(1);
+ callback.readEntriesFailed(new
ManagedLedgerException.ManagedLedgerFencedException(), null);
+ return null;
+ }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+ TransactionBuffer buffer2 = new
TopicTransactionBuffer(persistentTopic);
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+ assertEquals(buffer2.getStats().state, "Ready"));
+ }
}
\ No newline at end of file