This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ba8ff27d25200b371904161e9162d7c112848066 Author: Jiwei Guo <[email protected]> AuthorDate: Thu Mar 28 06:53:21 2024 +0800 [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191) (cherry picked from commit b702d440dc5e5a4cfd845bf60d5e310efe665ff5) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +++++ .../service/persistent/PersistentTopicTest.java | 46 ++++++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 7fd93dacf49..b5c16317f2b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -987,7 +987,7 @@ public class ManagedCursorImpl implements ManagedCursor { name); } // Let the managed ledger know we want to be notified whenever a new entry is published - ledger.waitingCursors.add(this); + ledger.addWaitingCursor(this); } else { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Skip notification registering since we do have entries available", diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 6727fc63479..73dfc86e1ad 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3804,6 +3804,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { this.waitingCursors.remove(cursor); } + public void addWaitingCursor(ManagedCursorImpl cursor) { + if (cursor instanceof NonDurableCursorImpl) { + if (cursor.isActive()) { + this.waitingCursors.add(cursor); + } + } else { + this.waitingCursors.add(cursor); + } + } + public boolean isCursorActive(ManagedCursor cursor) { return activeCursors.get(cursor.getName()) != null; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 4b4aa5b45d3..55024ca3d7d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -57,10 +57,15 @@ import java.util.function.Supplier; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; @@ -74,6 +79,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; @@ -661,4 +667,44 @@ public class PersistentTopicTest extends BrokerTestBase { subscribe.close(); admin.topics().delete(topicName); } + + @Test + public void testAddWaitingCursorsForNonDurable() throws Exception { + final String ns = "prop/ns-test"; + admin.namespaces().createNamespace(ns, 2); + final String topicName = "persistent://prop/ns-test/testAddWaitingCursors"; + admin.topics().createNonPartitionedTopic(topicName); + final Optional<Topic> topic = pulsar.getBrokerService().getTopic(topicName, false).join(); + assertNotNull(topic.get()); + PersistentTopic persistentTopic = (PersistentTopic) topic.get(); + ManagedLedgerImpl ledger = (ManagedLedgerImpl)persistentTopic.getManagedLedger(); + final ManagedCursor spyCursor= spy(ledger.newNonDurableCursor(PositionImpl.LATEST, "sub-2")); + doAnswer((invocation) -> { + Thread.sleep(10_000); + invocation.callRealMethod(); + return null; + }).when(spyCursor).asyncReadEntriesOrWait(any(int.class), any(long.class), + any(AsyncCallbacks.ReadEntriesCallback.class), any(Object.class), any(PositionImpl.class)); + Field cursorField = ManagedLedgerImpl.class.getDeclaredField("cursors"); + cursorField.setAccessible(true); + ManagedCursorContainer container = (ManagedCursorContainer) cursorField.get(ledger); + container.removeCursor("sub-2"); + container.add(spyCursor, null); + final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName) + .subscriptionMode(SubscriptionMode.NonDurable) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("sub-2").subscribe(); + final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + producer.send("test"); + producer.close(); + final Message<String> receive = consumer.receive(); + assertEquals("test", receive.getValue()); + consumer.close(); + Awaitility.await() + .pollDelay(5, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> { + assertEquals(ledger.getWaitingCursorsCount(), 0); + }); + } }
