Lin,

On Wed, 8 Sep 2021 at 09:40, Lin Lin <lin...@apache.org> wrote:
>
> > I also share this problem, because if you want to efficiently implement
> > message filtering you need to do it in the broker side.
> >
> > I am not sure that making the full Dispatcher pluggable is a good idea,
> > because the code is too complex and also
> > it really depends on the internals of the Broker.
> >
> > If we make this pluggable that we must define a limited private but
> > "stable" API.
> >
> > My suggestion is to define particular needs and then add features to make
> > pluggable single specific parts
> > of the dispatcher.
> >
> > For instance I would add some support for "Message filtering", leaving the
> > implementation of the "filter" to a plugin.
> > This way you could implement filtering using JMS rules, or using other
> > metadata or security related information
> >
> > Regards
> >
> > Enrico
> >
> >
>
> Hi, Enrico:
>
> Thank you for your feedback.
>
> We now have this method AbstractBaseDispatcher#filterEntriesForConsumer
> I think we can plug-in this method. Do you think this is okay?
>
> Provider:
> ```
> public interface EntriesFilterProvider {
>
>     // Use `EntriesFilterProvider` to create `EntriesFilter`
>     EntriesFilter createEntriesFilter(Subscription subscription);
>
>     static EntriesFilterProvider createEntriesFilterProvider(ServiceConfiguration serviceConfiguration) {
>         // According to `EntriesFilterProviderClassName`, create Provider through reflection
>     }
>
> }
> ```
>
> Add an interface for filtering:
>
> public interface EntriesFilter {
>     filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper,
>             int entryWrapperOffset,
>             List<Entry> entries, EntryBatchSizes batchSizes,
>             SendMessageInfo sendMessageInfo,
>             EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor,
>             boolean isReplayRead)
>

I believe that this is still using too many internal Pulsar classes.
In your use cases, what do you need for filtering?
In the use cases I know, I need:
- the Message (metadata, headers, payload)
- something about the Consumer (this should connect to adding some
  "metadata" to the Consumer and to the Subscription, but this would be
  another story)

What about:

public interface MessageFilter {

    enum FilterOutcome {
        ACCEPT, // deliver to the Consumer
        REJECT, // skip the message
        SYSTEM  // use standard system processing
    }

    public FilterOutcome filterMessages(List<MessageWrapper> messages,
                                        FilterContext context) throws Exception;

}

interface MessageWrapper {
    // ....allow to access Message payload, metadata, headers...
}

interface FilterContext {
    // ...isReplayRead,
    // ...access acks
    // ...access ManagedCursor
}

This way the implementation of the filter will not use internal APIs, which
in Pulsar sometimes change even in point releases.

Enrico

> }
>
>
> Regards
>
> Lin
>
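
P.S. To make the MessageFilter idea a bit more concrete, here is a minimal, self-contained sketch. It is not existing Pulsar code and everything in it is an assumption for illustration: the accessors on MessageWrapper (getProperties, getPayload), the single isReplayRead() method on FilterContext, the per-message filterMessage variant (one outcome per message instead of one per list), and the helper names SimpleMessage, PropertyEqualsFilter and FilterSupport are all hypothetical. Treating SYSTEM as "deliver" is also just a placeholder for whatever the broker would normally do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** The three outcomes named in the proposal. */
enum FilterOutcome { ACCEPT, REJECT, SYSTEM }

/** Hypothetical view of a message: only properties and payload, no broker internals. */
interface MessageWrapper {
    Map<String, String> getProperties();
    byte[] getPayload();
}

/** Hypothetical context; only the replay flag is modelled in this sketch. */
interface FilterContext {
    boolean isReplayRead();
}

/** Assumption: the filter is asked once per message. */
interface MessageFilter {
    FilterOutcome filterMessage(MessageWrapper message, FilterContext context) throws Exception;
}

/** Trivial in-memory message, used only to exercise the sketch. */
final class SimpleMessage implements MessageWrapper {
    private final Map<String, String> properties;
    private final byte[] payload;

    SimpleMessage(Map<String, String> properties, byte[] payload) {
        this.properties = properties;
        this.payload = payload;
    }

    @Override public Map<String, String> getProperties() { return properties; }
    @Override public byte[] getPayload() { return payload; }
}

/** Example plugin: decides on a single message property. */
final class PropertyEqualsFilter implements MessageFilter {
    private final String key;
    private final String expected;

    PropertyEqualsFilter(String key, String expected) {
        this.key = key;
        this.expected = expected;
    }

    @Override
    public FilterOutcome filterMessage(MessageWrapper message, FilterContext context) {
        String actual = message.getProperties().get(key);
        if (actual == null) {
            return FilterOutcome.SYSTEM; // no opinion, leave it to standard processing
        }
        return expected.equals(actual) ? FilterOutcome.ACCEPT : FilterOutcome.REJECT;
    }
}

/** Hypothetical dispatcher-side helper that applies a filter to a batch. */
final class FilterSupport {
    static List<MessageWrapper> selectForDelivery(List<MessageWrapper> batch,
                                                  MessageFilter filter,
                                                  FilterContext context) throws Exception {
        List<MessageWrapper> delivered = new ArrayList<>();
        for (MessageWrapper message : batch) {
            FilterOutcome outcome = filter.filterMessage(message, context);
            // Assumption: SYSTEM falls back to delivering the message.
            if (outcome != FilterOutcome.REJECT) {
                delivered.add(message);
            }
        }
        return delivered;
    }
}
```

With a filter like this, the plugin only sees the message view and the context, so a change to Entry, EntryBatchSizes or ManagedCursor inside the broker would not break it.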