oneby-wang commented on code in PR #25127:
URL: https://github.com/apache/pulsar/pull/25127#discussion_r2710612049


##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,94 @@ public void failed(Throwable throwable) {
         return future;
     }
 
+    @Override
+    public CompletableFuture<AnalyzeSubscriptionBacklogResult> 
analyzeSubscriptionBacklogAsync(String topic,
+                                                                               
String subscriptionName,
+                                                                               
Optional<MessageId> startPosition,
+                                                                               
long backlogScanMaxEntries) {
+        final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new 
CompletableFuture<>();
+        AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new 
AtomicReference<>();
+        int partitionIndex = TopicName.get(topic).getPartitionIndex();
+        AtomicReference<Optional<MessageId>> startPositionRef = new 
AtomicReference<>(startPosition);
+
+        Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>> 
resultSupplier =
+                () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, 
startPositionRef.get());
+        BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction 
= new BiConsumer<>() {
+            @Override
+            public void accept(AnalyzeSubscriptionBacklogResult currentResult, 
Throwable throwable) {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    return;
+                }
+
+                AnalyzeSubscriptionBacklogResult mergedResult = 
mergeBacklogResults(currentResult, resultRef.get());
+                resultRef.set(mergedResult);
+                if (!mergedResult.isAborted() || mergedResult.getEntries() >= 
backlogScanMaxEntries) {

Review Comment:
   > It seems that 
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
 isn't effective in the tests since some of the tests should have failed.
   
   I use 
`conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries)`
 to crontol the loop condition. 
   
   I ran all these tests locally, and they all passed(as I expected). Please 
let me know which unit tests are expected to fail.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to