This is an automated email from the ASF dual-hosted git repository.
daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 825e997216d [fix] [broker] Subscription stuck due to called Admin API
analyzeSubscriptionBacklog (#22019)
825e997216d is described below
commit 825e997216dabe23a6dde0945ef769bbda0558e4
Author: fengyubiao <[email protected]>
AuthorDate: Mon Feb 19 00:04:10 2024 +0800
[fix] [broker] Subscription stuck due to called Admin API
analyzeSubscriptionBacklog (#22019)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 29 +++++++++++++++++++--
.../service/persistent/PersistentSubscription.java | 30 +++++++++++++++++++---
.../apache/pulsar/broker/admin/AdminApi2Test.java | 29 +++++++++++++++++++++
.../admin/AnalyzeBacklogSubscriptionTest.java | 18 ++++++-------
.../common/util/collections/BitSetRecyclable.java | 8 ++++++
5 files changed, 99 insertions(+), 15 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8555753d98b..0b9a9c3e9fc 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -196,11 +196,11 @@ public class ManagedCursorImpl implements ManagedCursor {
position.ackSet = null;
return position;
};
- private final RangeSetWrapper<PositionImpl> individualDeletedMessages;
+ protected final RangeSetWrapper<PositionImpl> individualDeletedMessages;
// Maintain the deletion status for batch messages
// (ledgerId, entryId) -> deletion indexes
- private final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable>
batchDeletedIndexes;
+ protected final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable>
batchDeletedIndexes;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private RateLimiter markDeleteLimiter;
@@ -3622,4 +3622,29 @@ public class ManagedCursorImpl implements ManagedCursor {
public ManagedLedgerConfig getConfig() {
return config;
}
+
+ /***
+ * Create a non-durable cursor and copy the ack stats.
+ */
+ public ManagedCursor duplicateNonDurableCursor(String
nonDurableCursorName) throws ManagedLedgerException {
+ NonDurableCursorImpl newNonDurableCursor =
+ (NonDurableCursorImpl)
ledger.newNonDurableCursor(getMarkDeletedPosition(), nonDurableCursorName);
+ if (individualDeletedMessages != null) {
+ this.individualDeletedMessages.forEach(range -> {
+ newNonDurableCursor.individualDeletedMessages.addOpenClosed(
+ range.lowerEndpoint().getLedgerId(),
+ range.lowerEndpoint().getEntryId(),
+ range.upperEndpoint().getLedgerId(),
+ range.upperEndpoint().getEntryId());
+ return true;
+ });
+ }
+ if (batchDeletedIndexes != null) {
+ for (Map.Entry<PositionImpl, BitSetRecyclable> entry :
this.batchDeletedIndexes.entrySet()) {
+ BitSetRecyclable copiedBitSet =
BitSetRecyclable.valueOf(entry.getValue());
+ newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(),
copiedBitSet);
+ }
+ }
+ return newNonDurableCursor;
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 1baa4087e55..a01904d86f3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -530,9 +531,15 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
return "Null";
}
- @Override
public CompletableFuture<AnalyzeBacklogResult>
analyzeBacklog(Optional<Position> position) {
-
+ final ManagedLedger managedLedger = topic.getManagedLedger();
+ final String newNonDurableCursorName = "analyze-backlog-" +
UUID.randomUUID();
+ ManagedCursor newNonDurableCursor;
+ try {
+ newNonDurableCursor = ((ManagedCursorImpl)
cursor).duplicateNonDurableCursor(newNonDurableCursorName);
+ } catch (ManagedLedgerException e) {
+ return CompletableFuture.failedFuture(e);
+ }
long start = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Starting to analyze backlog", topicName,
subName);
@@ -547,7 +554,7 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
AtomicLong rejectedMessages = new AtomicLong();
AtomicLong rescheduledMessages = new AtomicLong();
- Position currentPosition = cursor.getMarkDeletedPosition();
+ Position currentPosition =
newNonDurableCursor.getMarkDeletedPosition();
if (log.isDebugEnabled()) {
log.debug("[{}][{}] currentPosition {}",
@@ -607,7 +614,7 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
return true;
};
- return cursor.scan(
+ CompletableFuture<AnalyzeBacklogResult> res = newNonDurableCursor.scan(
position,
condition,
batchSize,
@@ -634,7 +641,22 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
topicName, subName, end - start, result);
return result;
});
+ res.whenComplete((__, ex) -> {
+ managedLedger.asyncDeleteCursor(newNonDurableCursorName,
+ new AsyncCallbacks.DeleteCursorCallback(){
+ @Override
+ public void deleteCursorComplete(Object ctx) {
+ // Nothing to do.
+ }
+ @Override
+ public void deleteCursorFailed(ManagedLedgerException
exception, Object ctx) {
+ log.warn("[{}][{}] Delete non-durable cursor[{}]
failed when analyze backlog.",
+ topicName, subName,
newNonDurableCursor.getName());
+ }
+ }, null);
+ });
+ return res;
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index f0bc80fa364..bbcae37c4e2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3389,4 +3389,33 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
// cleanup.
admin.namespaces().deleteNamespace(ns);
}
+
+ @Test
+ private void testAnalyzeSubscriptionBacklogNotCauseStuck() throws
Exception {
+ final String topic = BrokerTestUtil.newUniqueName("persistent://" +
defaultNamespace + "/tp");
+ final String subscription = "s1";
+ admin.topics().createNonPartitionedTopic(topic);
+ // Send 10 messages.
+ Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription)
+ .receiverQueueSize(0).subscribe();
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+ for (int i = 0; i < 10; i++) {
+ producer.send(i + "");
+ }
+
+ // Verify consumer can receive all messages after calling
"analyzeSubscriptionBacklog".
+ admin.topics().analyzeSubscriptionBacklog(topic, subscription,
Optional.of(MessageIdImpl.earliest));
+ for (int i = 0; i < 10; i++) {
+ Awaitility.await().untilAsserted(() -> {
+ Message m = consumer.receive();
+ assertNotNull(m);
+ consumer.acknowledge(m);
+ });
+ }
+
+ // cleanup.
+ consumer.close();
+ producer.close();
+ admin.topics().delete(topic);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
index 64b2a58ab86..f8aa3dc355d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -154,17 +154,17 @@ public class AnalyzeBacklogSubscriptionTest extends
ProducerConsumerBase {
AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklogResult
= admin.topics().analyzeSubscriptionBacklog(topic,
subscription, Optional.empty());
- assertEquals(numEntries,
analyzeSubscriptionBacklogResult.getEntries());
- assertEquals(numEntries,
analyzeSubscriptionBacklogResult.getFilterAcceptedEntries());
- assertEquals(0,
analyzeSubscriptionBacklogResult.getFilterRejectedEntries());
- assertEquals(0,
analyzeSubscriptionBacklogResult.getFilterRescheduledEntries());
- assertEquals(0,
analyzeSubscriptionBacklogResult.getFilterRescheduledEntries());
+ assertEquals(analyzeSubscriptionBacklogResult.getEntries(),
numEntries);
+
assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedEntries(),
numEntries);
+
assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedEntries(), 0);
+
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);
+
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);
- assertEquals(numMessages,
analyzeSubscriptionBacklogResult.getMessages());
- assertEquals(numMessages,
analyzeSubscriptionBacklogResult.getFilterAcceptedMessages());
- assertEquals(0,
analyzeSubscriptionBacklogResult.getFilterRejectedMessages());
+ assertEquals(analyzeSubscriptionBacklogResult.getMessages(),
numMessages);
+
assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedMessages(),
numMessages);
+
assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedMessages(), 0);
- assertEquals(0,
analyzeSubscriptionBacklogResult.getFilterRescheduledMessages());
+
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledMessages(),
0);
assertFalse(analyzeSubscriptionBacklogResult.isAborted());
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java
index 12ce7eb74c7..b801d5f2b05 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java
@@ -216,6 +216,14 @@ public class BitSetRecyclable implements Cloneable,
java.io.Serializable {
return BitSetRecyclable.valueOf(ByteBuffer.wrap(bytes));
}
+ /**
+ * Copy a BitSetRecyclable.
+ */
+ public static BitSetRecyclable valueOf(BitSetRecyclable src) {
+ // The internal implementation will do the array-copy.
+ return valueOf(src.words);
+ }
+
/**
* Returns a new bit set containing all the bits in the given byte
* buffer between its position and limit.