This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b702d440dc5 [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)
b702d440dc5 is described below

commit b702d440dc5e5a4cfd845bf60d5e310efe665ff5
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Mar 28 06:53:21 2024 +0800

    [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +++++
 .../service/persistent/PersistentTopicTest.java    | 46 ++++++++++++++++++++++
 3 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8b13fc0f342..b253da72fa9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -992,7 +992,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                             name);
                 }
                 // Let the managed ledger know we want to be notified whenever a new entry is published
-                ledger.waitingCursors.add(this);
+                ledger.addWaitingCursor(this);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Skip notification registering since 
we do have entries available",
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1c0a0465507..0f089ef4a85 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3813,6 +3813,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         this.waitingCursors.remove(cursor);
     }
 
+    public void addWaitingCursor(ManagedCursorImpl cursor) {
+        if (cursor instanceof NonDurableCursorImpl) {
+            if (cursor.isActive()) {
+                this.waitingCursors.add(cursor);
+            }
+        } else {
+            this.waitingCursors.add(cursor);
+        }
+    }
+
     public boolean isCursorActive(ManagedCursor cursor) {
         return activeCursors.get(cursor.getName()) != null;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index ea1a68bb0c2..d42b1d92007 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -59,10 +59,15 @@ import java.util.function.Supplier;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -75,6 +80,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
@@ -662,4 +668,44 @@ public class PersistentTopicTest extends BrokerTestBase {
         subscribe.close();
         admin.topics().delete(topicName);
     }
+
+    @Test
+    public void testAddWaitingCursorsForNonDurable() throws Exception {
+        final String ns = "prop/ns-test";
+        admin.namespaces().createNamespace(ns, 2);
+        final String topicName = 
"persistent://prop/ns-test/testAddWaitingCursors";
+        admin.topics().createNonPartitionedTopic(topicName);
+        final Optional<Topic> topic = 
pulsar.getBrokerService().getTopic(topicName, false).join();
+        assertNotNull(topic.get());
+        PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+        ManagedLedgerImpl ledger = 
(ManagedLedgerImpl)persistentTopic.getManagedLedger();
+        final ManagedCursor spyCursor= 
spy(ledger.newNonDurableCursor(PositionImpl.LATEST, "sub-2"));
+        doAnswer((invocation) -> {
+            Thread.sleep(10_000);
+            invocation.callRealMethod();
+            return null;
+        }).when(spyCursor).asyncReadEntriesOrWait(any(int.class), 
any(long.class),
+                any(AsyncCallbacks.ReadEntriesCallback.class), 
any(Object.class), any(PositionImpl.class));
+        Field cursorField = 
ManagedLedgerImpl.class.getDeclaredField("cursors");
+        cursorField.setAccessible(true);
+        ManagedCursorContainer container = (ManagedCursorContainer) 
cursorField.get(ledger);
+        container.removeCursor("sub-2");
+        container.add(spyCursor, null);
+        final Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionMode(SubscriptionMode.NonDurable)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName("sub-2").subscribe();
+        final Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        producer.send("test");
+        producer.close();
+        final Message<String> receive = consumer.receive();
+        assertEquals("test", receive.getValue());
+        consumer.close();
+        Awaitility.await()
+                .pollDelay(5, TimeUnit.SECONDS)
+                .pollInterval(1, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    assertEquals(ledger.getWaitingCursorsCount(), 0);
+        });
+    }
 }

Reply via email to