Lin,

On Wed, 8 Sep 2021 at 09:40, Lin Lin <lin...@apache.org> wrote:

>
> > I also share this problem, because if you want to efficiently implement
> > message filtering you need to do it in the broker side.
> >
> > I am not sure that making the full Dispatcher pluggable is a good idea,
> > because the code is too complex and also
> > it really depends on the internals of the Broker.
> >
> > If we make this pluggable then we must define a limited, private but
> > "stable" API.
> >
> > My suggestion is to define particular needs and then add features that
> > make single, specific parts of the dispatcher pluggable.
> >
> > For instance I would add some support for "Message filtering", leaving
> > the implementation of the "filter" to a plugin.
> > This way you could implement filtering using JMS rules, or using other
> > metadata or security related information
> >
> > Regards
> >
> > Enrico
> >
>
>
>
> Hi, Enrico:
>
> Thank you for your feedback.
>
> We now have the method AbstractBaseDispatcher#filterEntriesForConsumer.
> I think we can make this method pluggable. Do you think this is okay?
>
> Provider:
> ```
> public interface EntriesFilterProvider {
>
>     // Use `EntriesFilterProvider` to create `EntriesFilter`
>     EntriesFilter createEntriesFilter(Subscription subscription);
>
>     static EntriesFilterProvider createEntriesFilterProvider(
>             ServiceConfiguration serviceConfiguration) {
>         // According to `EntriesFilterProviderClassName`, create the
>         // Provider through reflection
>     }
>
> }
> ```
>
> Add an interface for filtering:
>
> public interface EntriesFilter {
>     filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper,
>             int entryWrapperOffset, List<Entry> entries,
>             EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
>             EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor,
>             boolean isReplayRead)
>

I believe that this is still using too many internal Pulsar classes.

In your use cases, what do you need for filtering?

In the use cases I know, I need:
- the Message (metadata, headers, payload)
- something about the Consumer (this ties in with adding some
"metadata" to the Consumer and to the Subscription, but that would be
another story)


What about:

public interface MessageFilter {
    enum FilterOutcome {
        ACCEPT, // deliver to the Consumer
        REJECT, // skip the message
        SYSTEM  // use standard system processing
    }

    FilterOutcome filterMessages(List<MessageWrapper> messages,
            FilterContext context) throws Exception;
}

interface MessageWrapper {
    // ... access to the Message payload, metadata, headers ...
}

interface FilterContext {
    // ... isReplayRead
    // ... access to acks
    // ... access to the ManagedCursor
}


This way the filter implementation will not depend on internal Pulsar APIs,
which sometimes change even in point releases.
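
To make the idea more concrete, here is a minimal sketch of what a plugin
written against such an interface might look like. Everything beyond the
shape proposed above is an assumption made up for illustration: the
accessors getProperties() and getPayload(), the classes SimpleMessage and
RegionFilter, and the "region" property do not exist in Pulsar. The
FilterContext is reduced to isReplayRead() only.

```java
import java.util.List;
import java.util.Map;

// Hypothetical API, following the proposal in this thread; not a Pulsar API.
interface MessageFilter {
    enum FilterOutcome {
        ACCEPT, // deliver to the Consumer
        REJECT, // skip the message
        SYSTEM  // use standard system processing
    }

    FilterOutcome filterMessages(List<MessageWrapper> messages,
            FilterContext context) throws Exception;
}

interface MessageWrapper {
    Map<String, String> getProperties(); // assumed accessor for message properties
    byte[] getPayload();                 // assumed accessor for the raw payload
}

interface FilterContext {
    boolean isReplayRead(); // the only context field used in this sketch
}

// Trivial in-memory MessageWrapper, only here so the example can be run.
final class SimpleMessage implements MessageWrapper {
    private final Map<String, String> properties;
    private final byte[] payload;

    SimpleMessage(Map<String, String> properties, byte[] payload) {
        this.properties = properties;
        this.payload = payload;
    }

    @Override
    public Map<String, String> getProperties() {
        return properties;
    }

    @Override
    public byte[] getPayload() {
        return payload;
    }
}

// Example plugin: accept the list only if every message has region=eu.
final class RegionFilter implements MessageFilter {
    @Override
    public FilterOutcome filterMessages(List<MessageWrapper> messages,
            FilterContext context) {
        if (context.isReplayRead()) {
            // Leave replayed reads to the broker's standard processing.
            return FilterOutcome.SYSTEM;
        }
        for (MessageWrapper message : messages) {
            if (!"eu".equals(message.getProperties().get("region"))) {
                return FilterOutcome.REJECT;
            }
        }
        return FilterOutcome.ACCEPT;
    }
}
```

Note that the plugin only touches MessageWrapper and FilterContext, so the
broker is free to change Entry, ManagedCursor and friends underneath. With a
single outcome for the whole list, one non-matching message rejects all of
them in this sketch; a per-message outcome may turn out to be a better fit.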

Enrico



> }
>
>
> Regards
>
> Lin
>
