eolivelli opened a new issue, #16597:
URL: https://github.com/apache/pulsar/issues/16597
## Motivation
Currently there is no way to get an accurate backlog for a subscription:
- only the number of "entries" is available, not the number of messages
- server-side filters (PIP-105) may filter out some messages
Having the number of entries is not enough because, with batch messages, the
amount of work on the Consumers is proportional to the number of messages,
which may vary from entry to entry.
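To make the gap concrete, the sketch below models a backlog as a list of per-entry batch sizes (a non-batched message counts as a batch of 1). `BacklogCounts` is a hypothetical helper for illustration only, not part of Pulsar. With batch sizes 1, 100 and 5 the entry count is 3, while the consumers actually have 106 messages to process.

```java
import java.util.List;

/** Hypothetical illustration: each element is the number of messages in one entry. */
final class BacklogCounts {
    /** Number of entries: what the current subscription stats expose. */
    static long entries(List<Integer> batchSizes) {
        return batchSizes.size();
    }

    /** Number of messages: what actually drives the work done by consumers. */
    static long messages(List<Integer> batchSizes) {
        long total = 0;
        for (int size : batchSizes) {
            total += size;
        }
        return total;
    }
}
```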
## Goal
The idea of this patch is to provide a dedicated API (REST, pulsar-admin, and
Java PulsarAdmin) to analyze a subscription and provide detailed information
about what is expected to be delivered to Consumers.
The operation will be quite expensive because we have to load the messages
from storage and pass them to the filters, but due to the dynamic nature of
Pulsar subscriptions there is no other way to obtain this value.
One good strategy for monitoring/alerting is to set up alerts on the usual
"stats" and use this new API to inspect the subscription more deeply, typically
by issuing a manual command.
## API Changes
Internal ManagedCursor API:
`CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition, long maxEntries, long timeOutMs);`
This method scans the cursor from the lastMarkDelete position to the tail.
There is a time limit and a maxEntries limit; both are needed to prevent huge
(and useless) scans.
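A minimal, synchronous sketch of the scan semantics follows, assuming the entries after the mark-delete position can be walked with an in-memory iterator (the real cursor reads them asynchronously from storage). Two further points are assumptions, since the proposal does not define them: the values of `ScanOutcome` (`COMPLETED`, `ABORTED`, `USER_INTERRUPTED`), and the convention that the predicate returns `false` to stop the scan. `Entry` is replaced by a type parameter so the code compiles without Pulsar.

```java
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Assumed outcome values; the proposal does not spell them out. */
enum ScanOutcome { COMPLETED, ABORTED, USER_INTERRUPTED }

final class CursorScanSketch {
    /**
     * Feeds each entry after the mark-delete position to {@code condition} until the tail
     * is reached, the predicate returns false, maxEntries entries have been read,
     * or timeOutMs has elapsed.
     */
    static <T> CompletableFuture<ScanOutcome> scan(Iterator<T> entriesAfterMarkDelete,
                                                   Predicate<T> condition,
                                                   long maxEntries,
                                                   long timeOutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeOutMs);
        long read = 0;
        while (entriesAfterMarkDelete.hasNext()) {
            if (read >= maxEntries || System.nanoTime() - deadline >= 0) {
                // A safety limit was hit before reaching the tail of the backlog.
                return CompletableFuture.completedFuture(ScanOutcome.ABORTED);
            }
            T entry = entriesAfterMarkDelete.next();
            read++;
            if (!condition.test(entry)) {
                // The caller asked to stop early.
                return CompletableFuture.completedFuture(ScanOutcome.USER_INTERRUPTED);
            }
        }
        // Every entry up to the tail was visited.
        return CompletableFuture.completedFuture(ScanOutcome.COMPLETED);
    }
}
```

Checking the limits before reading each entry means a backlog of exactly `maxEntries` entries still completes normally, and only a longer one is reported as aborted.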
New REST API:
```
@GET
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
@ApiOperation(value = "Analyse a subscription, by scanning all the unprocessed messages")
public void analiseSubscriptionBacklog(
        @Suspended final AsyncResponse asyncResponse,
        @ApiParam(value = "Specify the tenant", required = true)
        @PathParam("tenant") String tenant,
        @ApiParam(value = "Specify the namespace", required = true)
        @PathParam("namespace") String namespace,
        @ApiParam(value = "Specify topic name", required = true)
        @PathParam("topic") @Encoded String encodedTopic,
        @ApiParam(value = "Subscription", required = true)
        @PathParam("subName") String encodedSubName,
        @ApiParam(value = "Is authentication required to perform this operation")
        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
    // ... implementation omitted ...
}
```
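Since the endpoint is a plain `@GET`, a client only needs to build the right URL. The sketch below assumes the resource is mounted under `/admin/v2/persistent` (the proposal only shows the relative `@Path`), and `AnaliseBacklogUrl` is a hypothetical helper. Each path segment is percent-encoded because the topic is declared `@Encoded`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class AnaliseBacklogUrl {
    // Assumption: persistent-topic admin resources are mounted under this prefix.
    private static final String BASE_PATH = "/admin/v2/persistent";

    private static String encodeSegment(String segment) {
        // URLEncoder produces form encoding; swap '+' for '%20' so spaces are valid in a path.
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** Builds the GET URL for the proposed analiseBacklog endpoint. */
    static String build(String serviceUrl, String tenant, String namespace,
                        String topic, String subName, boolean authoritative) {
        return serviceUrl + BASE_PATH
                + "/" + encodeSegment(tenant)
                + "/" + encodeSegment(namespace)
                + "/" + encodeSegment(topic)
                + "/subscription/" + encodeSegment(subName)
                + "/analiseBacklog?authoritative=" + authoritative;
    }
}
```

For example, `build("http://localhost:8080", "public", "default", "my-topic", "my sub", false)` yields `http://localhost:8080/admin/v2/persistent/public/default/my-topic/subscription/my%20sub/analiseBacklog?authoritative=false`.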
API response model:
```
public class AnaliseSubscriptionBacklogResult {
    private long entries;
    private long messages;

    private long filterRejectedEntries;
    private long filterAcceptedEntries;
    private long filterRescheduledEntries;

    private long filterRejectedMessages;
    private long filterAcceptedMessages;
    private long filterRescheduledMessages;

    // true if the scan stopped early (e.g. timeout or max number of entries reached)
    private boolean aborted;

    // accessors omitted
}
```
The response contains "aborted=true" if the request has been aborted by some
internal limitation, such as a timeout or the scan hitting the maximum number of
entries. We are not going to provide more details about the reason for the stop,
because that would make the API too detailed and harder to maintain; the details
can be found in the broker logs.
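When `aborted` is true the counters only describe the part of the backlog that was scanned, so a caller should read them as lower bounds. The record below mirrors a subset of the response fields and shows one hypothetical way to present a result; reading `filterAcceptedMessages` as "messages that pass the filters" is an interpretation of the field name, not something the proposal states.

```java
/** Hypothetical mirror of part of AnaliseSubscriptionBacklogResult. */
record BacklogAnalysis(long entries, long messages,
                       long filterAcceptedMessages, boolean aborted) {

    /** Human-readable summary that flags partial scans. */
    String summary() {
        if (aborted) {
            // The scan stopped early, so the counters only cover the scanned prefix
            // of the backlog and must be read as lower bounds.
            return "partial scan (" + entries + " entries): at least "
                    + filterAcceptedMessages + " messages pass the filters";
        }
        return filterAcceptedMessages + " of " + messages + " messages ("
                + entries + " entries) pass the filters";
    }
}
```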
New PulsarAdmin API:
```
/**
 * Analyze subscription backlog.
 * This is a potentially expensive operation, as it requires
 * reading the messages from storage.
 * This function takes into consideration batch messages
 * and also subscription filters.
 *
 * @param topic
 *            Topic name
 * @param subscriptionName
 *            the subscription
 * @return an accurate analysis of the backlog
 * @throws PulsarAdminException
 *             Unexpected error
 */
AnaliseSubscriptionBacklogResult analiseSubscriptionBacklog(String topic, String subscriptionName)
        throws PulsarAdminException;

/**
 * Analyze subscription backlog asynchronously.
 * This is a potentially expensive operation, as it requires
 * reading the messages from storage.
 * This function takes into consideration batch messages
 * and also subscription filters.
 *
 * @param topic
 *            Topic name
 * @param subscriptionName
 *            the subscription
 * @return a future that completes with an accurate analysis of the backlog
 */
CompletableFuture<AnaliseSubscriptionBacklogResult> analiseSubscriptionBacklogAsync(String topic,
        String subscriptionName);
```
As usual, a `pulsar-admin` command will be added as well.
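The two-step monitoring strategy (cheap stats raise an alert, the expensive scan confirms it) can be sketched on top of the async method. To keep the example self-contained, `SubscriptionBacklogAnalyzer` is a hypothetical stand-in for the proposed PulsarAdmin method and `BacklogResult` carries only the two fields used. Treating an aborted scan as a confirmed alert is a deliberately conservative design choice of this sketch, since a partial count that is still below the threshold proves nothing.

```java
import java.util.concurrent.CompletableFuture;

/** Minimal result type for this sketch (only the fields used below). */
record BacklogResult(long filterAcceptedMessages, boolean aborted) {}

/** Hypothetical stand-in for the proposed async PulsarAdmin method. */
interface SubscriptionBacklogAnalyzer {
    CompletableFuture<BacklogResult> analiseSubscriptionBacklogAsync(String topic, String subscriptionName);
}

final class BacklogAlertCheck {
    /**
     * Runs the expensive scan only after a cheap stats-based alert has fired and
     * completes with true when the accurate backlog confirms the alert.
     */
    static CompletableFuture<Boolean> confirmAlert(SubscriptionBacklogAnalyzer admin,
                                                   String topic, String subscription,
                                                   long threshold) {
        return admin.analiseSubscriptionBacklogAsync(topic, subscription)
                // An aborted scan is inconclusive, so keep the alert rather than silence it.
                .thenApply(r -> r.aborted() || r.filterAcceptedMessages() >= threshold);
    }
}
```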
New configuration entries in broker.conf:
```
@FieldContext(
    category = CATEGORY_POLICIES,
    doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
)
private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;

@FieldContext(
    category = CATEGORY_POLICIES,
    doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
)
private long subscriptionBacklogScanMaxEntries = 10_000;
```
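The default time limit `1000 * 60 * 2L` is 120,000 ms (2 minutes). The sketch below reads both limits from broker.conf-style text with `java.util.Properties`, assuming the usual convention that each broker.conf key matches the field name. `BacklogScanLimits` is a hypothetical illustration, not the broker's actual configuration loader.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class BacklogScanLimits {
    // Defaults mirror the proposed fields: 2 minutes and 10,000 entries.
    static final long DEFAULT_MAX_TIME_MS = 1000 * 60 * 2L; // 120_000 ms
    static final long DEFAULT_MAX_ENTRIES = 10_000;

    final long maxTimeMs;
    final long maxEntries;

    private BacklogScanLimits(long maxTimeMs, long maxEntries) {
        this.maxTimeMs = maxTimeMs;
        this.maxEntries = maxEntries;
    }

    /** Reads the two keys from broker.conf-style text, falling back to the defaults. */
    static BacklogScanLimits fromBrokerConf(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        long maxTime = Long.parseLong(props.getProperty(
                "subscriptionBacklogScanMaxTimeMs", Long.toString(DEFAULT_MAX_TIME_MS)).trim());
        long maxEntries = Long.parseLong(props.getProperty(
                "subscriptionBacklogScanMaxEntries", Long.toString(DEFAULT_MAX_ENTRIES)).trim());
        return new BacklogScanLimits(maxTime, maxEntries);
    }
}
```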
## Implementation
## Rejected Alternatives