This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e160b1add39 [improve][admin] Add client side looping to
analyze-backlog in Topics to avoid potential HTTP call timeout (#25127)
e160b1add39 is described below
commit e160b1add39945148150365aefcabbc77f13b759
Author: Oneby Wang <[email protected]>
AuthorDate: Fri Feb 6 18:05:42 2026 +0800
[improve][admin] Add client side looping to analyze-backlog in Topics to
avoid potential HTTP call timeout (#25127)
---
.../org/apache/bookkeeper/mledger/impl/OpScan.java | 34 ++--
.../admin/AnalyzeBacklogSubscriptionTest.java | 195 +++++++++++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 116 +++++++++++-
.../pulsar/client/admin/internal/TopicsImpl.java | 112 ++++++++++++
4 files changed, 436 insertions(+), 21 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
index 732071ee01a..413bb5c018e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
@@ -69,29 +69,23 @@ class OpScan implements ReadEntriesCallback {
try {
Position lastPositionForBatch = entries.get(entries.size() -
1).getPosition();
lastSeenPosition = lastPositionForBatch;
- // filter out the entry if it has been already deleted
- // filterReadEntries will call entry.release if the entry is
filtered out
- List<Entry> entriesFiltered =
this.cursor.filterReadEntries(entries);
- int skippedEntries = entries.size() - entriesFiltered.size();
- remainingEntries.addAndGet(-skippedEntries);
- if (!entriesFiltered.isEmpty()) {
- for (Entry entry : entriesFiltered) {
- if (remainingEntries.decrementAndGet() <= 0) {
- log.warn("[{}] Scan abort after reading too many
entries", OpScan.this.cursor);
- callback.scanComplete(lastSeenPosition,
ScanOutcome.ABORTED, OpScan.this.ctx);
- return;
- }
- if (!condition.test(entry)) {
- log.warn("[{}] Scan abort due to user code",
OpScan.this.cursor);
- callback.scanComplete(lastSeenPosition,
ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
- return;
- }
+ for (Entry entry : entries) {
+ if (remainingEntries.getAndDecrement() <= 0) {
+ log.info("[{}] Scan abort after reading too many entries",
OpScan.this.cursor);
+ callback.scanComplete(lastSeenPosition,
ScanOutcome.ABORTED, OpScan.this.ctx);
+ return;
+ }
+ if (!condition.test(entry)) {
+ log.info("[{}] Scan abort due to user code",
OpScan.this.cursor);
+ callback.scanComplete(lastSeenPosition,
ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
+ return;
}
}
searchPosition = ledger.getPositionAfterN(lastPositionForBatch, 1,
PositionBound.startExcluded);
if (log.isDebugEnabled()) {
- log.debug("readEntryComplete {} at {} next is {}",
lastPositionForBatch, searchPosition);
+ log.debug("[{}] readEntryComplete at {} next is {}",
OpScan.this.cursor, lastPositionForBatch,
+ searchPosition);
}
if (searchPosition.compareTo(lastPositionForBatch) == 0) {
@@ -117,12 +111,12 @@ class OpScan implements ReadEntriesCallback {
public void find() {
if (remainingEntries.get() <= 0) {
- log.warn("[{}] Scan abort after reading too many entries",
OpScan.this.cursor);
+ log.info("[{}] Scan abort after reading too many entries",
OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED,
OpScan.this.ctx);
return;
}
if (System.currentTimeMillis() - startTime > timeOutMs) {
- log.warn("[{}] Scan abort after hitting the deadline",
OpScan.this.cursor);
+ log.info("[{}] Scan abort after hitting the deadline",
OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED,
OpScan.this.ctx);
return;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
index acea9132049..4425436954a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import java.util.ArrayList;
import java.util.List;
@@ -27,10 +28,12 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -50,6 +53,12 @@ public class AnalyzeBacklogSubscriptionTest extends
ProducerConsumerBase {
producerBaseSetup();
}
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ conf.setDispatcherMaxReadBatchSize(10);
+ }
+
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
@@ -189,4 +198,190 @@ public class AnalyzeBacklogSubscriptionTest extends
ProducerConsumerBase {
assertEquals(0, analyzeSubscriptionBacklogResult.getEntries());
}
+ @Test
+ public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws
Exception {
+ int serverSubscriptionBacklogScanMaxEntries = 20;
+
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+ String topic =
"persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-without-loop";
+ String subName = "sub-1";
+ int numMessages = 10;
+
+ // Test server returns false aborted flag.
+ List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic,
subName, numMessages);
+
+ verifyClientSideLoopBacklog(topic, subName, numMessages - 1,
numMessages, messageIds.get(0),
+ messageIds.get(numMessages - 1));
+ }
+
+ @Test
+ public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception {
+ int serverSubscriptionBacklogScanMaxEntries = 20;
+
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+ String topic =
"persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-without-loop";
+ String subName = "sub-1";
+ int numMessages = 25;
+
+ // Test backlogScanMaxEntries(client side) <=
subscriptionBacklogScanMaxEntries(server side), but server
+ // returns true aborted flag. Server dispatcherMaxReadBatchSize is set
to 10.
+ List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic,
subName, numMessages);
+
+ verifyClientSideLoopBacklog(topic, subName,
serverSubscriptionBacklogScanMaxEntries - 1,
+ serverSubscriptionBacklogScanMaxEntries, messageIds.get(0),
+ messageIds.get(serverSubscriptionBacklogScanMaxEntries - 1));
+
+ }
+
+ @Test
+ public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws
Exception {
+ int serverSubscriptionBacklogScanMaxEntries = 20;
+
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+ String topic =
"persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-with-loop";
+ String subName = "sub-1";
+ int numMessages = 45;
+
+ // Test client side loop: backlogScanMaxEntries >
subscriptionBacklogScanMaxEntries, the loop termination
+ // condition is that server returns false aborted flag.
+ List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic,
subName, numMessages);
+
+ verifyClientSideLoopBacklog(topic, subName, numMessages, numMessages,
messageIds.get(0),
+ messageIds.get(numMessages - 1));
+ }
+
+ @Test
+ public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception {
+ int serverSubscriptionBacklogScanMaxEntries = 15;
+
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+ String topic =
"persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-with-loop";
+ String subName = "sub-1";
+ int numMessages = 55;
+ int backlogScanMaxEntries = 40;
+
+ // Test client side loop: backlogScanMaxEntries >
subscriptionBacklogScanMaxEntries, the loop termination
+ // condition is that total entries exceeds backlogScanMaxEntries.
+ // Server dispatcherMaxReadBatchSize is set to 10.
+ List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic,
subName, numMessages);
+
+ // Broker returns 15 + 15 + 15 = 45 entries.
+ int expectedEntries = (backlogScanMaxEntries /
serverSubscriptionBacklogScanMaxEntries + 1)
+ * serverSubscriptionBacklogScanMaxEntries;
+ verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries,
expectedEntries, messageIds.get(0),
+ messageIds.get(expectedEntries - 1));
+ }
+
+ @Test
+ public void analyzeBacklogWithTopicUnload() throws Exception {
+ int serverSubscriptionBacklogScanMaxEntries = 10;
+
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+ String topic =
"persistent://my-property/my-ns/analyze-backlog-with-topic-unload";
+ String subName = "sub-1";
+ int numMessages = 35;
+
+ admin.topics().createSubscription(topic, subName, MessageId.latest);
+
+ assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1"));
+ verifyBacklog(topic, subName, 0, 0);
+
+ // Test client side loop with topic unload. Use sync send method here
to avoid potential message duplication.
+ @Cleanup Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+ List<MessageId> messageIds = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ MessageId messageId = producer.send(("test-" + i).getBytes());
+ messageIds.add(messageId);
+ if (RandomUtils.secure().randomBoolean()) {
+ admin.topics().unload(topic);
+ }
+ }
+
+ verifyClientSideLoopBacklog(topic, subName, numMessages, numMessages,
messageIds.get(0),
+ messageIds.get(numMessages - 1));
+ }
+
+ @Test
+ public void analyzeBacklogWithIndividualAck() throws Exception {
+ int serverSubscriptionBacklogScanMaxEntries = 20;
+
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+ String topic =
"persistent://my-property/my-ns/analyze-backlog-with-individual-ack";
+ String subName = "sub-1";
+ int messages = 55;
+
+ // Test client side loop with individual ack.
+ List<MessageId> messageIds = clientSideLoopAnalyzeBacklogSetup(topic,
subName, messages);
+
+ // We want to wait for the server to process acks, in order to not
have a flaky test.
+ @Cleanup Consumer<byte[]> consumer =
+
pulsarClient.newConsumer().topic(topic).isAckReceiptEnabled(true).subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared).subscribe();
+
+ // Individual ack message2.
+ Message<byte[]> message1 = consumer.receive();
+ Message<byte[]> message2 = consumer.receive();
+ consumer.acknowledge(message2);
+
+ int backlogScanMaxEntries = 20;
+ verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries,
backlogScanMaxEntries, messageIds.get(0),
+ messageIds.get(backlogScanMaxEntries));
+
+ // Ack message1.
+ consumer.acknowledge(message1);
+ verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries,
backlogScanMaxEntries, messageIds.get(2),
+ messageIds.get(backlogScanMaxEntries + 1));
+
+ // Ack all messages.
+ for (int i = 2; i < messages; i++) {
+ Message<byte[]> message = consumer.receive();
+ consumer.acknowledge(message);
+ }
+
+ verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, 0,
null, null);
+ }
+
+ private List<MessageId> clientSideLoopAnalyzeBacklogSetup(String topic,
String subName, int numMessages)
+ throws Exception {
+ admin.topics().createSubscription(topic, subName, MessageId.latest);
+
+ assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1"));
+ verifyClientSideLoopBacklog(topic, subName, -1, 0, null, null);
+
+ @Cleanup Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+ List<CompletableFuture<MessageId>> futures = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ CompletableFuture<MessageId> future = producer.sendAsync(("test-"
+ i).getBytes());
+ futures.add(future);
+ }
+ FutureUtil.waitForAll(futures).get();
+ return futures.stream().map(CompletableFuture::join).toList();
+ }
+
+ private void verifyClientSideLoopBacklog(String topic, String subName, int
backlogMaxScanEntries,
+ int expectedEntries, MessageId
firstMessageId, MessageId lastMessageId)
+ throws Exception {
+ AnalyzeSubscriptionBacklogResult backlogResult =
+ admin.topics().analyzeSubscriptionBacklog(topic, subName,
Optional.empty(), backlogMaxScanEntries);
+
+ assertEquals(backlogResult.getEntries(), expectedEntries);
+ assertEquals(backlogResult.getMessages(), expectedEntries);
+
+ if (firstMessageId == null) {
+ assertNull(backlogResult.getFirstMessageId());
+ } else {
+ MessageIdAdv firstMessageIdAdv = (MessageIdAdv) firstMessageId;
+ assertEquals(backlogResult.getFirstMessageId(),
+ firstMessageIdAdv.getLedgerId() + ":" +
firstMessageIdAdv.getEntryId());
+ }
+
+ if (lastMessageId == null) {
+ assertNull(backlogResult.getLastMessageId());
+ } else {
+ MessageIdAdv lastMessageIdAdv = (MessageIdAdv) lastMessageId;
+ assertEquals(backlogResult.getLastMessageId(),
+ lastMessageIdAdv.getLedgerId() + ":" +
lastMessageIdAdv.getEntryId());
+ }
+ }
+
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index fbcf0b4a07b..e68be8fd2e8 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException;
import
org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
@@ -2220,21 +2221,134 @@ public interface Topics {
* This is a potentially expensive operation, as it requires
* to read the messages from storage.
* This function takes into consideration batch messages
- * and also Subscription filters.
+ * and also Subscription filters. <br/>
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String,
Optional, long)}
* @param topic
* Topic name
* @param subscriptionName
* the subscription
* @param startPosition
* the position to start the scan from (empty means the last
processed message)
+ * @param backlogScanMaxEntries
+ * the maximum number of backlog entries the client will scan
before terminating its loop
* @return an accurate analysis of the backlog
* @throws PulsarAdminException
* Unexpected error
*/
+ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic,
String subscriptionName,
+
Optional<MessageId> startPosition,
+ long
backlogScanMaxEntries) throws PulsarAdminException;
+
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters. <br/>
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String,
Optional, Predicate)} <br/>
+ *
+ * @param topic
+ * Topic name
+ * @param subscriptionName
+ * the subscription
+ * @param startPosition
+ * the position to start the scan from (empty means the last
processed message)
+ * @param terminatePredicate
+ * the predicate to determine whether to terminate the loop
+ * @return an accurate analysis of the backlog
+ */
+ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic,
String subscriptionName,
+ Optional<MessageId>
startPosition,
+
Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate)
+ throws PulsarAdminException;
+
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters.
+ * @param topic
+ * Topic name
+ * @param subscriptionName
+ * the subscription
+ * @param startPosition
+ * the position to start the scan from (empty means the last
processed message)
+ * @return an accurate analysis of the backlog
+ */
CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
String subscriptionName,
Optional<MessageId> startPosition);
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters.
+ *
+ *<p>
+ * What's the purpose of this overloaded method? <br/>
+ * There are broker side configurable maximum limits how many entries will
be read and how long the scanning can
+ * take. The subscriptionBacklogScanMaxTimeMs (default 2 minutes) and
subscriptionBacklogScanMaxEntries
+ * (default 10000) control this behavior. <br/>
+ * Increasing these settings is possible. However, it's possible that the
HTTP request times out (also idle timeout
+ * in NAT/firewall etc.) before the command completes so increasing the
limits might not be useful beyond a few
+ * minutes.
+ *</p>
+ *
+ *<p>
+ * How does this method work? <br/>
+ * 1. Add a new parameter backlogScanMaxEntries in client side method to
control the client-side loop termination
+ * condition. <br/>
+ * 2. If subscriptionBacklogScanMaxEntries(server side) >=
backlogScanMaxEntries(client side), then
+ * backlogScanMaxEntries parameter will take no effect. <br/>
+ * 3. If subscriptionBacklogScanMaxEntries < backlogScanMaxEntries, the
client will call analyze-backlog method in
+ * a loop until server return ScanOutcome.COMPLETED or the total
entries exceeds backlogScanMaxEntries. <br/>
+ * 4. This means that backlogScanMaxEntries cannot be used to precisely
control the number of entries scanned by
+ * the server, it only serves to determine when the loop should
terminate. <br/>
+ * 5. With this method, the server can reduce the values of the two
parameters subscriptionBacklogScanMaxTimeMs and
+ * subscriptionBacklogScanMaxEntries, so user can retrieve the desired
number of backlog entries through
+ * client-side looping.
+ *</p>
+ * @param topic
+ * Topic name
+ * @param subscriptionName
+ * the subscription
+ * @param startPosition
+ * the position to start the scan from (empty means the last
processed message)
+ * @param backlogScanMaxEntries
+ * the maximum number of backlog entries the client will scan
before terminating its loop
+ * @return an accurate analysis of the backlog
+ */
+ CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+
String subscriptionName,
+
Optional<MessageId> startPosition,
+
long backlogScanMaxEntries);
+
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters. <br/>
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String,
Optional, long)} <br/>
+ * User can control the loop termination condition by terminatePredicate.
+ *
+ * @param topic
+ * Topic name
+ * @param subscriptionName
+ * the subscription
+ * @param startPosition
+ * the position to start the scan from (empty means the last
processed message)
+ * @param terminatePredicate
+ * the predicate to determine whether to terminate the loop
+ * @return an accurate analysis of the backlog
+ */
+ CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+ String
subscriptionName, Optional<MessageId> startPosition,
+
Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate);
+
/**
* Get backlog size by a message ID.
* @param topic
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 11dd69a23ce..78b7329159c 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -34,6 +34,10 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.client.Entity;
@@ -44,6 +48,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.GetStatsOptions;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -1560,6 +1565,23 @@ public class TopicsImpl extends BaseResource implements
Topics {
return sync(() -> analyzeSubscriptionBacklogAsync(topic,
subscriptionName, startPosition));
}
+ @Override
+ public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String
topic, String subscriptionName,
+
Optional<MessageId> startPosition,
+ long
backlogScanMaxEntries)
+ throws PulsarAdminException {
+ return sync(
+ () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName,
startPosition, backlogScanMaxEntries));
+ }
+
+ @Override
+ public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String
topic, String subscriptionName,
+ Optional<MessageId>
startPosition,
+
Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate)
+ throws PulsarAdminException {
+ return sync(() -> analyzeSubscriptionBacklogAsync(topic,
subscriptionName, startPosition, terminatePredicate));
+ }
+
@Override
public CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
String subscriptionName,
@@ -1591,6 +1613,96 @@ public class TopicsImpl extends BaseResource implements
Topics {
return future;
}
+ @Override
+ public CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+
String subscriptionName,
+
Optional<MessageId> startPosition,
+
long backlogScanMaxEntries) {
+ return analyzeSubscriptionBacklogAsync(topic, subscriptionName,
startPosition,
+ (backlogResult) -> backlogResult.getEntries() >=
backlogScanMaxEntries);
+ }
+
+ @Override
+ public CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+ String
subscriptionName, Optional<MessageId> startPosition,
+
Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate) {
+ final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new
CompletableFuture<>();
+ AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new
AtomicReference<>();
+ int partitionIndex = TopicName.get(topic).getPartitionIndex();
+ AtomicReference<Optional<MessageId>> startPositionRef = new
AtomicReference<>(startPosition);
+
+ Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>>
resultSupplier =
+ () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName,
startPositionRef.get());
+ BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction
= new BiConsumer<>() {
+ @Override
+ public void accept(AnalyzeSubscriptionBacklogResult currentResult,
Throwable throwable) {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+
+ AnalyzeSubscriptionBacklogResult mergedResult =
mergeBacklogResults(currentResult, resultRef.get());
+ resultRef.set(mergedResult);
+ if (!mergedResult.isAborted() ||
terminatePredicate.test(mergedResult)) {
+ future.complete(mergedResult);
+ return;
+ }
+
+ // In analyze-backlog, we treat 0 entries or null
lastMessageId as scan completed for mere safety.
+ // 0 entries or a null lastMessageId indicates no entries were
scanned.
+ if (currentResult.getEntries() <= 0 ||
StringUtils.isBlank(currentResult.getLastMessageId())) {
+ log.info("[{}][{}] complete scan due total entry <= 0 or
last message id is blank, "
+ + "start position is: {}, current result: {}",
topic, subscriptionName,
+ startPositionRef.get(), currentResult);
+ future.complete(mergedResult);
+ return;
+ }
+
+ String[] messageIdSplits =
mergedResult.getLastMessageId().split(":");
+ MessageIdImpl nextScanMessageId =
+ new MessageIdImpl(Long.parseLong(messageIdSplits[0]),
Long.parseLong(messageIdSplits[1]) + 1,
+ partitionIndex);
+ startPositionRef.set(Optional.of(nextScanMessageId));
+
+ resultSupplier.get().whenComplete(this);
+ }
+ };
+
+ resultSupplier.get().whenComplete(completeAction);
+ return future;
+ }
+
+ private AnalyzeSubscriptionBacklogResult
mergeBacklogResults(AnalyzeSubscriptionBacklogResult current,
+
AnalyzeSubscriptionBacklogResult previous) {
+ if (previous == null) {
+ return current;
+ }
+
+ AnalyzeSubscriptionBacklogResult mergedRes = new
AnalyzeSubscriptionBacklogResult();
+ mergedRes.setEntries(current.getEntries() + previous.getEntries());
+ mergedRes.setMessages(current.getMessages() + previous.getMessages());
+ mergedRes.setMarkerMessages(current.getMarkerMessages() +
previous.getMarkerMessages());
+
+ mergedRes.setFilterAcceptedEntries(current.getFilterAcceptedEntries()
+ previous.getFilterAcceptedEntries());
+ mergedRes.setFilterRejectedEntries(current.getFilterRejectedEntries()
+ previous.getFilterRejectedEntries());
+ mergedRes.setFilterRescheduledEntries(
+ current.getFilterRescheduledEntries() +
previous.getFilterRescheduledEntries());
+
+
mergedRes.setFilterAcceptedMessages(current.getFilterAcceptedMessages() +
previous.getFilterAcceptedMessages());
+
mergedRes.setFilterRejectedMessages(current.getFilterRejectedMessages() +
previous.getFilterRejectedMessages());
+ mergedRes.setFilterRescheduledMessages(
+ current.getFilterRescheduledMessages() +
previous.getFilterRescheduledMessages());
+
+ mergedRes.setAborted(current.isAborted());
+ mergedRes.setFirstMessageId(previous.getFirstMessageId());
+ String lastMessageId = current.getLastMessageId();
+ if (StringUtils.isNotBlank(lastMessageId)) {
+ mergedRes.setLastMessageId(lastMessageId);
+ }
+
+ return mergedRes;
+ }
+
@Override
public Long getBacklogSizeByMessageId(String topic, MessageId messageId)
throws PulsarAdminException {