This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8f791f0315d7a20e8984cf13ee8487ea2525619e Author: Oneby Wang <[email protected]> AuthorDate: Fri Feb 6 18:05:42 2026 +0800 [improve][admin] Add client side looping to analyze-backlog in Topics to avoid potential HTTP call timeout (#25127) (cherry picked from commit e160b1add39945148150365aefcabbc77f13b759) --- .../org/apache/bookkeeper/mledger/impl/OpScan.java | 34 ++-- .../admin/AnalyzeBacklogSubscriptionTest.java | 195 +++++++++++++++++++++ .../org/apache/pulsar/client/admin/Topics.java | 116 +++++++++++- .../pulsar/client/admin/internal/TopicsImpl.java | 112 ++++++++++++ 4 files changed, 436 insertions(+), 21 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java index 732071ee01a..413bb5c018e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java @@ -69,29 +69,23 @@ class OpScan implements ReadEntriesCallback { try { Position lastPositionForBatch = entries.get(entries.size() - 1).getPosition(); lastSeenPosition = lastPositionForBatch; - // filter out the entry if it has been already deleted - // filterReadEntries will call entry.release if the entry is filtered out - List<Entry> entriesFiltered = this.cursor.filterReadEntries(entries); - int skippedEntries = entries.size() - entriesFiltered.size(); - remainingEntries.addAndGet(-skippedEntries); - if (!entriesFiltered.isEmpty()) { - for (Entry entry : entriesFiltered) { - if (remainingEntries.decrementAndGet() <= 0) { - log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor); - callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); - return; - } - if (!condition.test(entry)) { - log.warn("[{}] Scan abort due to user code", OpScan.this.cursor); - callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx); - return; - } + for (Entry entry : entries) { + if (remainingEntries.getAndDecrement() <= 0) { + log.info("[{}] Scan abort after reading too many entries", OpScan.this.cursor); + callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); + return; + } + if (!condition.test(entry)) { + log.info("[{}] Scan abort due to user code", OpScan.this.cursor); + callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx); + return; } } searchPosition = ledger.getPositionAfterN(lastPositionForBatch, 1, PositionBound.startExcluded); if (log.isDebugEnabled()) { - log.debug("readEntryComplete {} at {} next is {}", lastPositionForBatch, searchPosition); + log.debug("[{}] readEntryComplete at {} next is {}", OpScan.this.cursor, lastPositionForBatch, + searchPosition); } if (searchPosition.compareTo(lastPositionForBatch) == 0) { @@ -117,12 +111,12 @@ class OpScan implements ReadEntriesCallback { public void find() { if (remainingEntries.get() <= 0) { - log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor); + log.info("[{}] Scan abort after reading too many entries", OpScan.this.cursor); callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); return; } if (System.currentTimeMillis() - startTime > timeOutMs) { - log.warn("[{}] Scan abort after hitting the deadline", OpScan.this.cursor); + log.info("[{}] Scan abort after hitting the deadline", OpScan.this.cursor); callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java index acea9132049..4425436954a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; import java.util.ArrayList; import java.util.List; @@ -27,10 +28,12 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.SubscriptionType; @@ -50,6 +53,12 @@ public class AnalyzeBacklogSubscriptionTest extends ProducerConsumerBase { producerBaseSetup(); } + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setDispatcherMaxReadBatchSize(10); + } + @AfterMethod(alwaysRun = true) @Override public void cleanup() throws Exception { @@ -189,4 +198,190 @@ public class AnalyzeBacklogSubscriptionTest extends ProducerConsumerBase { assertEquals(0, analyzeSubscriptionBacklogResult.getEntries()); } + @Test + public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws Exception { + int serverSubscriptionBacklogScanMaxEntries = 20; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-without-loop"; + String subName = "sub-1"; + int numMessages = 10; + + // Test server returns false aborted flag. + List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); + + verifyClientSideLoopBacklog(topic, subName, numMessages - 1, numMessages, messageIds.get(0), + messageIds.get(numMessages - 1)); + } + + @Test + public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception { + int serverSubscriptionBacklogScanMaxEntries = 20; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-without-loop"; + String subName = "sub-1"; + int numMessages = 25; + + // Test backlogScanMaxEntries(client side) <= subscriptionBacklogScanMaxEntries(server side), but server + // returns true aborted flag. Server dispatcherMaxReadBatchSize is set to 10. + List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); + + verifyClientSideLoopBacklog(topic, subName, serverSubscriptionBacklogScanMaxEntries - 1, + serverSubscriptionBacklogScanMaxEntries, messageIds.get(0), + messageIds.get(serverSubscriptionBacklogScanMaxEntries - 1)); + + } + + @Test + public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws Exception { + int serverSubscriptionBacklogScanMaxEntries = 20; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-with-loop"; + String subName = "sub-1"; + int numMessages = 45; + + // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination + // condition is that server returns false aborted flag. + List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); + + verifyClientSideLoopBacklog(topic, subName, numMessages, numMessages, messageIds.get(0), + messageIds.get(numMessages - 1)); + } + + @Test + public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception { + int serverSubscriptionBacklogScanMaxEntries = 15; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-with-loop"; + String subName = "sub-1"; + int numMessages = 55; + int backlogScanMaxEntries = 40; + + // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination + // condition is that total entries exceeds backlogScanMaxEntries. + // Server dispatcherMaxReadBatchSize is set to 10. + List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); + + // Broker returns 15 + 15 + 15 = 45 entries. + int expectedEntries = (backlogScanMaxEntries / serverSubscriptionBacklogScanMaxEntries + 1) + * serverSubscriptionBacklogScanMaxEntries; + verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, expectedEntries, messageIds.get(0), + messageIds.get(expectedEntries - 1)); + } + + @Test + public void analyzeBacklogWithTopicUnload() throws Exception { + int serverSubscriptionBacklogScanMaxEntries = 10; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-unload"; + String subName = "sub-1"; + int numMessages = 35; + + admin.topics().createSubscription(topic, subName, MessageId.latest); + + assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1")); + verifyBacklog(topic, subName, 0, 0); + + // Test client side loop with topic unload. Use sync send method here to avoid potential message duplication. + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + List<MessageId> messageIds = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + MessageId messageId = producer.send(("test-" + i).getBytes()); + messageIds.add(messageId); + if (RandomUtils.secure().randomBoolean()) { + admin.topics().unload(topic); + } + } + + verifyClientSideLoopBacklog(topic, subName, numMessages, numMessages, messageIds.get(0), + messageIds.get(numMessages - 1)); + } + + @Test + public void analyzeBacklogWithIndividualAck() throws Exception { + int serverSubscriptionBacklogScanMaxEntries = 20; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-with-individual-ack"; + String subName = "sub-1"; + int messages = 55; + + // Test client side loop with individual ack. + List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, messages); + + // We want to wait for the server to process acks, in order to not have a flaky test. + @Cleanup Consumer<byte[]> consumer = + pulsarClient.newConsumer().topic(topic).isAckReceiptEnabled(true).subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared).subscribe(); + + // Individual ack message2. + Message<byte[]> message1 = consumer.receive(); + Message<byte[]> message2 = consumer.receive(); + consumer.acknowledge(message2); + + int backlogScanMaxEntries = 20; + verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, backlogScanMaxEntries, messageIds.get(0), + messageIds.get(backlogScanMaxEntries)); + + // Ack message1. + consumer.acknowledge(message1); + verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, backlogScanMaxEntries, messageIds.get(2), + messageIds.get(backlogScanMaxEntries + 1)); + + // Ack all messages. + for (int i = 2; i < messages; i++) { + Message<byte[]> message = consumer.receive(); + consumer.acknowledge(message); + } + + verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, 0, null, null); + } + + private List<MessageId> clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages) + throws Exception { + admin.topics().createSubscription(topic, subName, MessageId.latest); + + assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1")); + verifyClientSideLoopBacklog(topic, subName, -1, 0, null, null); + + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + List<CompletableFuture<MessageId>> futures = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + CompletableFuture<MessageId> future = producer.sendAsync(("test-" + i).getBytes()); + futures.add(future); + } + FutureUtil.waitForAll(futures).get(); + return futures.stream().map(CompletableFuture::join).toList(); + } + + private void verifyClientSideLoopBacklog(String topic, String subName, int backlogMaxScanEntries, + int expectedEntries, MessageId firstMessageId, MessageId lastMessageId) + throws Exception { + AnalyzeSubscriptionBacklogResult backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogMaxScanEntries); + + assertEquals(backlogResult.getEntries(), expectedEntries); + assertEquals(backlogResult.getMessages(), expectedEntries); + + if (firstMessageId == null) { + assertNull(backlogResult.getFirstMessageId()); + } else { + MessageIdAdv firstMessageIdAdv = (MessageIdAdv) firstMessageId; + assertEquals(backlogResult.getFirstMessageId(), + firstMessageIdAdv.getLedgerId() + ":" + firstMessageIdAdv.getEntryId()); + } + + if (lastMessageId == null) { + assertNull(backlogResult.getLastMessageId()); + } else { + MessageIdAdv lastMessageIdAdv = (MessageIdAdv) lastMessageId; + assertEquals(backlogResult.getLastMessageId(), + lastMessageIdAdv.getLedgerId() + ":" + lastMessageIdAdv.getEntryId()); + } + } + } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index a2fcd60deb5..4360d37433f 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; @@ -2220,21 +2221,134 @@ public interface Topics { * This is a potentially expensive operation, as it requires * to read the messages from storage. * This function takes into consideration batch messages - * and also Subscription filters. + * and also Subscription filters. <br/> + * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)} * @param topic * Topic name * @param subscriptionName * the subscription * @param startPosition * the position to start the scan from (empty means the last processed message) + * @param backlogScanMaxEntries + * the maximum number of backlog entries the client will scan before terminating its loop * @return an accurate analysis of the backlog * @throws PulsarAdminException * Unexpected error */ + AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, + Optional<MessageId> startPosition, + long backlogScanMaxEntries) throws PulsarAdminException; + + /** + * Analyze subscription backlog. + * This is a potentially expensive operation, as it requires + * to read the messages from storage. + * This function takes into consideration batch messages + * and also Subscription filters. <br/> + * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, Predicate)} <br/> + * + * @param topic + * Topic name + * @param subscriptionName + * the subscription + * @param startPosition + * the position to start the scan from (empty means the last processed message) + * @param terminatePredicate + * the predicate to determine whether to terminate the loop + * @return an accurate analysis of the backlog + */ + AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, + Optional<MessageId> startPosition, + Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate) + throws PulsarAdminException; + + /** + * Analyze subscription backlog. + * This is a potentially expensive operation, as it requires + * to read the messages from storage. + * This function takes into consideration batch messages + * and also Subscription filters. + * @param topic + * Topic name + * @param subscriptionName + * the subscription + * @param startPosition + * the position to start the scan from (empty means the last processed message) + * @return an accurate analysis of the backlog + */ CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic, String subscriptionName, Optional<MessageId> startPosition); + /** + * Analyze subscription backlog. + * This is a potentially expensive operation, as it requires + * to read the messages from storage. + * This function takes into consideration batch messages + * and also Subscription filters. + * + *<p> + * What's the purpose of this overloaded method? <br/> + * There are broker side configurable maximum limits how many entries will be read and how long the scanning can + * take. The subscriptionBacklogScanMaxTimeMs (default 2 minutes) and subscriptionBacklogScanMaxEntries + * (default 10000) control this behavior. <br/> + * Increasing these settings is possible. However, it's possible that the HTTP request times out (also idle timeout + * in NAT/firewall etc.) before the command completes so increasing the limits might not be useful beyond a few + * minutes. + *</p> + * + *<p> + * How does this method work? <br/> + * 1. Add a new parameter backlogScanMaxEntries in client side method to control the client-side loop termination + * condition. <br/> + * 2. If subscriptionBacklogScanMaxEntries(server side) >= backlogScanMaxEntries(client side), then + * backlogScanMaxEntries parameter will take no effect. <br/> + * 3. If subscriptionBacklogScanMaxEntries < backlogScanMaxEntries, the client will call analyze-backlog method in + * a loop until server return ScanOutcome.COMPLETED or the total entries exceeds backlogScanMaxEntries. <br/> + * 4. This means that backlogScanMaxEntries cannot be used to precisely control the number of entries scanned by + * the server, it only serves to determine when the loop should terminate. <br/> + * 5. With this method, the server can reduce the values of the two parameters subscriptionBacklogScanMaxTimeMs and + * subscriptionBacklogScanMaxEntries, so user can retrieve the desired number of backlog entries through + * client-side looping. + *</p> + * @param topic + * Topic name + * @param subscriptionName + * the subscription + * @param startPosition + * the position to start the scan from (empty means the last processed message) + * @param backlogScanMaxEntries + * the maximum number of backlog entries the client will scan before terminating its loop + * @return an accurate analysis of the backlog + */ + CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic, + String subscriptionName, + Optional<MessageId> startPosition, + long backlogScanMaxEntries); + + /** + * Analyze subscription backlog. + * This is a potentially expensive operation, as it requires + * to read the messages from storage. + * This function takes into consideration batch messages + * and also Subscription filters. <br/> + * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)} <br/> + * User can control the loop termination condition by terminatePredicate. + * + * @param topic + * Topic name + * @param subscriptionName + * the subscription + * @param startPosition + * the position to start the scan from (empty means the last processed message) + * @param terminatePredicate + * the predicate to determine whether to terminate the loop + * @return an accurate analysis of the backlog + */ + CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic, + String subscriptionName, Optional<MessageId> startPosition, + Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate); + /** * Get backlog size by a message ID. * @param topic diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 11dd69a23ce..78b7329159c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -34,6 +34,10 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.ws.rs.client.Entity; @@ -44,6 +48,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.admin.LongRunningProcessStatus; @@ -1560,6 +1565,23 @@ public class TopicsImpl extends BaseResource implements Topics { return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition)); } + @Override + public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, + Optional<MessageId> startPosition, + long backlogScanMaxEntries) + throws PulsarAdminException { + return sync( + () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, backlogScanMaxEntries)); + } + + @Override + public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, + Optional<MessageId> startPosition, + Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate) + throws PulsarAdminException { + return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, terminatePredicate)); + } + @Override public CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic, String subscriptionName, @@ -1591,6 +1613,96 @@ public class TopicsImpl extends BaseResource implements Topics { return future; } + @Override + public CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic, + String subscriptionName, + Optional<MessageId> startPosition, + long backlogScanMaxEntries) { + return analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, + (backlogResult) -> backlogResult.getEntries() >= backlogScanMaxEntries); + } + + @Override + public CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic, + String subscriptionName, Optional<MessageId> startPosition, + Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate) { + final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new CompletableFuture<>(); + AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new AtomicReference<>(); + int partitionIndex = TopicName.get(topic).getPartitionIndex(); + AtomicReference<Optional<MessageId>> startPositionRef = new AtomicReference<>(startPosition); + + Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>> resultSupplier = + () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPositionRef.get()); + BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction = new BiConsumer<>() { + @Override + public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable throwable) { + if (throwable != null) { + future.completeExceptionally(throwable); + return; + } + + AnalyzeSubscriptionBacklogResult mergedResult = mergeBacklogResults(currentResult, resultRef.get()); + resultRef.set(mergedResult); + if (!mergedResult.isAborted() || terminatePredicate.test(mergedResult)) { + future.complete(mergedResult); + return; + } + + // In analyze-backlog, we treat 0 entries or null lastMessageId as scan completed for mere safety. + // 0 entries or a null lastMessageId indicates no entries were scanned. + if (currentResult.getEntries() <= 0 || StringUtils.isBlank(currentResult.getLastMessageId())) { + log.info("[{}][{}] complete scan due total entry <= 0 or last message id is blank, " + + "start position is: {}, current result: {}", topic, subscriptionName, + startPositionRef.get(), currentResult); + future.complete(mergedResult); + return; + } + + String[] messageIdSplits = mergedResult.getLastMessageId().split(":"); + MessageIdImpl nextScanMessageId = + new MessageIdImpl(Long.parseLong(messageIdSplits[0]), Long.parseLong(messageIdSplits[1]) + 1, + partitionIndex); + startPositionRef.set(Optional.of(nextScanMessageId)); + + resultSupplier.get().whenComplete(this); + } + }; + + resultSupplier.get().whenComplete(completeAction); + return future; + } + + private AnalyzeSubscriptionBacklogResult mergeBacklogResults(AnalyzeSubscriptionBacklogResult current, + AnalyzeSubscriptionBacklogResult previous) { + if (previous == null) { + return current; + } + + AnalyzeSubscriptionBacklogResult mergedRes = new AnalyzeSubscriptionBacklogResult(); + mergedRes.setEntries(current.getEntries() + previous.getEntries()); + mergedRes.setMessages(current.getMessages() + previous.getMessages()); + mergedRes.setMarkerMessages(current.getMarkerMessages() + previous.getMarkerMessages()); + + mergedRes.setFilterAcceptedEntries(current.getFilterAcceptedEntries() + previous.getFilterAcceptedEntries()); + mergedRes.setFilterRejectedEntries(current.getFilterRejectedEntries() + previous.getFilterRejectedEntries()); + mergedRes.setFilterRescheduledEntries( + current.getFilterRescheduledEntries() + previous.getFilterRescheduledEntries()); + + mergedRes.setFilterAcceptedMessages(current.getFilterAcceptedMessages() + previous.getFilterAcceptedMessages()); + mergedRes.setFilterRejectedMessages(current.getFilterRejectedMessages() + previous.getFilterRejectedMessages()); + mergedRes.setFilterRescheduledMessages( + current.getFilterRescheduledMessages() + previous.getFilterRescheduledMessages()); + + mergedRes.setAborted(current.isAborted()); + mergedRes.setFirstMessageId(previous.getFirstMessageId()); + String lastMessageId = current.getLastMessageId(); + if (StringUtils.isNotBlank(lastMessageId)) { + mergedRes.setLastMessageId(lastMessageId); + } + + return mergedRes; + } + @Override public Long getBacklogSizeByMessageId(String topic, MessageId messageId) throws PulsarAdminException {
