This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b702d440dc5 [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)
b702d440dc5 is described below
commit b702d440dc5e5a4cfd845bf60d5e310efe665ff5
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Mar 28 06:53:21 2024 +0800
[fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +++++
.../service/persistent/PersistentTopicTest.java | 46 ++++++++++++++++++++++
3 files changed, 57 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8b13fc0f342..b253da72fa9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -992,7 +992,7 @@ public class ManagedCursorImpl implements ManagedCursor {
name);
}
// Let the managed ledger know we want to be notified whenever a new entry is published
- ledger.waitingCursors.add(this);
+ ledger.addWaitingCursor(this);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skip notification registering since we do have entries available",
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1c0a0465507..0f089ef4a85 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3813,6 +3813,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
this.waitingCursors.remove(cursor);
}
+ public void addWaitingCursor(ManagedCursorImpl cursor) {
+ if (cursor instanceof NonDurableCursorImpl) {
+ if (cursor.isActive()) {
+ this.waitingCursors.add(cursor);
+ }
+ } else {
+ this.waitingCursors.add(cursor);
+ }
+ }
+
public boolean isCursorActive(ManagedCursor cursor) {
return activeCursors.get(cursor.getName()) != null;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index ea1a68bb0c2..d42b1d92007 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -59,10 +59,15 @@ import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -75,6 +80,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
@@ -662,4 +668,44 @@ public class PersistentTopicTest extends BrokerTestBase {
subscribe.close();
admin.topics().delete(topicName);
}
+
+ @Test
+ public void testAddWaitingCursorsForNonDurable() throws Exception {
+ final String ns = "prop/ns-test";
+ admin.namespaces().createNamespace(ns, 2);
+ final String topicName = "persistent://prop/ns-test/testAddWaitingCursors";
+ admin.topics().createNonPartitionedTopic(topicName);
+ final Optional<Topic> topic = pulsar.getBrokerService().getTopic(topicName, false).join();
+ assertNotNull(topic.get());
+ PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
+ final ManagedCursor spyCursor= spy(ledger.newNonDurableCursor(PositionImpl.LATEST, "sub-2"));
+ doAnswer((invocation) -> {
+ Thread.sleep(10_000);
+ invocation.callRealMethod();
+ return null;
+ }).when(spyCursor).asyncReadEntriesOrWait(any(int.class), any(long.class),
+ any(AsyncCallbacks.ReadEntriesCallback.class), any(Object.class), any(PositionImpl.class));
+ Field cursorField = ManagedLedgerImpl.class.getDeclaredField("cursors");
+ cursorField.setAccessible(true);
+ ManagedCursorContainer container = (ManagedCursorContainer) cursorField.get(ledger);
+ container.removeCursor("sub-2");
+ container.add(spyCursor, null);
+ final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+ .subscriptionMode(SubscriptionMode.NonDurable)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName("sub-2").subscribe();
+ final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+ producer.send("test");
+ producer.close();
+ final Message<String> receive = consumer.receive();
+ assertEquals("test", receive.getValue());
+ consumer.close();
+ Awaitility.await()
+ .pollDelay(5, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ assertEquals(ledger.getWaitingCursorsCount(), 0);
+ });
+ }
}