oneby-wang commented on code in PR #25126:
URL: https://github.com/apache/pulsar/pull/25126#discussion_r2710620696
##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java:
##########
@@ -2999,20 +3000,86 @@ private class AnalyzeBacklog extends CliCommand {
private String subName;
@Option(names = { "--position",
- "-p" }, description = "message position to start the scan from
(ledgerId:entryId)", required = false)
+ "-p" }, description = "Message position to start the scan from
(ledgerId:entryId)", required = false)
private String messagePosition;
+ @Option(names = {"--backlog-scan-max-entries", "-b"}, description =
+ "The maximum number of backlog entries the client will scan
before terminating its loop",
+ required = false)
+ private long backlogScanMaxEntries = -1;
+
+ @Option(names = {"--quiet", "-q"}, description = "Disable
analyze-backlog progress reporting", required = false)
+ private boolean quiet = false;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(topicName);
Optional<MessageId> startPosition = Optional.empty();
+ int partitionIndex =
TopicName.get(persistentTopic).getPartitionIndex();
if (isNotBlank(messagePosition)) {
- int partitionIndex =
TopicName.get(persistentTopic).getPartitionIndex();
MessageId messageId = validateMessageIdString(messagePosition,
partitionIndex);
startPosition = Optional.of(messageId);
}
- print(getTopics().analyzeSubscriptionBacklog(persistentTopic,
subName, startPosition));
+ AnalyzeSubscriptionBacklogResult mergedResult = null;
+ while (true) {
+ AnalyzeSubscriptionBacklogResult currentResult =
+
getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition);
+ if (mergedResult == null) {
+ mergedResult = currentResult;
+ } else {
+ mergedResult.setEntries(mergedResult.getEntries() +
currentResult.getEntries());
+ mergedResult.setMessages(mergedResult.getMessages() +
currentResult.getMessages());
+ mergedResult.setMarkerMessages(
+ mergedResult.getMarkerMessages() +
currentResult.getMarkerMessages());
+
+ mergedResult.setFilterRejectedEntries(
+ mergedResult.getFilterRejectedEntries() +
currentResult.getFilterRejectedEntries());
+ mergedResult.setFilterAcceptedEntries(
+ mergedResult.getFilterAcceptedEntries() +
currentResult.getFilterAcceptedEntries());
+ mergedResult.setFilterRescheduledEntries(
+ mergedResult.getFilterRescheduledEntries() +
currentResult.getFilterRescheduledEntries());
+
+ mergedResult.setFilterRejectedMessages(
+ mergedResult.getFilterRejectedMessages() +
currentResult.getFilterRejectedMessages());
+ mergedResult.setFilterAcceptedMessages(
+ mergedResult.getFilterAcceptedMessages() +
currentResult.getFilterAcceptedMessages());
+ mergedResult.setFilterRescheduledMessages(
+ mergedResult.getFilterRescheduledMessages() +
currentResult.getFilterRescheduledMessages());
+
+ mergedResult.setAborted(currentResult.isAborted());
+
mergedResult.setLastMessageId(currentResult.getLastMessageId());
+ }
+
+ if (!mergedResult.isAborted() || mergedResult.getEntries() >=
backlogScanMaxEntries) {
+ break;
+ }
+
+ // To avoid infinite loops, we ensure the entry count is
incremented after each loop.
+ if (currentResult.getEntries() <= 0) {
+ print("Incorrect total entry count returned from server");
+ return;
+ }
+
+ // In analyze-backlog, lastMessageId is null only when: total
entries is 0,
+ // with false aborted flag returned.
+ if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
+ print("Incorrect last message id returned from server");
+ return;
+ }
+
+ if (!quiet) {
+ print("Analyze backlog progress, scanned entries: " +
mergedResult.getEntries()
+ + ", scan max entries: " + backlogScanMaxEntries);
Review Comment:
There are two `ObjectMapper` instances in `CliCommand`, so we should use the one
without the pretty printer to serialize the JSON without line feeds, and then pass
the JSON string to the `print()` method. Is my understanding correct?
https://github.com/apache/pulsar/blob/85625e0f100479dd95fb1311aeb52411b6b0a25d/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java#L140-L141
https://github.com/apache/pulsar/blob/85625e0f100479dd95fb1311aeb52411b6b0a25d/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java#L120-L130
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]