This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8f791f0315d7a20e8984cf13ee8487ea2525619e
Author: Oneby Wang <[email protected]>
AuthorDate: Fri Feb 6 18:05:42 2026 +0800

    [improve][admin] Add client side looping to analyze-backlog in Topics to avoid potential HTTP call timeout (#25127)
    
    (cherry picked from commit e160b1add39945148150365aefcabbc77f13b759)
---
 .../org/apache/bookkeeper/mledger/impl/OpScan.java |  34 ++--
 .../admin/AnalyzeBacklogSubscriptionTest.java      | 195 +++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     | 116 +++++++++++-
 .../pulsar/client/admin/internal/TopicsImpl.java   | 112 ++++++++++++
 4 files changed, 436 insertions(+), 21 deletions(-)
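
For context (not part of the commit): a minimal usage sketch of the new client-side looping overloads added in Topics.java below. The admin URL, topic name, subscription name, and the 100_000 / 50_000 thresholds are placeholders; only the method signatures and result getters come from the diff.

import java.util.Optional;
import org.apache.pulsar.client.admin.PulsarAdmin;

public class AnalyzeBacklogLoopExample {
    public static void main(String[] args) throws Exception {
        // Placeholder admin endpoint; point this at your broker's web service URL.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            // Loop on the client side until the broker reports a completed scan
            // or roughly 100_000 entries have been analyzed in total.
            var result = admin.topics().analyzeSubscriptionBacklog(
                    "persistent://public/default/my-topic",   // placeholder topic
                    "my-subscription",                        // placeholder subscription
                    Optional.empty(),                         // start from the last processed message
                    100_000L);                                // client-side backlogScanMaxEntries
            System.out.println("entries=" + result.getEntries()
                    + " messages=" + result.getMessages()
                    + " aborted=" + result.isAborted());

            // Or terminate the loop with a custom predicate over the merged result.
            var byPredicate = admin.topics().analyzeSubscriptionBacklog(
                    "persistent://public/default/my-topic",
                    "my-subscription",
                    Optional.empty(),
                    partial -> partial.getMessages() >= 50_000);
            System.out.println("messages=" + byPredicate.getMessages());
        }
    }
}

As the added Javadoc explains, backlogScanMaxEntries only decides when the client stops looping; the broker may scan up to one extra server-side batch (subscriptionBacklogScanMaxEntries) in the final iteration, as the analyzeBacklogMaxEntriesExceedWithLoop test below illustrates.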

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
index 732071ee01a..413bb5c018e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
@@ -69,29 +69,23 @@ class OpScan implements ReadEntriesCallback {
         try {
             Position lastPositionForBatch = entries.get(entries.size() - 1).getPosition();
             lastSeenPosition = lastPositionForBatch;
-            // filter out the entry if it has been already deleted
-            // filterReadEntries will call entry.release if the entry is filtered out
-            List<Entry> entriesFiltered = this.cursor.filterReadEntries(entries);
-            int skippedEntries = entries.size() - entriesFiltered.size();
-            remainingEntries.addAndGet(-skippedEntries);
-            if (!entriesFiltered.isEmpty()) {
-                for (Entry entry : entriesFiltered) {
-                    if (remainingEntries.decrementAndGet() <= 0) {
-                        log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
-                        callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
-                        return;
-                    }
-                    if (!condition.test(entry)) {
-                        log.warn("[{}] Scan abort due to user code", 
OpScan.this.cursor);
-                        callback.scanComplete(lastSeenPosition, 
ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
-                        return;
-                    }
+            for (Entry entry : entries) {
+                if (remainingEntries.getAndDecrement() <= 0) {
+                    log.info("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+                    callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
+                    return;
+                }
+                if (!condition.test(entry)) {
+                    log.info("[{}] Scan abort due to user code", 
OpScan.this.cursor);
+                    callback.scanComplete(lastSeenPosition, 
ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
+                    return;
                 }
             }
             searchPosition = ledger.getPositionAfterN(lastPositionForBatch, 1,
                     PositionBound.startExcluded);
             if (log.isDebugEnabled()) {
-                log.debug("readEntryComplete {} at {} next is {}", lastPositionForBatch, searchPosition);
+                log.debug("[{}] readEntryComplete at {} next is {}", OpScan.this.cursor, lastPositionForBatch,
+                        searchPosition);
             }
 
             if (searchPosition.compareTo(lastPositionForBatch) == 0) {
@@ -117,12 +111,12 @@ class OpScan implements ReadEntriesCallback {
 
     public void find() {
         if (remainingEntries.get() <= 0) {
-            log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+            log.info("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
             callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
             return;
         }
         if (System.currentTimeMillis() - startTime > timeOutMs) {
-            log.warn("[{}] Scan abort after hitting the deadline", 
OpScan.this.cursor);
+            log.info("[{}] Scan abort after hitting the deadline", 
OpScan.this.cursor);
             callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, 
OpScan.this.ctx);
             return;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
index acea9132049..4425436954a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertThrows;
 import java.util.ArrayList;
 import java.util.List;
@@ -27,10 +28,12 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -50,6 +53,12 @@ public class AnalyzeBacklogSubscriptionTest extends ProducerConsumerBase {
         producerBaseSetup();
     }
 
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setDispatcherMaxReadBatchSize(10);
+    }
+
     @AfterMethod(alwaysRun = true)
     @Override
     public void cleanup() throws Exception {
@@ -189,4 +198,190 @@ public class AnalyzeBacklogSubscriptionTest extends ProducerConsumerBase {
         assertEquals(0, analyzeSubscriptionBacklogResult.getEntries());
     }
 
+    @Test
+    public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws Exception {
+        int serverSubscriptionBacklogScanMaxEntries = 20;
+        conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+        String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-without-loop";
+        String subName = "sub-1";
+        int numMessages = 10;
+
+        // Test server returns false aborted flag.
+        List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
+
+        verifyClientSideLoopBacklog(topic, subName, numMessages - 1, numMessages, messageIds.get(0),
+                messageIds.get(numMessages - 1));
+    }
+
+    @Test
+    public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception {
+        int serverSubscriptionBacklogScanMaxEntries = 20;
+        conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+        String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-without-loop";
+        String subName = "sub-1";
+        int numMessages = 25;
+
+        // Test backlogScanMaxEntries(client side) <= subscriptionBacklogScanMaxEntries(server side), but server
+        // returns true aborted flag. Server dispatcherMaxReadBatchSize is set to 10.
+        List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
+
+        verifyClientSideLoopBacklog(topic, subName, serverSubscriptionBacklogScanMaxEntries - 1,
+                serverSubscriptionBacklogScanMaxEntries, messageIds.get(0),
+                messageIds.get(serverSubscriptionBacklogScanMaxEntries - 1));
+
+    }
+
+    @Test
+    public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws Exception {
+        int serverSubscriptionBacklogScanMaxEntries = 20;
+        conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+        String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-with-loop";
+        String subName = "sub-1";
+        int numMessages = 45;
+
+        // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination
+        // condition is that server returns false aborted flag.
+        List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
+
+        verifyClientSideLoopBacklog(topic, subName, numMessages, numMessages, messageIds.get(0),
+                messageIds.get(numMessages - 1));
+    }
+
+    @Test
+    public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception {
+        int serverSubscriptionBacklogScanMaxEntries = 15;
+        conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+        String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-with-loop";
+        String subName = "sub-1";
+        int numMessages = 55;
+        int backlogScanMaxEntries = 40;
+
+        // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination
+        // condition is that total entries exceeds backlogScanMaxEntries.
+        // Server dispatcherMaxReadBatchSize is set to 10.
+        List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
+
+        // Broker returns 15 + 15 + 15 = 45 entries.
+        int expectedEntries = (backlogScanMaxEntries / serverSubscriptionBacklogScanMaxEntries + 1)
+                * serverSubscriptionBacklogScanMaxEntries;
+        verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, expectedEntries, messageIds.get(0),
+                messageIds.get(expectedEntries - 1));
+    }
+
+    @Test
+    public void analyzeBacklogWithTopicUnload() throws Exception {
+        int serverSubscriptionBacklogScanMaxEntries = 10;
+        conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+        String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-unload";
+        String subName = "sub-1";
+        int numMessages = 35;
+
+        admin.topics().createSubscription(topic, subName, MessageId.latest);
+
+        assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1"));
+        verifyBacklog(topic, subName, 0, 0);
+
+        // Test client side loop with topic unload. Use sync send method here to avoid potential message duplication.
+        @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+        List<MessageId> messageIds = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            MessageId messageId = producer.send(("test-" + i).getBytes());
+            messageIds.add(messageId);
+            if (RandomUtils.secure().randomBoolean()) {
+                admin.topics().unload(topic);
+            }
+        }
+
+        verifyClientSideLoopBacklog(topic, subName, numMessages, numMessages, messageIds.get(0),
+                messageIds.get(numMessages - 1));
+    }
+
+    @Test
+    public void analyzeBacklogWithIndividualAck() throws Exception {
+        int serverSubscriptionBacklogScanMaxEntries = 20;
+        conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+        String topic = "persistent://my-property/my-ns/analyze-backlog-with-individual-ack";
+        String subName = "sub-1";
+        int messages = 55;
+
+        // Test client side loop with individual ack.
+        List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, messages);
+
+        // We want to wait for the server to process acks, in order to not have a flaky test.
+        @Cleanup Consumer<byte[]> consumer =
+                pulsarClient.newConsumer().topic(topic).isAckReceiptEnabled(true).subscriptionName(subName)
+                        .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        // Individual ack message2.
+        Message<byte[]> message1 = consumer.receive();
+        Message<byte[]> message2 = consumer.receive();
+        consumer.acknowledge(message2);
+
+        int backlogScanMaxEntries = 20;
+        verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, backlogScanMaxEntries, messageIds.get(0),
+                messageIds.get(backlogScanMaxEntries));
+
+        // Ack message1.
+        consumer.acknowledge(message1);
+        verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, backlogScanMaxEntries, messageIds.get(2),
+                messageIds.get(backlogScanMaxEntries + 1));
+
+        // Ack all messages.
+        for (int i = 2; i < messages; i++) {
+            Message<byte[]> message = consumer.receive();
+            consumer.acknowledge(message);
+        }
+
+        verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, 0, null, null);
+    }
+
+    private List<MessageId> clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages)
+            throws Exception {
+        admin.topics().createSubscription(topic, subName, MessageId.latest);
+
+        assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1"));
+        verifyClientSideLoopBacklog(topic, subName, -1, 0, null, null);
+
+        @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            CompletableFuture<MessageId> future = producer.sendAsync(("test-" + i).getBytes());
+            futures.add(future);
+        }
+        FutureUtil.waitForAll(futures).get();
+        return futures.stream().map(CompletableFuture::join).toList();
+    }
+
+    private void verifyClientSideLoopBacklog(String topic, String subName, int backlogMaxScanEntries,
+                                             int expectedEntries, MessageId firstMessageId, MessageId lastMessageId)
+            throws Exception {
+        AnalyzeSubscriptionBacklogResult backlogResult =
+                admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogMaxScanEntries);
+
+        assertEquals(backlogResult.getEntries(), expectedEntries);
+        assertEquals(backlogResult.getMessages(), expectedEntries);
+
+        if (firstMessageId == null) {
+            assertNull(backlogResult.getFirstMessageId());
+        } else {
+            MessageIdAdv firstMessageIdAdv = (MessageIdAdv) firstMessageId;
+            assertEquals(backlogResult.getFirstMessageId(),
+                    firstMessageIdAdv.getLedgerId() + ":" + firstMessageIdAdv.getEntryId());
+        }
+
+        if (lastMessageId == null) {
+            assertNull(backlogResult.getLastMessageId());
+        } else {
+            MessageIdAdv lastMessageIdAdv = (MessageIdAdv) lastMessageId;
+            assertEquals(backlogResult.getLastMessageId(),
+                    lastMessageIdAdv.getLedgerId() + ":" + lastMessageIdAdv.getEntryId());
+        }
+    }
+
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index a2fcd60deb5..4360d37433f 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
@@ -2220,21 +2221,134 @@ public interface Topics {
      * This is a potentially expensive operation, as it requires
      * to read the messages from storage.
      * This function takes into consideration batch messages
-     * and also Subscription filters.
+     * and also Subscription filters. <br/>
+     * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)}
      * @param topic
      *            Topic name
      * @param subscriptionName
      *            the subscription
      * @param startPosition
      *           the position to start the scan from (empty means the last processed message)
+     * @param backlogScanMaxEntries
+     *           the maximum number of backlog entries the client will scan before terminating its loop
      * @return an accurate analysis of the backlog
      * @throws PulsarAdminException
      *            Unexpected error
      */
+    AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
+                                                                Optional<MessageId> startPosition,
+                                                                long backlogScanMaxEntries) throws PulsarAdminException;
+
+    /**
+     * Analyze subscription backlog.
+     * This is a potentially expensive operation, as it requires
+     * to read the messages from storage.
+     * This function takes into consideration batch messages
+     * and also Subscription filters. <br/>
+     * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, Predicate)} <br/>
+     *
+     * @param topic
+     *            Topic name
+     * @param subscriptionName
+     *            the subscription
+     * @param startPosition
+     *           the position to start the scan from (empty means the last processed message)
+     * @param terminatePredicate
+     *           the predicate to determine whether to terminate the loop
+     * @return an accurate analysis of the backlog
+     */
+    AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
+                                                        Optional<MessageId> startPosition,
+                                                        Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate)
+            throws PulsarAdminException;
+
+    /**
+     * Analyze subscription backlog.
+     * This is a potentially expensive operation, as it requires
+     * to read the messages from storage.
+     * This function takes into consideration batch messages
+     * and also Subscription filters.
+     * @param topic
+     *            Topic name
+     * @param subscriptionName
+     *            the subscription
+     * @param startPosition
+     *           the position to start the scan from (empty means the last processed message)
+     * @return an accurate analysis of the backlog
+     */
     CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
                                                                                         String subscriptionName,
                                                                                         Optional<MessageId> startPosition);
 
+    /**
+     * Analyze subscription backlog.
+     * This is a potentially expensive operation, as it requires
+     * to read the messages from storage.
+     * This function takes into consideration batch messages
+     * and also Subscription filters.
+     *
+     *<p>
+     * What's the purpose of this overloaded method? <br/>
+     * There are broker-side configurable maximum limits on how many entries will be read and how long the scanning
+     * can take: subscriptionBacklogScanMaxTimeMs (default 2 minutes) and subscriptionBacklogScanMaxEntries
+     * (default 10000). <br/>
+     * Increasing these settings is possible. However, the HTTP request may time out (also due to idle timeouts in
+     * NAT/firewalls etc.) before the command completes, so increasing the limits might not be useful beyond a few
+     * minutes.
+     *</p>
+     *
+     *<p>
+     * How does this method work? <br/>
+     * 1. A new client-side parameter, backlogScanMaxEntries, controls the client-side loop termination
+     *    condition. <br/>
+     * 2. If subscriptionBacklogScanMaxEntries (server side) >= backlogScanMaxEntries (client side), the
+     *    backlogScanMaxEntries parameter has no effect. <br/>
+     * 3. If subscriptionBacklogScanMaxEntries < backlogScanMaxEntries, the client calls analyze-backlog in a loop
+     *    until the server returns ScanOutcome.COMPLETED or the total entry count exceeds backlogScanMaxEntries. <br/>
+     * 4. This means backlogScanMaxEntries cannot precisely control the number of entries scanned by the server;
+     *    it only determines when the loop terminates. <br/>
+     * 5. With this method, the broker can keep subscriptionBacklogScanMaxTimeMs and
+     *    subscriptionBacklogScanMaxEntries low, while users can still retrieve the desired number of backlog
+     *    entries through client-side looping.
+     *</p>
+     * @param topic
+     *            Topic name
+     * @param subscriptionName
+     *            the subscription
+     * @param startPosition
+     *           the position to start the scan from (empty means the last processed message)
+     * @param backlogScanMaxEntries
+     *           the maximum number of backlog entries the client will scan before terminating its loop
+     * @return an accurate analysis of the backlog
+     */
+    CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
+                                                                            String subscriptionName,
+                                                                            Optional<MessageId> startPosition,
+                                                                            long backlogScanMaxEntries);
+
+    /**
+     * Analyze subscription backlog.
+     * This is a potentially expensive operation, as it requires
+     * to read the messages from storage.
+     * This function takes into consideration batch messages
+     * and also Subscription filters. <br/>
+     * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)} <br/>
+     * The user can control the loop termination condition via terminatePredicate.
+     *
+     * @param topic
+     *            Topic name
+     * @param subscriptionName
+     *            the subscription
+     * @param startPosition
+     *           the position to start the scan from (empty means the last processed message)
+     * @param terminatePredicate
+     *           the predicate to determine whether to terminate the loop
+     * @return an accurate analysis of the backlog
+     */
+    CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
+                                                        String subscriptionName, Optional<MessageId> startPosition,
+                                                        Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate);
+
     /**
      * Get backlog size by a message ID.
      * @param topic
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 11dd69a23ce..78b7329159c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -34,6 +34,10 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.ws.rs.client.Entity;
@@ -44,6 +48,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.GetStatsOptions;
 import org.apache.pulsar.client.admin.ListTopicsOptions;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -1560,6 +1565,23 @@ public class TopicsImpl extends BaseResource implements Topics {
         return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition));
     }
 
+    @Override
+    public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
+                                                                       Optional<MessageId> startPosition,
+                                                                       long backlogScanMaxEntries)
+            throws PulsarAdminException {
+        return sync(
+                () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, backlogScanMaxEntries));
+    }
+
+    @Override
+    public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
+                                                       Optional<MessageId> startPosition,
+                                                       Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate)
+            throws PulsarAdminException {
+        return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, terminatePredicate));
+    }
+
     @Override
     public CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
                                                                                                String subscriptionName,
@@ -1591,6 +1613,96 @@ public class TopicsImpl extends BaseResource implements Topics {
         return future;
     }
 
+    @Override
+    public CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
+                                                                               String subscriptionName,
+                                                                               Optional<MessageId> startPosition,
+                                                                               long backlogScanMaxEntries) {
+        return analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition,
+                (backlogResult) -> backlogResult.getEntries() >= backlogScanMaxEntries);
+    }
+
+    @Override
+    public CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
+                                                       String subscriptionName, Optional<MessageId> startPosition,
+                                                       Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate) {
+        final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new CompletableFuture<>();
+        AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new AtomicReference<>();
+        int partitionIndex = TopicName.get(topic).getPartitionIndex();
+        AtomicReference<Optional<MessageId>> startPositionRef = new AtomicReference<>(startPosition);
+
+        Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>> resultSupplier =
+                () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPositionRef.get());
+        BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction = new BiConsumer<>() {
+            @Override
+            public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable throwable) {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    return;
+                }
+
+                AnalyzeSubscriptionBacklogResult mergedResult = mergeBacklogResults(currentResult, resultRef.get());
+                resultRef.set(mergedResult);
+                if (!mergedResult.isAborted() || terminatePredicate.test(mergedResult)) {
+                    future.complete(mergedResult);
+                    return;
+                }
+
+                // In analyze-backlog, we treat 0 entries or a null lastMessageId as scan completed,
+                // purely as a safety measure: either one indicates that no entries were scanned.
+                if (currentResult.getEntries() <= 0 || StringUtils.isBlank(currentResult.getLastMessageId())) {
+                    log.info("[{}][{}] complete scan due to total entries <= 0 or blank last message id, "
+                            + "start position is: {}, current result: {}", topic, subscriptionName,
+                            startPositionRef.get(), currentResult);
+                    future.complete(mergedResult);
+                    return;
+                }
+
+                String[] messageIdSplits = mergedResult.getLastMessageId().split(":");
+                MessageIdImpl nextScanMessageId =
+                        new MessageIdImpl(Long.parseLong(messageIdSplits[0]), Long.parseLong(messageIdSplits[1]) + 1,
+                                partitionIndex);
+                startPositionRef.set(Optional.of(nextScanMessageId));
+
+                resultSupplier.get().whenComplete(this);
+            }
+        };
+
+        resultSupplier.get().whenComplete(completeAction);
+        return future;
+    }
+
+    private AnalyzeSubscriptionBacklogResult mergeBacklogResults(AnalyzeSubscriptionBacklogResult current,
+                                                                 AnalyzeSubscriptionBacklogResult previous) {
+        if (previous == null) {
+            return current;
+        }
+
+        AnalyzeSubscriptionBacklogResult mergedRes = new AnalyzeSubscriptionBacklogResult();
+        mergedRes.setEntries(current.getEntries() + previous.getEntries());
+        mergedRes.setMessages(current.getMessages() + previous.getMessages());
+        mergedRes.setMarkerMessages(current.getMarkerMessages() + previous.getMarkerMessages());
+
+        mergedRes.setFilterAcceptedEntries(current.getFilterAcceptedEntries() + previous.getFilterAcceptedEntries());
+        mergedRes.setFilterRejectedEntries(current.getFilterRejectedEntries() + previous.getFilterRejectedEntries());
+        mergedRes.setFilterRescheduledEntries(
+                current.getFilterRescheduledEntries() + previous.getFilterRescheduledEntries());
+
+        mergedRes.setFilterAcceptedMessages(current.getFilterAcceptedMessages() + previous.getFilterAcceptedMessages());
+        mergedRes.setFilterRejectedMessages(current.getFilterRejectedMessages() + previous.getFilterRejectedMessages());
+        mergedRes.setFilterRescheduledMessages(
+                current.getFilterRescheduledMessages() + previous.getFilterRescheduledMessages());
+
+        mergedRes.setAborted(current.isAborted());
+        mergedRes.setFirstMessageId(previous.getFirstMessageId());
+        String lastMessageId = current.getLastMessageId();
+        if (StringUtils.isNotBlank(lastMessageId)) {
+            mergedRes.setLastMessageId(lastMessageId);
+        }
+
+        return mergedRes;
+    }
+
     @Override
     public Long getBacklogSizeByMessageId(String topic, MessageId messageId)
             throws PulsarAdminException {
