This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 57a616eaa79 [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)
57a616eaa79 is described below
commit 57a616eaa79096af5b49db89c99cd39ccc94ec00
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Apr 8 18:22:05 2024 +0800
[fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 ++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +---
.../service/persistent/PersistentSubscription.java | 4 +-
.../service/persistent/PersistentTopicTest.java | 56 +++++++++++++++++++++-
4 files changed, 65 insertions(+), 9 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 4daa06cad57..69b130a98c8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -990,6 +990,11 @@ public class ManagedCursorImpl implements ManagedCursor {
log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition);
}
+ if (isClosed()) {
+ callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
+ return;
+ }
+
if (!hasMoreEntries()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Still no entries available. Register for notification", ledger.getName(),
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3a12cb2ad6c..698563ed7a1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1032,6 +1032,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
+ consumerName), ctx);
return;
} else if (!cursor.isDurable()) {
+ cursor.setState(ManagedCursorImpl.State.Closed);
cursors.removeCursor(consumerName);
deactivateCursorByName(consumerName);
callback.deleteCursorComplete(ctx);
@@ -3814,13 +3815,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
public void addWaitingCursor(ManagedCursorImpl cursor) {
- if (cursor instanceof NonDurableCursorImpl) {
- if (cursor.isActive()) {
- this.waitingCursors.add(cursor);
- }
- } else {
- this.waitingCursors.add(cursor);
- }
+ this.waitingCursors.add(cursor);
}
public boolean isCursorActive(ManagedCursor cursor) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 6e8e94baeae..dbbf92aa76d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -308,7 +308,6 @@ public class PersistentSubscription extends AbstractSubscription {
if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
deactivateCursor();
- topic.getManagedLedger().removeWaitingCursor(cursor);
if (!cursor.isDurable()) {
// If cursor is not durable, we need to clean up the subscription as well. No need to check for active
@@ -338,11 +337,14 @@ public class PersistentSubscription extends AbstractSubscription {
if (!isResetCursor) {
try {
topic.getManagedLedger().deleteCursor(cursor.getName());
+ topic.getManagedLedger().removeWaitingCursor(cursor);
} catch (InterruptedException | ManagedLedgerException e) {
log.warn("[{}] [{}] Failed to remove non durable cursor", topic.getName(), subName, e);
}
}
});
+ } else {
+ topic.getManagedLedger().removeWaitingCursor(cursor);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index d42b1d92007..c214634e6ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -80,6 +80,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -113,6 +114,11 @@ public class PersistentTopicTest extends BrokerTestBase {
super.internalCleanup();
}
+ @Override protected void doInitConf() throws Exception {
+ super.doInitConf();
+ this.conf.setManagedLedgerCursorBackloggedThreshold(10);
+ }
+
/**
* Test validates that broker cleans up topic which failed to unload while bundle unloading.
*
@@ -681,7 +687,7 @@ public class PersistentTopicTest extends BrokerTestBase {
ManagedLedgerImpl ledger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
final ManagedCursor spyCursor= spy(ledger.newNonDurableCursor(PositionImpl.LATEST, "sub-2"));
doAnswer((invocation) -> {
- Thread.sleep(10_000);
+ Thread.sleep(5_000);
invocation.callRealMethod();
return null;
}).when(spyCursor).asyncReadEntriesOrWait(any(int.class), any(long.class),
@@ -708,4 +714,52 @@ public class PersistentTopicTest extends BrokerTestBase {
assertEquals(ledger.getWaitingCursorsCount(), 0);
});
}
+
+ @Test
+ public void testAddWaitingCursorsForNonDurable2() throws Exception {
+ final String ns = "prop/ns-test";
+ admin.namespaces().createNamespace(ns, 2);
+ final String topicName = "persistent://prop/ns-test/testAddWaitingCursors2";
+ admin.topics().createNonPartitionedTopic(topicName);
+ pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+ .subscriptionMode(SubscriptionMode.Durable)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("sub-1").subscribe().close();
+ @Cleanup
+ final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topicName).create();
+ for (int i = 0; i < 100; i ++) {
+ producer.sendAsync("test-" + i);
+ }
+ @Cleanup
+ final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+ .subscriptionMode(SubscriptionMode.NonDurable)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName("sub-2").subscribe();
+ int count = 0;
+ while(true) {
+ final Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
+ if (msg != null) {
+ consumer.acknowledge(msg);
+ count++;
+ } else {
+ break;
+ }
+ }
+ Assert.assertEquals(count, 100);
+ Thread.sleep(3_000);
+ for (int i = 0; i < 100; i ++) {
+ producer.sendAsync("test-" + i);
+ }
+ while(true) {
+ final Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ if (msg != null) {
+ consumer.acknowledge(msg);
+ count++;
+ } else {
+ break;
+ }
+ }
+ Assert.assertEquals(count, 200);
+ }
}