eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r920825256
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -512,6 +522,95 @@ public String getTypeString() {
return "Null";
}
+ @Override
+ public CompletableFuture<AnaliseBacklogResult> analiseBacklog() {
+
+ long start = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Starting to analise backlog", topicName,
subName);
+ }
+
+ AtomicLong entries = new AtomicLong();
+ AtomicLong accepted = new AtomicLong();
+ AtomicLong rejected = new AtomicLong();
+ AtomicLong rescheduled = new AtomicLong();
+ AtomicLong messages = new AtomicLong();
+ AtomicLong acceptedMessages = new AtomicLong();
+ AtomicLong rejectedMessages = new AtomicLong();
+ AtomicLong rescheduledMessages = new AtomicLong();
+
+ Position currentPosition = cursor.getMarkDeletedPosition();
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] currentPosition {}",
+ topicName, subName, currentPosition);
+ }
+ final AbstractBaseDispatcher abstractBaseDispatcher = dispatcher !=
null
+ ? (AbstractBaseDispatcher) dispatcher : new
DummyDispatcherForFilters();
+ // we put some hard limits on the scan, in order to prevent denial of
services
+ ServiceConfiguration configuration =
topic.getBrokerService().getPulsar().getConfiguration();
+ long maxEntries = configuration.getSubscriptionBacklogScanMaxEntries();
+ long timeOutMs = configuration.getSubscriptionBacklogScanMaxTimeMs();
+ return cursor.scan(new Predicate<Entry>() {
+ @Override
+ public boolean apply(Entry entry) {
+ if (log.isDebugEnabled()) {
+ log.debug("found {}", entry);
+ }
+
+ ByteBuf metadataAndPayload = entry.getDataBuffer();
+ MessageMetadata messageMetadata =
Commands.peekMessageMetadata(metadataAndPayload, "", -1);
+ int numMessages = 1;
+ if (messageMetadata.hasNumMessagesInBatch()) {
+ numMessages = messageMetadata.getNumMessagesInBatch();
+ }
+ EntryFilter.FilterResult filterResult = abstractBaseDispatcher
+ .runFiltersForEntry(entry, messageMetadata, null);
+
+ if (filterResult == null) {
+ filterResult = EntryFilter.FilterResult.ACCEPT;
+ }
+ switch (filterResult) {
+ case REJECT:
+ rejected.incrementAndGet();
+ rejectedMessages.addAndGet(numMessages);
+ break;
+ case RESCHEDULE:
+ rescheduled.incrementAndGet();
+ rescheduledMessages.addAndGet(numMessages);
+ break;
+ default:
+ accepted.incrementAndGet();
+ acceptedMessages.addAndGet(numMessages);
+ break;
+ }
+ entries.incrementAndGet();
+ messages.addAndGet(numMessages);
+
+ return true;
+ }
+ }, maxEntries, timeOutMs).thenApply((ScanOutcome outcome) -> {
+ long end = System.currentTimeMillis();
+ AnaliseBacklogResult result = new AnaliseBacklogResult();
+ result.setEntries(entries.get());
+ result.setMessages(messages.get());
+ result.setFilterAcceptedEntries(accepted.get());
+ result.setFilterAcceptedMessages(acceptedMessages.get());
+ result.setFilterRejectedEntries(rejected.get());
+ result.setFilterRejectedMessages(rejectedMessages.get());
+ result.setFilterRescheduledEntries(rescheduled.get());
+ result.setFilterRescheduledMessages(rescheduledMessages.get());
+ // sometimes we abort the execution due to a timeout or
+ // when we reach a maximum number of entries
+ result.setScanOutcome(outcome);
+ log.info(
+ "[{}][{}] scan took {} ms - {}",
+ topicName, subName, end, result);
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]