315157973 opened a new issue #12269:
URL: https://github.com/apache/pulsar/issues/12269


   
   ## Motivation
   
There are many users who need to use tagged messages. How to implement this has been discussed before:
   
   
https://lists.apache.org/list.html?*@pulsar.apache.org:lte=2y:Proposal%20for%20Consumer%20Filtering%20in%20Pulsar%20brokers
   
   
https://lists.apache.org/thread.html/ra4639e75659b2adf384b77bca85ab2ca9b4969dea2bd8896741baae2%40%3Cdev.pulsar.apache.org%3E
   
We need a pluggable mechanism that lets developers decide which messages are delivered to consumers and which are not. Besides tagged messages, users can also use this extension point to implement other features.
   
   ## Goal
   
In this PIP, we only implement the entry filter framework at the broker level. Support at the namespace and topic levels will follow in other PRs.
   
   ## API Changes And Implementation
   
1)
Add the option `entryFilterClassName` to broker.conf and `ServiceConfiguration`. The default value is null, which means no filter is used.
   
2)
Add a factory class. If `entryFilterClassName` is set, the broker creates the factory and then uses it to initialize the filter:
```
public interface EntriesFilterProvider {

    // Creates the EntryFilter for a subscription; the class named by
    // `entryFilterClassName` is instantiated through reflection
    EntryFilter createEntriesFilter(Subscription subscription);
}

// The default implementation of EntriesFilterProvider
public class DefaultEntriesFilterProviderImpl implements EntriesFilterProvider {

    public DefaultEntriesFilterProviderImpl(ServiceConfiguration serviceConfiguration) {
        // ...
    }

    @Override
    public EntryFilter createEntriesFilter(Subscription subscription) {
        // ...
    }
}
```
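A minimal, self-contained sketch of how a provider could turn `entryFilterClassName` into a filter instance through reflection. All names here (`SimpleEntryFilter`, `ReflectiveFilterProvider`, `AcceptAllFilter`) and the `Map<String, String>` configuration are hypothetical stand-ins, not Pulsar classes; the sketch assumes the filter class has a public no-argument constructor and that an unset option disables filtering.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for the proposed filter interface.
interface SimpleEntryFilter {
    boolean accept(Map<String, String> messageProperties);
}

// Hypothetical provider that loads the class named by `entryFilterClassName`.
final class ReflectiveFilterProvider {
    private final String className;

    ReflectiveFilterProvider(Map<String, String> brokerConfig) {
        // A missing value means no filter is configured.
        this.className = brokerConfig.get("entryFilterClassName");
    }

    // Returns null when no filter is configured, so the dispatcher can skip filtering.
    SimpleEntryFilter createFilter() {
        if (className == null || className.isEmpty()) {
            return null;
        }
        try {
            Class<?> clazz = Class.forName(className);
            if (!SimpleEntryFilter.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(className + " does not implement SimpleEntryFilter");
            }
            // Assumes a public no-argument constructor.
            return (SimpleEntryFilter) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to create entry filter " + className, e);
        }
    }
}

// Example plugin class used only to demonstrate loading by name.
class AcceptAllFilter implements SimpleEntryFilter {
    public AcceptAllFilter() {
    }

    @Override
    public boolean accept(Map<String, String> messageProperties) {
        return true;
    }
}
```

Returning null rather than a no-op filter mirrors the `entryFilter != null` check the dispatcher performs, so brokers without a configured plugin pay no per-entry cost.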
3)
Add new interfaces so that plugins do not depend on Pulsar's existing internal interfaces. This way, plugins keep working even if those internal interfaces change in the future:
```
// The interface that a plugin needs to implement
public interface EntryFilter {

    // Decides whether a single entry is delivered to the consumer
    FilterResult filterEntries(Entry entry, FilterContext context);

    class FilterContext {
        EntryBatchSizes batchSizes;
        SendMessageInfo sendMessageInfo;
        EntryBatchIndexesAcks indexesAcks;
        ManagedCursor cursor;
        boolean isReplayRead;
        MessageMetadata msgMetadata;
        // ...
    }

    enum FilterResult {
        ACCEPT, // deliver the entry to the consumer
        REJECT, // skip the entry
        // ...
    }
}
```
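To illustrate the tagged-message use case, here is a hedged sketch of a plugin-style filter. It uses simplified stand-in types instead of Pulsar's `Entry` and `FilterContext`; the `TAG` property key, `TagFilterContext`, and `TagEntryFilter` are hypothetical, and the sketch assumes the tag travels as a message property already parsed from `MessageMetadata`.

```java
import java.util.Map;
import java.util.Set;

// Simplified stand-in for the proposed FilterResult enum.
enum FilterResult { ACCEPT, REJECT }

// Hypothetical context holding properties parsed from the entry's metadata.
final class TagFilterContext {
    final Map<String, String> msgProperties;

    TagFilterContext(Map<String, String> msgProperties) {
        this.msgProperties = msgProperties;
    }
}

// Hypothetical tag filter: delivers an entry only if its tag is one the subscription wants.
final class TagEntryFilter {
    static final String TAG_PROPERTY = "TAG"; // hypothetical property key

    private final Set<String> acceptedTags;

    TagEntryFilter(Set<String> acceptedTags) {
        this.acceptedTags = Set.copyOf(acceptedTags);
    }

    FilterResult filterEntry(TagFilterContext context) {
        // An empty tag set means the subscription accepts every entry.
        if (acceptedTags.isEmpty()) {
            return FilterResult.ACCEPT;
        }
        // Otherwise, untagged entries and entries with other tags are rejected.
        String tag = context.msgProperties.get(TAG_PROPERTY);
        return tag != null && acceptedTags.contains(tag) ? FilterResult.ACCEPT : FilterResult.REJECT;
    }
}
```

Because the decision only reads already-parsed metadata, the filter itself does no extra deserialization per entry.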
Why filter a single entry instead of a `List<Entry>`?
The existing dispatchers already iterate over the entry list and parse the metadata of each entry. Filtering one entry at a time lets us reuse that parsed data, avoiding a second pass over the list and a second parse of the metadata.
   
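4)
Call the plugin in `AbstractBaseDispatcher#filterEntriesForConsumer`:
```
if (entryFilter != null) {
    // filterContext is built from the dispatcher's current state
    // (batch sizes, cursor, parsed metadata, ...)
    FilterResult result = entryFilter.filterEntries(entry, filterContext);
    if (result == FilterResult.REJECT) {
        // Read the position before releasing the entry, then acknowledge it
        // so the rejected entry is not redelivered
        Position position = entry.getPosition();
        entries.set(i, null);
        entry.release();
        subscription.acknowledgeMessage(Collections.singletonList(position),
                AckType.Individual, Collections.emptyMap());
        continue;
    }
}
```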
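The single-pass loop can be modeled outside the broker as follows. This is a simplified sketch, not the dispatcher code: `SketchEntry` and `SketchDispatcher` are hypothetical, positions are plain `long`s, and the filter is modeled as a `Predicate` for brevity. It shows rejected slots being nulled out in place, the position being captured before `release()`, and rejected positions being collected so they could be acknowledged in a single call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical simplified entry: a position, parsed properties, and a released flag.
final class SketchEntry {
    final long position;
    final Map<String, String> properties;
    boolean released;

    SketchEntry(long position, Map<String, String> properties) {
        this.position = position;
        this.properties = properties;
    }

    void release() {
        released = true;
    }
}

final class SketchDispatcher {
    // Filters entries in place in a single pass; rejected slots are set to null.
    // Returns the positions of rejected entries so they can be acknowledged together.
    static List<Long> filterEntriesForConsumer(List<SketchEntry> entries,
                                               Predicate<Map<String, String>> filter) {
        List<Long> rejected = new ArrayList<>();
        for (int i = 0; i < entries.size(); i++) {
            SketchEntry entry = entries.get(i);
            if (entry == null) {
                continue;
            }
            // In the broker, metadata is parsed once here and shared with the filter.
            Map<String, String> metadata = entry.properties;
            if (filter != null && !filter.test(metadata)) {
                rejected.add(entry.position); // capture the position before release
                entries.set(i, null);
                entry.release();
                continue;
            }
            // ... normal dispatch bookkeeping (batch sizes, permits) would happen here
        }
        return rejected;
    }
}
```

Collecting positions and acknowledging once after the loop is one possible design choice; acknowledging inside the loop, as in the snippet above, is equally valid but issues one acknowledgment per rejected entry.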
   
## Rejected Alternatives
   
1) Pluggable dispatcher
Making the whole dispatcher pluggable would require defining a limited, private, but "stable" API. Enrico's suggestion is to first identify specific needs and then add features that make individual parts of the dispatcher pluggable.
   
2) Message-level filter
At this stage, we can only filter at the entry level. An entry can contain a batch of messages, and forcing individual messages within an entry to be filtered on the broker side would cause problems with subsequent consumer acknowledgments.
Therefore, to use this filter, producers must set `enableBatching=false`, as is already required for delayed messages.
   

