eolivelli opened a new issue, #16597:
URL: https://github.com/apache/pulsar/issues/16597

   ## Motivation
   
   Currently there is no way to get an accurate backlog for a subscription:
   - you have only the number of "entries", not messages
   - server-side filters (PIP-105) may filter out some messages
   
   Having the number of entries is not enough: with batch messages, the amount of work on the Consumers is proportional to the number of messages, which may vary from entry to entry.
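   
   To make the gap concrete, here is a minimal sketch of the arithmetic, assuming each entry carries a known batch size. The class and method names are hypothetical and are not part of Pulsar:
   
   ```java
   // Hypothetical illustration: each array element is the number of
   // messages batched into one entry.
   class BacklogCountSketch {
       // Sums the messages carried by all entries.
       static long countMessages(int[] batchSizes) {
           long total = 0;
           for (int size : batchSizes) {
               total += size;
           }
           return total;
       }

       public static void main(String[] args) {
           int[] batchSizes = {1, 100, 10};
           // Three entries, but 111 messages for the Consumers to process.
           System.out.println("entries=" + batchSizes.length
                   + " messages=" + countMessages(batchSizes));
       }
   }
   ```
   
   Two subscriptions with the same entry count can therefore represent very different amounts of work.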
   
   
   
   ## Goal
   
   The idea of this patch is to provide a dedicated API (REST, pulsar-admin, and Java PulsarAdmin) to "analyse" a subscription and provide detailed information about what is expected to be delivered to Consumers.
   
   The operation will be quite expensive because we have to load the messages 
from storage and pass them to the filters, but due to the dynamic nature of 
Pulsar subscriptions there is no other way to have this value.
   
   One good strategy for monitoring/alerting is to set up alerts on the usual "stats" and use this new API to inspect the subscription more deeply, typically by issuing a manual command.
   
   ## API Changes
   
   Internal ManagedCursor API:
   
   `CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition, long maxEntries, long timeOutMs);`
   
   This method scans the Cursor from the lastMarkDelete position to the tail.
   There is a time limit and a maxEntries limit; both are needed to prevent huge (and useless) scans.
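   
   The two limits can be sketched as follows. This is a synchronous, in-memory approximation, not the ManagedCursor implementation: the enum constants, the class name, and the convention that a `false` predicate result stops the scan are all assumptions made for illustration.
   
   ```java
   import java.util.List;
   import java.util.concurrent.TimeUnit;
   import java.util.function.Predicate;

   // Hypothetical outcome values; the proposal names the type ScanOutcome
   // but does not list its constants.
   enum ScanOutcomeSketch { COMPLETED, ABORTED, USER_INTERRUPTED }

   class BoundedScanSketch {
       // Walks the entries in order and stops when the tail is reached
       // (COMPLETED), when the predicate returns false (USER_INTERRUPTED,
       // an assumed convention), or when a limit is hit (ABORTED).
       static <T> ScanOutcomeSketch scan(List<T> entries, Predicate<T> condition,
                                         long maxEntries, long timeOutMs) {
           long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeOutMs);
           long visited = 0;
           for (T entry : entries) {
               // Check both limits before reading the next entry.
               if (visited >= maxEntries || System.nanoTime() - deadline >= 0) {
                   return ScanOutcomeSketch.ABORTED;
               }
               visited++;
               if (!condition.test(entry)) {
                   return ScanOutcomeSketch.USER_INTERRUPTED;
               }
           }
           return ScanOutcomeSketch.COMPLETED;
       }
   }
   ```
   
   The proposed method returns a `CompletableFuture` instead, since reading entries from storage is asynchronous; the stopping conditions are the part this sketch tries to show.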
   
   New REST API:
   
   ```
       @GET
       @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
       @ApiOperation(value = "Analyse a subscription, by scanning all the unprocessed messages")
       public void analiseSubscriptionBacklog(
               @Suspended final AsyncResponse asyncResponse,
               @ApiParam(value = "Specify the tenant", required = true)
               @PathParam("tenant") String tenant,
               @ApiParam(value = "Specify the namespace", required = true)
               @PathParam("namespace") String namespace,
               @ApiParam(value = "Specify topic name", required = true)
               @PathParam("topic") @Encoded String encodedTopic,
               @ApiParam(value = "Subscription", required = true)
               @PathParam("subName") String encodedSubName,
               @ApiParam(value = "Is authentication required to perform this operation")
               @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
   ```
   
   API response model:
   ```
   public class AnaliseSubscriptionBacklogResult {
       private long entries;
       private long messages;
   
       private long filterRejectedEntries;
       private long filterAcceptedEntries;
       private long filterRescheduledEntries;
   
       private long filterRejectedMessages;
       private long filterAcceptedMessages;
       private long filterRescheduledMessages;
   
       private boolean aborted;
   }
   ```
   
   The response contains "aborted=true" if the request has been aborted because of an internal limit, such as a timeout or the scan hitting the maximum number of entries.
   We are not going to provide more details about the reason for the stop: doing so would make the API too detailed and harder to maintain. The details can be found in the broker logs.
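   
   A caller might interpret the flag along these lines. The class below is a hypothetical, trimmed-down stand-in for the response model, and the "at least" reading assumes that an aborted scan stopped before reaching the tail, so its counters cover only the scanned part of the backlog:
   
   ```java
   // Hypothetical stand-in for AnaliseSubscriptionBacklogResult:
   // only two of the proposed fields are modelled.
   class BacklogResultSketch {
       final long filterAcceptedMessages;
       final boolean aborted;

       BacklogResultSketch(long filterAcceptedMessages, boolean aborted) {
           this.filterAcceptedMessages = filterAcceptedMessages;
           this.aborted = aborted;
       }

       // Assumption: when the scan was aborted, the counter is a lower bound.
       String summarize() {
           if (aborted) {
               return "at least " + filterAcceptedMessages
                       + " messages to deliver (scan aborted, see broker logs)";
           }
           return filterAcceptedMessages + " messages to deliver";
       }

       public static void main(String[] args) {
           System.out.println(new BacklogResultSketch(111, false).summarize());
           System.out.println(new BacklogResultSketch(10_000, true).summarize());
       }
   }
   ```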
   
   
   New PulsarAdmin API:
   ```
       /**
        * Analyse subscription backlog.
        * This is a potentially expensive operation, as it requires
        * reading the messages from storage.
        * This function takes into consideration batch messages
        * and also Subscription filters.
        * @param topic
        *            Topic name
        * @param subscriptionName
        *            the subscription
        * @return an accurate analysis of the backlog
        * @throws PulsarAdminException
        *            Unexpected error
        */
       AnaliseSubscriptionBacklogResult analiseSubscriptionBacklog(String topic, String subscriptionName)
               throws PulsarAdminException;
   
       /**
        * Analyse subscription backlog asynchronously.
        * This is a potentially expensive operation, as it requires
        * reading the messages from storage.
        * This function takes into consideration batch messages
        * and also Subscription filters.
        * @param topic
        *            Topic name
        * @param subscriptionName
        *            the subscription
        * @return a future that completes with an accurate analysis of the backlog
        */
       CompletableFuture<AnaliseSubscriptionBacklogResult> analiseSubscriptionBacklogAsync(String topic,
                                                                                            String subscriptionName);
   ```
   
   As usual, a `pulsar-admin` command will be added as well.
   
   New configuration entries in broker.conf:
   ```
       @FieldContext(
               category = CATEGORY_POLICIES,
               doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
       )
       private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
       @FieldContext(
               category = CATEGORY_POLICIES,
               doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
       )
       private long subscriptionBacklogScanMaxEntries = 10_000;
   ```
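   
   Assuming these fields are exposed in broker.conf under the same names, the defaults above (2 minutes and 10,000 entries) would correspond to the following fragment:
   
   ```properties
   # Maximum time to spend on a backlog scan: 1000 * 60 * 2 ms = 2 minutes
   subscriptionBacklogScanMaxTimeMs=120000
   # Maximum number of entries to process in a backlog scan
   subscriptionBacklogScanMaxEntries=10000
   ```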
   
   ## Implementation
   
   
   
   
   ## Rejected Alternatives
   
   
   

