This is an automated email from the ASF dual-hosted git repository.

daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 825e997216d [fix] [broker] Subscription stuck due to called Admin API analyzeSubscriptionBacklog (#22019)
825e997216d is described below

commit 825e997216dabe23a6dde0945ef769bbda0558e4
Author: fengyubiao <[email protected]>
AuthorDate: Mon Feb 19 00:04:10 2024 +0800

    [fix] [broker] Subscription stuck due to called Admin API analyzeSubscriptionBacklog (#22019)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 29 +++++++++++++++++++--
 .../service/persistent/PersistentSubscription.java | 30 +++++++++++++++++++---
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 29 +++++++++++++++++++++
 .../admin/AnalyzeBacklogSubscriptionTest.java      | 18 ++++++-------
 .../common/util/collections/BitSetRecyclable.java  |  8 ++++++
 5 files changed, 99 insertions(+), 15 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8555753d98b..0b9a9c3e9fc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -196,11 +196,11 @@ public class ManagedCursorImpl implements ManagedCursor {
         position.ackSet = null;
         return position;
     };
-    private final RangeSetWrapper<PositionImpl> individualDeletedMessages;
+    protected final RangeSetWrapper<PositionImpl> individualDeletedMessages;
 
     // Maintain the deletion status for batch messages
     // (ledgerId, entryId) -> deletion indexes
-    private final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> 
batchDeletedIndexes;
+    protected final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> 
batchDeletedIndexes;
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     private RateLimiter markDeleteLimiter;
@@ -3622,4 +3622,29 @@ public class ManagedCursorImpl implements ManagedCursor {
     public ManagedLedgerConfig getConfig() {
         return config;
     }
+
+    /***
+     * Create a non-durable cursor and copy the ack stats.
+     */
+    public ManagedCursor duplicateNonDurableCursor(String 
nonDurableCursorName) throws ManagedLedgerException {
+        NonDurableCursorImpl newNonDurableCursor =
+                (NonDurableCursorImpl) 
ledger.newNonDurableCursor(getMarkDeletedPosition(), nonDurableCursorName);
+        if (individualDeletedMessages != null) {
+            this.individualDeletedMessages.forEach(range -> {
+                newNonDurableCursor.individualDeletedMessages.addOpenClosed(
+                        range.lowerEndpoint().getLedgerId(),
+                        range.lowerEndpoint().getEntryId(),
+                        range.upperEndpoint().getLedgerId(),
+                        range.upperEndpoint().getEntryId());
+                return true;
+            });
+        }
+        if (batchDeletedIndexes != null) {
+            for (Map.Entry<PositionImpl, BitSetRecyclable> entry : 
this.batchDeletedIndexes.entrySet()) {
+                BitSetRecyclable copiedBitSet = 
BitSetRecyclable.valueOf(entry.getValue());
+                newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(), 
copiedBitSet);
+            }
+        }
+        return newNonDurableCursor;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 1baa4087e55..a01904d86f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -530,9 +531,15 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
         return "Null";
     }
 
-    @Override
     public CompletableFuture<AnalyzeBacklogResult> 
analyzeBacklog(Optional<Position> position) {
-
+        final ManagedLedger managedLedger = topic.getManagedLedger();
+        final String newNonDurableCursorName = "analyze-backlog-" + 
UUID.randomUUID();
+        ManagedCursor newNonDurableCursor;
+        try {
+            newNonDurableCursor = ((ManagedCursorImpl) 
cursor).duplicateNonDurableCursor(newNonDurableCursorName);
+        } catch (ManagedLedgerException e) {
+            return CompletableFuture.failedFuture(e);
+        }
         long start = System.currentTimeMillis();
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Starting to analyze backlog", topicName, 
subName);
@@ -547,7 +554,7 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
         AtomicLong rejectedMessages = new AtomicLong();
         AtomicLong rescheduledMessages = new AtomicLong();
 
-        Position currentPosition = cursor.getMarkDeletedPosition();
+        Position currentPosition = 
newNonDurableCursor.getMarkDeletedPosition();
 
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] currentPosition {}",
@@ -607,7 +614,7 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
 
             return true;
         };
-        return cursor.scan(
+        CompletableFuture<AnalyzeBacklogResult> res = newNonDurableCursor.scan(
                 position,
                 condition,
                 batchSize,
@@ -634,7 +641,22 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
                     topicName, subName, end - start, result);
             return result;
         });
+        res.whenComplete((__, ex) -> {
+            managedLedger.asyncDeleteCursor(newNonDurableCursorName,
+                new AsyncCallbacks.DeleteCursorCallback(){
+                    @Override
+                    public void deleteCursorComplete(Object ctx) {
+                        // Nothing to do.
+                    }
 
+                    @Override
+                    public void deleteCursorFailed(ManagedLedgerException 
exception, Object ctx) {
+                        log.warn("[{}][{}] Delete non-durable cursor[{}] 
failed when analyze backlog.",
+                                topicName, subName, 
newNonDurableCursor.getName());
+                    }
+                }, null);
+        });
+        return res;
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index f0bc80fa364..bbcae37c4e2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3389,4 +3389,33 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         // cleanup.
         admin.namespaces().deleteNamespace(ns);
     }
+
+    @Test
+    private void testAnalyzeSubscriptionBacklogNotCauseStuck() throws 
Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + 
defaultNamespace + "/tp");
+        final String subscription = "s1";
+        admin.topics().createNonPartitionedTopic(topic);
+        // Send 10 messages.
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription)
+                .receiverQueueSize(0).subscribe();
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        for (int i = 0; i < 10; i++) {
+            producer.send(i + "");
+        }
+
+        // Verify consumer can receive all messages after calling 
"analyzeSubscriptionBacklog".
+        admin.topics().analyzeSubscriptionBacklog(topic, subscription, 
Optional.of(MessageIdImpl.earliest));
+        for (int i = 0; i < 10; i++) {
+            Awaitility.await().untilAsserted(() -> {
+                Message m = consumer.receive();
+                assertNotNull(m);
+                consumer.acknowledge(m);
+            });
+        }
+
+        // cleanup.
+        consumer.close();
+        producer.close();
+        admin.topics().delete(topic);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
index 64b2a58ab86..f8aa3dc355d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -154,17 +154,17 @@ public class AnalyzeBacklogSubscriptionTest extends 
ProducerConsumerBase {
         AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklogResult
                 = admin.topics().analyzeSubscriptionBacklog(topic, 
subscription, Optional.empty());
 
-        assertEquals(numEntries, 
analyzeSubscriptionBacklogResult.getEntries());
-        assertEquals(numEntries, 
analyzeSubscriptionBacklogResult.getFilterAcceptedEntries());
-        assertEquals(0, 
analyzeSubscriptionBacklogResult.getFilterRejectedEntries());
-        assertEquals(0, 
analyzeSubscriptionBacklogResult.getFilterRescheduledEntries());
-        assertEquals(0, 
analyzeSubscriptionBacklogResult.getFilterRescheduledEntries());
+        assertEquals(analyzeSubscriptionBacklogResult.getEntries(), 
numEntries);
+        
assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedEntries(), 
numEntries);
+        
assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedEntries(), 0);
+        
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);
+        
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);
 
-        assertEquals(numMessages, 
analyzeSubscriptionBacklogResult.getMessages());
-        assertEquals(numMessages, 
analyzeSubscriptionBacklogResult.getFilterAcceptedMessages());
-        assertEquals(0, 
analyzeSubscriptionBacklogResult.getFilterRejectedMessages());
+        assertEquals(analyzeSubscriptionBacklogResult.getMessages(), 
numMessages);
+        
assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedMessages(), 
numMessages);
+        
assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedMessages(), 0);
 
-        assertEquals(0, 
analyzeSubscriptionBacklogResult.getFilterRescheduledMessages());
+        
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledMessages(), 
0);
         assertFalse(analyzeSubscriptionBacklogResult.isAborted());
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java
index 12ce7eb74c7..b801d5f2b05 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java
@@ -216,6 +216,14 @@ public class BitSetRecyclable implements Cloneable, 
java.io.Serializable {
         return BitSetRecyclable.valueOf(ByteBuffer.wrap(bytes));
     }
 
+    /**
+     * Copy a BitSetRecyclable.
+     */
+    public static BitSetRecyclable valueOf(BitSetRecyclable src) {
+        // The internal implementation will do the array-copy.
+        return valueOf(src.words);
+    }
+
     /**
      * Returns a new bit set containing all the bits in the given byte
      * buffer between its position and limit.

Reply via email to