315157973 opened a new issue #12269: URL: https://github.com/apache/pulsar/issues/12269
## Motivation

Many users need to use tag messages. The implementation of this feature has been discussed before:

- https://lists.apache.org/list.html?*@pulsar.apache.org:lte=2y:Proposal%20for%20Consumer%20Filtering%20in%20Pulsar%20brokers
- https://lists.apache.org/thread.html/ra4639e75659b2adf384b77bca85ab2ca9b4969dea2bd8896741baae2%40%3Cdev.pulsar.apache.org%3E

We need to provide a pluggable mechanism so that developers can decide which messages are delivered to consumers and which are not. This way, not only can tag messages be supported, but users can also use this extension point to implement other features.

## Goal

In this PIP, we only implement the entry filter framework at the broker level. Namespace-level and topic-level support will follow in separate PRs.

## API Changes And Implementation

1) Add the option `entryFilterClassName` to broker.conf and `ServiceConfiguration`. The default value is null.

2) Add a factory class. If `entryFilterClassName` is set, the factory class is created and then initializes the filter.

```
public interface EntriesFilterProvider {

    // Use `EntriesFilterProvider` to create `EntriesFilter` through reflection
    EntriesFilter createEntriesFilter(Subscription subscription);
}

// The default implementation class of EntriesFilterProvider
public class DefaultEntriesFilterProviderImpl implements EntriesFilterProvider {

    public DefaultEntriesFilterProviderImpl(ServiceConfiguration serviceConfiguration) {
        // ...
    }

    @Override
    public EntriesFilter createEntriesFilter(Subscription subscription) {
        // ...
    }
}
```

3) Add some new interfaces to avoid relying on existing interfaces inside Pulsar.
In this way, even if the internal interfaces change in the future, the plugin can continue to run.

```
// The interface that needs to be implemented in the plugin
public interface EntryFilter {

    FilterResult filterEntries(Entry entry, FilterContext context);

    public static class FilterContext {
        EntryBatchSizes batchSizes;
        SendMessageInfo sendMessageInfo;
        EntryBatchIndexesAcks indexesAcks;
        ManagedCursor cursor;
        boolean isReplayRead;
        MessageMetadata msgMetadata;
        // ...
    }

    enum FilterResult {
        ACCEPT, // deliver to the consumer
        REJECT, // skip the message
        // ...
    }
}
```

Why filter a single entry instead of a `List<Entry>`? The existing dispatchers already iterate over the entry list and parse the metadata of each entry. Filtering one entry at a time lets us reuse that data, avoiding a second pass over the list and a second metadata parse.

4) Call the plugin in `AbstractBaseDispatcher#filterEntriesForConsumer`:

```
if (entryFilter != null) {
    FilterResult result = entryFilter.filterEntries(entry, context);
    if (FilterResult.REJECT == result) {
        // Capture the position before releasing the entry, then acknowledge
        // the rejected entry so that it is never redelivered
        Position position = entry.getPosition();
        entries.set(i, null);
        entry.release();
        subscription.acknowledgeMessage(Collections.singletonList(position),
                AckType.Individual, Collections.emptyMap());
        continue;
    }
}
```

## Rejected Alternatives

1) Plug-in Dispatcher

Making the whole dispatcher pluggable would require defining a limited, private, but "stable" API. Enrico's suggestion is to identify particular needs first and then make single, specific parts of the dispatcher pluggable.

2) Create a message-level filter

At this stage, only entry-level filtering is possible. If individual messages inside a batched entry were filtered on the broker side, subsequent consumer acknowledgments would run into problems. Therefore, to use this filter, producers must set `enableBatching=false`, the same requirement as for delayed messages.
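With batching disabled, each entry carries exactly one message, so a tag filter can decide from that single message's properties. The following is a minimal, self-contained sketch of the `EntryFilter` contract from step 3 and the reject-and-acknowledge loop from step 4, using a hypothetical tag filter. It deliberately does not use Pulsar's real classes: `SimpleEntry`, the reduced `FilterContext` (holding only message properties instead of `MessageMetadata`), `TagEntryFilter`, `dispatch`, and the `"tag"` property key are all illustrative assumptions, not part of the proposal or of Pulsar's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class TagFilterSketch {

    // Stand-in for the proposal's FilterResult enum.
    enum FilterResult { ACCEPT, REJECT }

    // Hypothetical stand-in for a broker Entry: only the entry id is modeled.
    static final class SimpleEntry {
        final long entryId;
        SimpleEntry(long entryId) { this.entryId = entryId; }
    }

    // Hypothetical, reduced FilterContext: only the message properties that the
    // broker would read from MessageMetadata are modeled.
    static final class FilterContext {
        final Map<String, String> msgProperties;
        FilterContext(Map<String, String> msgProperties) { this.msgProperties = msgProperties; }
    }

    // Plugin contract, reduced to the single method from step 3.
    interface EntryFilter {
        FilterResult filterEntries(SimpleEntry entry, FilterContext context);
    }

    // Hypothetical plugin: deliver only messages whose "tag" property equals expectedTag.
    static final class TagEntryFilter implements EntryFilter {
        private final String expectedTag;
        TagEntryFilter(String expectedTag) { this.expectedTag = expectedTag; }

        @Override
        public FilterResult filterEntries(SimpleEntry entry, FilterContext context) {
            String tag = context.msgProperties.get("tag"); // null if the property is absent
            return expectedTag.equals(tag) ? FilterResult.ACCEPT : FilterResult.REJECT;
        }
    }

    // Mirrors the loop in step 4: accepted entries are kept for delivery, rejected
    // entries are recorded in `acked` (standing in for acknowledgeMessage) and skipped.
    static List<Long> dispatch(List<SimpleEntry> entries, List<FilterContext> contexts,
                               EntryFilter filter, List<Long> acked) {
        List<Long> delivered = new ArrayList<>();
        for (int i = 0; i < entries.size(); i++) {
            SimpleEntry entry = entries.get(i);
            if (filter.filterEntries(entry, contexts.get(i)) == FilterResult.REJECT) {
                acked.add(entry.entryId);
                continue;
            }
            delivered.add(entry.entryId);
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<SimpleEntry> entries = List.of(new SimpleEntry(1), new SimpleEntry(2), new SimpleEntry(3));
        List<FilterContext> contexts = List.of(
                new FilterContext(Map.of("tag", "red")),
                new FilterContext(Map.of("tag", "blue")),
                new FilterContext(Collections.emptyMap()));
        List<Long> acked = new ArrayList<>();
        List<Long> delivered = dispatch(entries, contexts, new TagEntryFilter("red"), acked);
        System.out.println("delivered=" + delivered + " acked=" + acked);
    }
}
```

Running `main` prints `delivered=[1] acked=[2, 3]`: entry 1 carries the expected tag, entry 2 carries a different tag, and entry 3 has no tag at all, so the last two are rejected and acknowledged rather than delivered. Acknowledging rejected entries (instead of merely skipping them) keeps them from being redelivered to the same subscription.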
