This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2c73edd27b93f1146615b178396f37826a8803c6 Author: Xiangying Meng <[email protected]> AuthorDate: Tue Jan 18 08:46:45 2022 +0800 [Transaction] Fix recover of TransactionBuffer (#13739) Fix the concurrency issue caused by TransactionBuffer when recovering. 1. use TransactionReplayExecutor when thenAcceptAsync in recover::run was called. 2. delete repeated changeToInitializingState 3. if cursor.hasMoreEntries() == false && entryQueue.size() == 0, return false and stop recovering. - If the cursor is cleared when transaction is recovering , there will no entries can be read, but currentLoadPosition < lastConfirmedEntry. (cherry picked from commit 7dee63ed707c8784989692b764b058bc8ea7f4ac) --- .../buffer/impl/TopicTransactionBuffer.java | 17 +++++++---- .../pulsar/broker/transaction/TransactionTest.java | 33 ++++++++++++++++++++++ 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 9978f6f..9a324fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -96,7 +96,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen public TopicTransactionBuffer(PersistentTopic topic) { super(State.None); this.topic = topic; - this.changeToInitializingState(); this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar() .getTransactionBufferSnapshotService().createWriter(TopicName.get(topic.getName())); this.timer = topic.getBrokerService().getPulsar().getTransactionTimer(); @@ -531,7 +530,11 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen @SneakyThrows @Override public void run() { - this.topicTransactionBuffer.changeToInitializingState(); + if (!this.topicTransactionBuffer.changeToInitializingState()) { + log.warn("TransactionBuffer {} of topic {} can not change state to Initializing", + this, topic.getName()); + return; + } topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService() .createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> { try { @@ -579,8 +582,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen } PositionImpl lastConfirmedEntry = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); PositionImpl currentLoadPosition = (PositionImpl) this.startReadCursorPosition; - FillEntryQueueCallback fillEntryQueueCallback = new FillEntryQueueCallback(entryQueue, managedCursor, - TopicTransactionBufferRecover.this); + FillEntryQueueCallback fillEntryQueueCallback = new FillEntryQueueCallback(entryQueue, + managedCursor, TopicTransactionBufferRecover.this); if (lastConfirmedEntry.getEntryId() != -1) { while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0 && fillEntryQueueCallback.fillQueue()) { @@ -604,7 +607,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen closeCursor(managedCursor); callBack.recoverComplete(); - }).exceptionally(e -> { + }, topic.getBrokerService().getPulsar().getTransactionReplayExecutor()).exceptionally(e -> { callBack.recoverExceptionally(new Exception(e)); log.error("[{}]Transaction buffer new snapshot reader fail!", topic.getName(), e); return null; @@ -654,6 +657,10 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen if (cursor.hasMoreEntries()) { outstandingReadsRequests.incrementAndGet(); cursor.asyncReadEntries(100, this, System.nanoTime(), PositionImpl.latest); + } else { + if (entryQueue.size() == 0) { + isReadable = false; + } } } return isReadable; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 59d2839..6fe99a7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -34,6 +35,7 @@ import static org.testng.Assert.fail; import io.netty.buffer.Unpooled; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; import java.util.List; @@ -700,4 +702,35 @@ public class TransactionTest extends TransactionTestBase { commitTxn.commit(); } + @Test + public void testNoEntryCanBeReadWhenRecovery() throws Exception { + String topic = NAMESPACE1 + "/test"; + PersistentTopic persistentTopic = + (PersistentTopic) pulsarServiceList.get(0).getBrokerService() + .getTopic(TopicName.get(topic).toString(), true) + .get() + .get(); + + Class<PersistentTopic> persistentTopicClass = PersistentTopic.class; + Field filed1 = persistentTopicClass.getDeclaredField("ledger"); + Field field2 = persistentTopicClass.getDeclaredField("transactionBuffer"); + filed1.setAccessible(true); + field2.setAccessible(true); + ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) spy(filed1.get(persistentTopic)); + filed1.set(persistentTopic, managedLedger); + + TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) field2.get(persistentTopic); + Method method = TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot"); + method.setAccessible(true); + CompletableFuture<Void> completableFuture = (CompletableFuture<Void>) method.invoke(topicTransactionBuffer); + completableFuture.get(); + + doReturn(PositionImpl.latest).when(managedLedger).getLastConfirmedEntry(); + ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class); + doReturn(false).when(managedCursor).hasMoreEntries(); + doReturn(managedCursor).when(managedLedger).newNonDurableCursor(any(), any()); + + TopicTransactionBuffer transactionBuffer = new TopicTransactionBuffer(persistentTopic); + Awaitility.await().untilAsserted(() -> Assert.assertTrue(transactionBuffer.checkIfReady())); + } }
