lhotari commented on code in PR #25126:
URL: https://github.com/apache/pulsar/pull/25126#discussion_r2707475003


##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java:
##########
@@ -2999,20 +3000,86 @@ private class AnalyzeBacklog extends CliCommand {
         private String subName;
 
         @Option(names = { "--position",
-                "-p" }, description = "message position to start the scan from 
(ledgerId:entryId)", required = false)
+                "-p" }, description = "Message position to start the scan from 
(ledgerId:entryId)", required = false)
         private String messagePosition;
 
+        @Option(names = {"--backlog-scan-max-entries", "-b"}, description =
+                "The maximum number of backlog entries the client will scan 
before terminating its loop",
+                required = false)
+        private long backlogScanMaxEntries = -1;
+
+        @Option(names = {"--quiet", "-q"}, description = "Disable 
analyze-backlog progress reporting", required = false)
+        private boolean quiet = false;
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(topicName);
             Optional<MessageId> startPosition = Optional.empty();
+            int partitionIndex = 
TopicName.get(persistentTopic).getPartitionIndex();
             if (isNotBlank(messagePosition)) {
-                int partitionIndex = 
TopicName.get(persistentTopic).getPartitionIndex();
                 MessageId messageId = validateMessageIdString(messagePosition, 
partitionIndex);
                 startPosition = Optional.of(messageId);
             }
-            print(getTopics().analyzeSubscriptionBacklog(persistentTopic, 
subName, startPosition));
 
+            AnalyzeSubscriptionBacklogResult mergedResult = null;
+            while (true) {
+                AnalyzeSubscriptionBacklogResult currentResult =
+                        
getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition);
+                if (mergedResult == null) {
+                    mergedResult = currentResult;
+                } else {
+                    mergedResult.setEntries(mergedResult.getEntries() + 
currentResult.getEntries());
+                    mergedResult.setMessages(mergedResult.getMessages() + 
currentResult.getMessages());
+                    mergedResult.setMarkerMessages(
+                            mergedResult.getMarkerMessages() + 
currentResult.getMarkerMessages());
+
+                    mergedResult.setFilterRejectedEntries(
+                            mergedResult.getFilterRejectedEntries() + 
currentResult.getFilterRejectedEntries());
+                    mergedResult.setFilterAcceptedEntries(
+                            mergedResult.getFilterAcceptedEntries() + 
currentResult.getFilterAcceptedEntries());
+                    mergedResult.setFilterRescheduledEntries(
+                            mergedResult.getFilterRescheduledEntries() + 
currentResult.getFilterRescheduledEntries());
+
+                    mergedResult.setFilterRejectedMessages(
+                            mergedResult.getFilterRejectedMessages() + 
currentResult.getFilterRejectedMessages());
+                    mergedResult.setFilterAcceptedMessages(
+                            mergedResult.getFilterAcceptedMessages() + 
currentResult.getFilterAcceptedMessages());
+                    mergedResult.setFilterRescheduledMessages(
+                            mergedResult.getFilterRescheduledMessages() + 
currentResult.getFilterRescheduledMessages());
+
+                    mergedResult.setAborted(currentResult.isAborted());
+                    
mergedResult.setLastMessageId(currentResult.getLastMessageId());
+                }
+
+                if (!mergedResult.isAborted() || mergedResult.getEntries() >= 
backlogScanMaxEntries) {
+                    break;
+                }
+
+                // To avoid infinite loops, we ensure the entry count is 
incremented after each loop.
+                if (currentResult.getEntries() <= 0) {
+                    print("Incorrect total entry count returned from server");
+                    return;
+                }
+
+                // In analyze-backlog, lastMessageId is null only when: total 
entries is 0,
+                // with false aborted flag returned.
+                if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
+                    print("Incorrect last message id returned from server");
+                    return;
+                }
+
+                if (!quiet) {
+                    print("Analyze backlog progress, scanned entries: " + 
mergedResult.getEntries()
+                            + ", scan max entries: " + backlogScanMaxEntries);

Review Comment:
   It would be useful to print the current value of `mergedResult` as JSON on a 
single line (no linefeeds) so that the CLI output can be parsed as 
[NDJSON](https://en.wikipedia.org/wiki/JSON_streaming#Newline-delimited_JSON).
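
A minimal sketch of what a single-line progress record could look like. The field names (`entries`, `messages`, `aborted`, `lastMessageId`) are assumptions derived from the getters in the diff, and the hand-rolled serializer is only a dependency-free stand-in; the CLI would more likely reuse its existing JSON library in compact (non-indented) mode.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch: render a progress snapshot as exactly one NDJSON line. */
final class NdjsonProgressSketch {

    /** Quotes a string and escapes characters that may not appear raw in JSON. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** Numbers and booleans are emitted bare; everything else becomes a string. */
    static String render(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Number || value instanceof Boolean) {
            return value.toString();
        }
        return quote(value.toString());
    }

    /** Joins all fields into one JSON object with no line breaks. */
    static String toNdjsonLine(Map<String, Object> fields) {
        return fields.entrySet().stream()
                .map(e -> quote(e.getKey()) + ":" + render(e.getValue()))
                .collect(Collectors.joining(",", "{", "}"));
    }

    public static void main(String[] args) {
        // Hypothetical snapshot of the merged result after one loop iteration.
        Map<String, Object> progress = new LinkedHashMap<>();
        progress.put("entries", 1000L);
        progress.put("messages", 2500L);
        progress.put("aborted", true);
        progress.put("lastMessageId", "12:999");
        // prints {"entries":1000,"messages":2500,"aborted":true,"lastMessageId":"12:999"}
        System.out.println(toNdjsonLine(progress));
    }
}
```

Because every record occupies exactly one line, a consumer can process the progress output line by line without buffering the whole stream.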



##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java:
##########
@@ -2999,20 +3000,86 @@ private class AnalyzeBacklog extends CliCommand {
         private String subName;
 
         @Option(names = { "--position",
-                "-p" }, description = "message position to start the scan from 
(ledgerId:entryId)", required = false)
+                "-p" }, description = "Message position to start the scan from 
(ledgerId:entryId)", required = false)
         private String messagePosition;
 
+        @Option(names = {"--backlog-scan-max-entries", "-b"}, description =
+                "The maximum number of backlog entries the client will scan 
before terminating its loop",
+                required = false)
+        private long backlogScanMaxEntries = -1;
+
+        @Option(names = {"--quiet", "-q"}, description = "Disable 
analyze-backlog progress reporting", required = false)
+        private boolean quiet = false;
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(topicName);
             Optional<MessageId> startPosition = Optional.empty();
+            int partitionIndex = 
TopicName.get(persistentTopic).getPartitionIndex();
             if (isNotBlank(messagePosition)) {
-                int partitionIndex = 
TopicName.get(persistentTopic).getPartitionIndex();
                 MessageId messageId = validateMessageIdString(messagePosition, 
partitionIndex);
                 startPosition = Optional.of(messageId);
             }
-            print(getTopics().analyzeSubscriptionBacklog(persistentTopic, 
subName, startPosition));
 
+            AnalyzeSubscriptionBacklogResult mergedResult = null;
+            while (true) {
+                AnalyzeSubscriptionBacklogResult currentResult =
+                        
getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition);
+                if (mergedResult == null) {
+                    mergedResult = currentResult;
+                } else {
+                    mergedResult.setEntries(mergedResult.getEntries() + 
currentResult.getEntries());
+                    mergedResult.setMessages(mergedResult.getMessages() + 
currentResult.getMessages());
+                    mergedResult.setMarkerMessages(
+                            mergedResult.getMarkerMessages() + 
currentResult.getMarkerMessages());
+
+                    mergedResult.setFilterRejectedEntries(
+                            mergedResult.getFilterRejectedEntries() + 
currentResult.getFilterRejectedEntries());
+                    mergedResult.setFilterAcceptedEntries(
+                            mergedResult.getFilterAcceptedEntries() + 
currentResult.getFilterAcceptedEntries());
+                    mergedResult.setFilterRescheduledEntries(
+                            mergedResult.getFilterRescheduledEntries() + 
currentResult.getFilterRescheduledEntries());
+
+                    mergedResult.setFilterRejectedMessages(
+                            mergedResult.getFilterRejectedMessages() + 
currentResult.getFilterRejectedMessages());
+                    mergedResult.setFilterAcceptedMessages(
+                            mergedResult.getFilterAcceptedMessages() + 
currentResult.getFilterAcceptedMessages());
+                    mergedResult.setFilterRescheduledMessages(
+                            mergedResult.getFilterRescheduledMessages() + 
currentResult.getFilterRescheduledMessages());
+
+                    mergedResult.setAborted(currentResult.isAborted());
+                    
mergedResult.setLastMessageId(currentResult.getLastMessageId());
+                }
+
+                if (!mergedResult.isAborted() || mergedResult.getEntries() >= 
backlogScanMaxEntries) {
+                    break;
+                }
+
+                // To avoid infinite loops, we ensure the entry count is 
incremented after each loop.
+                if (currentResult.getEntries() <= 0) {
+                    print("Incorrect total entry count returned from server");
+                    return;
+                }
+
+                // In analyze-backlog, lastMessageId is null only when: total 
entries is 0,
+                // with false aborted flag returned.
+                if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
+                    print("Incorrect last message id returned from server");
+                    return;
+                }
+
+                if (!quiet) {
+                    print("Analyze backlog progress, scanned entries: " + 
mergedResult.getEntries()
+                            + ", scan max entries: " + backlogScanMaxEntries);
+                }
+
+                String[] messageIdSplits = 
mergedResult.getLastMessageId().split(":");
+                MessageIdImpl nextScanMessageId =
+                        new MessageIdImpl(Long.parseLong(messageIdSplits[0]), 
Long.parseLong(messageIdSplits[1]) + 1,
+                                partitionIndex);

Review Comment:
   Instead of parsing the message ID string, it would be better to use the 
`org.apache.pulsar.client.api.MessageIdAdv` interface to get the ledgerId and 
entryId.
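
A rough sketch of the idea, kept free of Pulsar dependencies so it runs on its own. `PositionView` is a hypothetical stand-in for the two accessors (`getLedgerId()`, `getEntryId()`) that `MessageIdAdv` exposes, and `Position` is an illustrative value class, not a Pulsar type. How the CLI would obtain a typed message ID from the result is not shown here.

```java
/** Hypothetical stand-in for the ledgerId/entryId accessors of MessageIdAdv. */
interface PositionView {
    long getLedgerId();

    long getEntryId();
}

/** Sketch: compute the next scan position from typed accessors instead of split(":"). */
final class NextScanPositionSketch {

    /** Illustrative value class; in the CLI this role is played by a MessageId. */
    static final class Position implements PositionView {
        private final long ledgerId;
        private final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public long getLedgerId() {
            return ledgerId;
        }

        @Override
        public long getEntryId() {
            return entryId;
        }
    }

    /** The next scan starts one entry after the last scanned position. */
    static Position next(PositionView last) {
        return new Position(last.getLedgerId(), last.getEntryId() + 1);
    }

    public static void main(String[] args) {
        Position next = next(new Position(12L, 999L));
        System.out.println(next.getLedgerId() + ":" + next.getEntryId());
    }
}
```

With typed accessors there is no array indexing or `Long.parseLong` call that could fail on a malformed string.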



##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java:
##########
@@ -2999,20 +3000,86 @@ private class AnalyzeBacklog extends CliCommand {
         private String subName;
 
         @Option(names = { "--position",
-                "-p" }, description = "message position to start the scan from 
(ledgerId:entryId)", required = false)
+                "-p" }, description = "Message position to start the scan from 
(ledgerId:entryId)", required = false)
         private String messagePosition;
 
+        @Option(names = {"--backlog-scan-max-entries", "-b"}, description =
+                "The maximum number of backlog entries the client will scan 
before terminating its loop",
+                required = false)
+        private long backlogScanMaxEntries = -1;
+
+        @Option(names = {"--quiet", "-q"}, description = "Disable 
analyze-backlog progress reporting", required = false)
+        private boolean quiet = false;
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(topicName);
             Optional<MessageId> startPosition = Optional.empty();
+            int partitionIndex = 
TopicName.get(persistentTopic).getPartitionIndex();
             if (isNotBlank(messagePosition)) {
-                int partitionIndex = 
TopicName.get(persistentTopic).getPartitionIndex();
                 MessageId messageId = validateMessageIdString(messagePosition, 
partitionIndex);
                 startPosition = Optional.of(messageId);
             }
-            print(getTopics().analyzeSubscriptionBacklog(persistentTopic, 
subName, startPosition));
 
+            AnalyzeSubscriptionBacklogResult mergedResult = null;
+            while (true) {
+                AnalyzeSubscriptionBacklogResult currentResult =
+                        
getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition);
+                if (mergedResult == null) {
+                    mergedResult = currentResult;
+                } else {
+                    mergedResult.setEntries(mergedResult.getEntries() + 
currentResult.getEntries());
+                    mergedResult.setMessages(mergedResult.getMessages() + 
currentResult.getMessages());
+                    mergedResult.setMarkerMessages(
+                            mergedResult.getMarkerMessages() + 
currentResult.getMarkerMessages());
+
+                    mergedResult.setFilterRejectedEntries(
+                            mergedResult.getFilterRejectedEntries() + 
currentResult.getFilterRejectedEntries());
+                    mergedResult.setFilterAcceptedEntries(
+                            mergedResult.getFilterAcceptedEntries() + 
currentResult.getFilterAcceptedEntries());
+                    mergedResult.setFilterRescheduledEntries(
+                            mergedResult.getFilterRescheduledEntries() + 
currentResult.getFilterRescheduledEntries());
+
+                    mergedResult.setFilterRejectedMessages(
+                            mergedResult.getFilterRejectedMessages() + 
currentResult.getFilterRejectedMessages());
+                    mergedResult.setFilterAcceptedMessages(
+                            mergedResult.getFilterAcceptedMessages() + 
currentResult.getFilterAcceptedMessages());
+                    mergedResult.setFilterRescheduledMessages(
+                            mergedResult.getFilterRescheduledMessages() + 
currentResult.getFilterRescheduledMessages());
+
+                    mergedResult.setAborted(currentResult.isAborted());
+                    
mergedResult.setLastMessageId(currentResult.getLastMessageId());
+                }
+
+                if (!mergedResult.isAborted() || mergedResult.getEntries() >= 
backlogScanMaxEntries) {
+                    break;
+                }
+
+                // To avoid infinite loops, we ensure the entry count is 
incremented after each loop.
+                if (currentResult.getEntries() <= 0) {
+                    print("Incorrect total entry count returned from server");
+                    return;
+                }
+
+                // In analyze-backlog, lastMessageId is null only when: total 
entries is 0,
+                // with false aborted flag returned.
+                if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
+                    print("Incorrect last message id returned from server");
+                    return;
+                }
+
+                if (!quiet) {
+                    print("Analyze backlog progress, scanned entries: " + 
mergedResult.getEntries()
+                            + ", scan max entries: " + backlogScanMaxEntries);
+                }
+
+                String[] messageIdSplits = 
mergedResult.getLastMessageId().split(":");
+                MessageIdImpl nextScanMessageId =
+                        new MessageIdImpl(Long.parseLong(messageIdSplits[0]), 
Long.parseLong(messageIdSplits[1]) + 1,
+                                partitionIndex);
+                startPosition = Optional.of(nextScanMessageId);
+            }
+            print(mergedResult);

Review Comment:
   The output would have to be JSON without linefeeds to make the CLI output 
parseable as NDJSON. The last line of the output would be the final result. 
Perhaps there could be a command-line option to control whether NDJSON is used, 
since it is not pretty-printed.
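
From the consumer's side, the "last line is the final result" convention could be handled roughly as follows. This is a sketch only: the class and method names are hypothetical, and the sample records are made up for illustration.

```java
import java.util.Optional;

/** Sketch: pick the final result out of NDJSON output by taking the last non-blank line. */
final class NdjsonLastLineSketch {

    /** Returns the last non-blank line, or empty if the output has none. */
    static Optional<String> finalResultLine(String cliOutput) {
        return cliOutput.lines()
                .map(String::strip)
                .filter(line -> !line.isEmpty())
                .reduce((earlier, later) -> later);
    }

    public static void main(String[] args) {
        // Two progress records followed by the final record (illustrative data).
        String output = "{\"entries\":100}\n{\"entries\":200}\n{\"entries\":250}\n";
        System.out.println(finalResultLine(output).orElse("<no output>"));
    }
}
```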



##########
pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java:
##########
@@ -2181,6 +2182,97 @@ public boolean matches(Long timestamp) {
 
     }
 
+    @Test
+    public void topicsAnalyzeBacklog() throws Exception {

Review Comment:
   A test without mocks, similar to the tests in #25127, would also be useful 
since it would serve as an integration test.



##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java:
##########
@@ -2999,20 +3000,86 @@ private class AnalyzeBacklog extends CliCommand {
         private String subName;
 
         @Option(names = { "--position",
-                "-p" }, description = "message position to start the scan from 
(ledgerId:entryId)", required = false)
+                "-p" }, description = "Message position to start the scan from 
(ledgerId:entryId)", required = false)
         private String messagePosition;
 
+        @Option(names = {"--backlog-scan-max-entries", "-b"}, description =
+                "The maximum number of backlog entries the client will scan 
before terminating its loop",
+                required = false)
+        private long backlogScanMaxEntries = -1;
+
+        @Option(names = {"--quiet", "-q"}, description = "Disable 
analyze-backlog progress reporting", required = false)
+        private boolean quiet = false;
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(topicName);
             Optional<MessageId> startPosition = Optional.empty();
+            int partitionIndex = 
TopicName.get(persistentTopic).getPartitionIndex();
             if (isNotBlank(messagePosition)) {
-                int partitionIndex = 
TopicName.get(persistentTopic).getPartitionIndex();
                 MessageId messageId = validateMessageIdString(messagePosition, 
partitionIndex);
                 startPosition = Optional.of(messageId);
             }
-            print(getTopics().analyzeSubscriptionBacklog(persistentTopic, 
subName, startPosition));
 
+            AnalyzeSubscriptionBacklogResult mergedResult = null;
+            while (true) {
+                AnalyzeSubscriptionBacklogResult currentResult =
+                        
getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition);
+                if (mergedResult == null) {
+                    mergedResult = currentResult;
+                } else {
+                    mergedResult.setEntries(mergedResult.getEntries() + 
currentResult.getEntries());
+                    mergedResult.setMessages(mergedResult.getMessages() + 
currentResult.getMessages());
+                    mergedResult.setMarkerMessages(
+                            mergedResult.getMarkerMessages() + 
currentResult.getMarkerMessages());
+
+                    mergedResult.setFilterRejectedEntries(
+                            mergedResult.getFilterRejectedEntries() + 
currentResult.getFilterRejectedEntries());
+                    mergedResult.setFilterAcceptedEntries(
+                            mergedResult.getFilterAcceptedEntries() + 
currentResult.getFilterAcceptedEntries());
+                    mergedResult.setFilterRescheduledEntries(
+                            mergedResult.getFilterRescheduledEntries() + 
currentResult.getFilterRescheduledEntries());
+
+                    mergedResult.setFilterRejectedMessages(
+                            mergedResult.getFilterRejectedMessages() + 
currentResult.getFilterRejectedMessages());
+                    mergedResult.setFilterAcceptedMessages(
+                            mergedResult.getFilterAcceptedMessages() + 
currentResult.getFilterAcceptedMessages());
+                    mergedResult.setFilterRescheduledMessages(
+                            mergedResult.getFilterRescheduledMessages() + 
currentResult.getFilterRescheduledMessages());
+
+                    mergedResult.setAborted(currentResult.isAborted());
+                    
mergedResult.setLastMessageId(currentResult.getLastMessageId());
+                }
+
+                if (!mergedResult.isAborted() || mergedResult.getEntries() >= 
backlogScanMaxEntries) {

Review Comment:
   The `!mergedResult.isAborted()` part isn't correct.
   
   
https://github.com/apache/pulsar/blob/da0d11644f097ae657e79b2eb7835478d79ccd0e/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java#L119-L128
   
   The purpose of the CLI loop is to keep iterating until the result is 
COMPLETED or the number of scanned entries exceeds `backlogScanMaxEntries`.
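
The intended termination rule can be sketched in isolation as below. Everything here is a simplified assumption: `Outcome` is a hypothetical two-value enum, `Page` stands in for one server response, and the queue simulates successive calls to the broker. The threshold check uses `>=` as in the diff.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Sketch: iterate until the scan is COMPLETED or the entry limit is reached. */
final class ScanLoopSketch {

    /** Hypothetical, simplified outcome of one scan request. */
    enum Outcome { COMPLETED, ABORTED }

    /** Stand-in for one server response: entries scanned plus the outcome. */
    static final class Page {
        final long entries;
        final Outcome outcome;

        Page(long entries, Outcome outcome) {
            this.entries = entries;
            this.outcome = outcome;
        }
    }

    /** Returns the total number of entries scanned across all iterations. */
    static long scan(Deque<Page> server, long maxEntries) {
        long total = 0;
        while (true) {
            Page page = server.poll(); // simulates one request to the broker
            if (page == null) {
                throw new IllegalStateException("server returned no result");
            }
            total += page.entries;
            // Stop when the scan finished or the client-side limit is reached.
            if (page.outcome == Outcome.COMPLETED || total >= maxEntries) {
                return total;
            }
            // Guard against looping forever if a response makes no progress.
            if (page.entries <= 0) {
                throw new IllegalStateException("no progress reported by server");
            }
        }
    }

    public static void main(String[] args) {
        Deque<Page> server = new ArrayDeque<>(List.of(
                new Page(100, Outcome.ABORTED),
                new Page(100, Outcome.ABORTED),
                new Page(50, Outcome.COMPLETED)));
        System.out.println(scan(server, 1000)); // prints 250
    }
}
```

With a limit of 150 and the same responses, the loop would stop after the second response with a total of 200, since the limit is only checked after each response is merged.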



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
