This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a9938558a8a09414d9adadc520f4c797c750c8a7 Author: gaozhangmin <[email protected]> AuthorDate: Tue Mar 29 01:00:20 2022 +0800 [fix][transaction] Properly close transaction-buffer-sub non durable cursor (#14900) Fixes #14880 ### Motivation Non durable cursor was not closed properly. ### Modifications For non durable cursor, `cursor.asyncClose` did nothing. The proper way is `topic.getManagedLedger().asyncDeleteCursor` (cherry picked from commit 4e62ffc15714cfa49ed441f3ba7ededb866b9062) --- .../transaction/buffer/impl/TopicTransactionBuffer.java | 13 ++++++++----- .../apache/pulsar/broker/transaction/TransactionTest.java | 7 +++++-- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 66ce8f5..e2888d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -55,6 +55,7 @@ import org.apache.pulsar.common.policies.data.TransactionBufferStats; import org.apache.pulsar.common.policies.data.TransactionInBufferStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; +import org.apache.pulsar.common.util.Codec; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.SpscArrayQueue; @@ -639,7 +640,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen } } - closeCursor(managedCursor); + closeCursor(SUBSCRIPTION_NAME); callBack.recoverComplete(); }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider() .getExecutor(this)).exceptionally(e -> { @@ -656,17 +657,19 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen }); } - private void closeCursor(ManagedCursor cursor) { - cursor.asyncClose(new AsyncCallbacks.CloseCallback() { + private void closeCursor(String subscriptionName) { + topic.getManagedLedger().asyncDeleteCursor(Codec.encode(subscriptionName), + new AsyncCallbacks.DeleteCursorCallback() { @Override - public void closeComplete(Object ctx) { + public void deleteCursorComplete(Object ctx) { log.info("[{}]Transaction buffer snapshot recover cursor close complete.", topic.getName()); } @Override - public void closeFailed(ManagedLedgerException exception, Object ctx) { + public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}]Transaction buffer snapshot recover cursor close fail.", topic.getName()); } + }, null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 231c183..ae5e8c4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -538,7 +538,7 @@ public class TransactionTest extends TransactionTestBase { .getTopic("persistent://" + topic, false).get().get(); persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true); - ManagedCursor managedCursor = mock(ManagedCursor.class); + ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class); doReturn("transaction-buffer-sub").when(managedCursor).getName(); doReturn(true).when(managedCursor).hasMoreEntries(); doAnswer(invocation -> { @@ -579,6 +579,9 @@ public class TransactionTest extends TransactionTestBase { TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic); Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(buffer3.getStats().state, "Ready")); + persistentTopic.getInternalStats(false).thenAccept(internalStats -> { + assertTrue(internalStats.cursors.isEmpty()); + }); managedCursors.removeCursor("transaction-buffer-sub"); } @@ -893,4 +896,4 @@ public class TransactionTest extends TransactionTestBase { pulsarServiceList.forEach((pulsarService -> pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(true))); } -} \ No newline at end of file +}
