This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7dee63e [Transaction] Fix recover of TransactionBuffer (#13739)
7dee63e is described below
commit 7dee63ed707c8784989692b764b058bc8ea7f4ac
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Jan 18 08:46:45 2022 +0800
[Transaction] Fix recover of TransactionBuffer (#13739)
### Motivation
Fix the concurrency issue caused by TransactionBuffer when recovering.
### Modification
1. use TransactionReplayExecutor when thenAcceptAsync in recover::run was
called.
2. delete repeated changeToInitializingState
3. if cursor.hasMoreEntries() == false && entryQueue.size() == 0, return
false and stop recovering.
- If the cursor is cleared when transaction is recovering , there will
no entries can be read, but currentLoadPosition < lastConfirmedEntry.
---
.../buffer/impl/TopicTransactionBuffer.java | 17 +++++++----
.../pulsar/broker/transaction/TransactionTest.java | 33 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index d6b053c..3c10738 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -96,7 +96,6 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
public TopicTransactionBuffer(PersistentTopic topic) {
super(State.None);
this.topic = topic;
- this.changeToInitializingState();
this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
.getTransactionBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
this.timer =
topic.getBrokerService().getPulsar().getTransactionTimer();
@@ -531,7 +530,11 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@SneakyThrows
@Override
public void run() {
- this.topicTransactionBuffer.changeToInitializingState();
+ if (!this.topicTransactionBuffer.changeToInitializingState()) {
+ log.warn("TransactionBuffer {} of topic {} can not change
state to Initializing",
+ this, topic.getName());
+ return;
+ }
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService()
.createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> {
try {
@@ -579,8 +582,8 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
PositionImpl lastConfirmedEntry = (PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry();
PositionImpl currentLoadPosition = (PositionImpl)
this.startReadCursorPosition;
- FillEntryQueueCallback fillEntryQueueCallback = new
FillEntryQueueCallback(entryQueue, managedCursor,
- TopicTransactionBufferRecover.this);
+ FillEntryQueueCallback fillEntryQueueCallback = new
FillEntryQueueCallback(entryQueue,
+ managedCursor, TopicTransactionBufferRecover.this);
if (lastConfirmedEntry.getEntryId() != -1) {
while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0
&& fillEntryQueueCallback.fillQueue()) {
@@ -604,7 +607,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
closeCursor(managedCursor);
callBack.recoverComplete();
- }).exceptionally(e -> {
+ },
topic.getBrokerService().getPulsar().getTransactionReplayExecutor()).exceptionally(e
-> {
callBack.recoverExceptionally(new Exception(e));
log.error("[{}]Transaction buffer new snapshot reader fail!",
topic.getName(), e);
return null;
@@ -654,6 +657,10 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
if (cursor.hasMoreEntries()) {
outstandingReadsRequests.incrementAndGet();
cursor.asyncReadEntries(100, this, System.nanoTime(),
PositionImpl.LATEST);
+ } else {
+ if (entryQueue.size() == 0) {
+ isReadable = false;
+ }
}
}
return isReadable;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 0010a94..8af03b1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -34,6 +35,7 @@ import static org.testng.Assert.fail;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
@@ -696,4 +698,35 @@ public class TransactionTest extends TransactionTestBase {
commitTxn.commit();
}
+ @Test
+ public void testNoEntryCanBeReadWhenRecovery() throws Exception {
+ String topic = NAMESPACE1 + "/test";
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsarServiceList.get(0).getBrokerService()
+ .getTopic(TopicName.get(topic).toString(), true)
+ .get()
+ .get();
+
+ Class<PersistentTopic> persistentTopicClass = PersistentTopic.class;
+ Field filed1 = persistentTopicClass.getDeclaredField("ledger");
+ Field field2 =
persistentTopicClass.getDeclaredField("transactionBuffer");
+ filed1.setAccessible(true);
+ field2.setAccessible(true);
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
spy(filed1.get(persistentTopic));
+ filed1.set(persistentTopic, managedLedger);
+
+ TopicTransactionBuffer topicTransactionBuffer =
(TopicTransactionBuffer) field2.get(persistentTopic);
+ Method method =
TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot");
+ method.setAccessible(true);
+ CompletableFuture<Void> completableFuture = (CompletableFuture<Void>)
method.invoke(topicTransactionBuffer);
+ completableFuture.get();
+
+
doReturn(PositionImpl.LATEST).when(managedLedger).getLastConfirmedEntry();
+ ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
+ doReturn(false).when(managedCursor).hasMoreEntries();
+ doReturn(managedCursor).when(managedLedger).newNonDurableCursor(any(),
any());
+
+ TopicTransactionBuffer transactionBuffer = new
TopicTransactionBuffer(persistentTopic);
+ Awaitility.await().untilAsserted(() ->
Assert.assertTrue(transactionBuffer.checkIfReady()));
+ }
}
\ No newline at end of file