Hi Dave,

Yes, that is the idea. The scope is `namespace`, so all the topics should be in the same namespace.
As @Matteo suggested, I also changed the interfaces to:
```
Consumer subscribe(Collection<String> topics, String subscription);
Consumer subscribe(Pattern topicsPattern, String subscription);
```
The methods should then do the `namespace` checking internally.

https://gist.github.com/zhaijack/dc0ef5e94febdc2acada9dd4223667cf

On Thu, Jan 18, 2018 at 3:35 AM, Dave Fisher <dave2w...@comcast.net> wrote:

> Hi -
>
> I really like this feature. Let me confirm that I understand it correctly.
>
> Suppose I have a system where I want to monitor the logs of an account,
> batch and/or job. I have a topic path like:
>
> /account/${account-key}/batch/${batchid}/job/${jobid}
>
> I can use this feature to monitor the batch with:
>
> /account/${account-key}/batch/${batchid}/job/*
>
> I can use this feature to monitor the account with:
>
> /account/${account-key}/batch/*/job/*
>
> In either case I would add a listener that would notice new ${jobid} and
> ${batchid} as these are introduced.
>
> Regards,
> Dave
>
> > On Jan 16, 2018, at 9:49 PM, Jia Zhai <zhaiji...@gmail.com> wrote:
> >
> > Hi All,
> > Here is a brief idea of how to implement "Subscribe to topics
> > represented by regular expressions". Would you please go over it and
> > provide your feedback/comments?
> >
> > -----------------------------
> > https://gist.github.com/zhaijack/dc0ef5e94febdc2acada9dd4223667cf
> > [Above is a gist link, for easy reading]
> > ------------------------------
> >
> > # PIP-12: Subscribe to topics represented by regular expressions
> >
> > * **Status**: Proposal
> > * **Author**: Jia Zhai - Streamlio
> > * **Pull Request**: []
> > * **Mailing List discussion**:
> >
> >
> > ## Motivation
> > The consumer needs to handle subscriptions to topics represented by
> > regular expressions. In the first stage the scope is `namespace`: all
> > topics and patterns should target the same namespace, which makes
> > authentication and authorization control easier.
> >
> > Finally, we should add and implement a series of new methods in
> > `PulsarClient.java`:
> > ```java
> > Consumer subscribe(String namespace, Collection<String> topicsNameWithoutNs, String subscription);
> > Consumer subscribe(String namespace, Pattern topicsPattern, String subscription);
> > ```
> >
> > The goals to be achieved are listed below; we could achieve them one
> > by one:
> > - support subscription to multiple topics in the same namespace (no
> > guarantee on ordering between topics)
> > - support regex based subscription
> > - auto-discover topic addition/deletion
> >
> > ## Design
> >
> > ### support subscription to multiple topics
> > This needs a new implementation of `ConsumerBase` that wraps multiple
> > single-topic consumers; let's name it `TopicsConsumerImpl`.
> > When the user calls the new method
> > `Consumer subscribe(String namespace, Collection<String> topicsNameWithoutNs, String subscription);`
> > it will create a `ConsumerImpl` for each topic in turn and return a
> > `TopicsConsumerImpl`. The main work is:
> >
> > 1. This `TopicsConsumerImpl` class should implement the abstract
> > methods in `ConsumerBase`. It should also provide some specific
> > methods such as:
> > ```java
> > // maintain a map of all the <Topic, Consumer> pairs, after we subscribe
> > // to all the topics.
> > private final ConcurrentMap<String, ConsumerImpl> consumers = new ConcurrentHashMap<>();
> > // get topics
> > Set<String> getTopics();
> > // get consumers
> > List<ConsumerImpl> getConsumers();
> >
> > // subscribe a topic
> > void subscribeTopic(String topic);
> > // unSubscribe a topic
> > void unSubscribeTopic(String topic);
> > ```
> >
> > 2. When receiving or acknowledging a message, the message identifier
> > is needed. In the implementation, we need to handle the message
> > identifier (`MessageId`) differently for some of the abstract methods
> > in `ConsumerBase`, because we have to extend `MessageId` with an
> > additional `String topic` or `consumer id`. Alternatively, we may need
> > to change `MessageIdData` in `PulsarApi.proto`.
> >
> >
> >
> > ### support regex based subscription.
> > As mentioned before, the scope is `namespace`. The main work is:
> >
> > 1. The `TopicsConsumerImpl` class above needs to keep the `Pattern`
> > that was passed in from the API for the subscription.
> > 2. Leverage the current Pulsar admin API `getList` to get a list of
> > topics. In `interface PersistentTopics`:
> > ```java
> > List<String> getList(String namespace) throws PulsarAdminException;
> > List<String> getPartitionedTopicList(String namespace) throws PulsarAdminException;
> > ```
> >
> > 3. The process of the new method `Consumer subscribe(String namespace,
> > Pattern topicsPattern, String subscription)` should be like this:
> > - call method `List<String> getList(String namespace)` to get all the
> > topics;
> > - use `topicsPattern` to select the matching sub-topics-list;
> > - construct the `TopicsConsumerImpl` with the sub-topics-list.
> >
> > ### auto-discover topic addition/deletion
> > The main work is:
> > 1. Provide a listener, driven by topic changes, that subscribes to or
> > unsubscribes from individual topics when the target topics change
> > (add/remove).
> > ```java
> > interface TopicsChangeListener {
> >     // unsubscribe and delete ConsumerImpl in the `consumers` map in
> >     // `TopicsConsumerImpl` based on removed topics.
> >     void onTopicsRemoved(Collection<String> topics);
> >     // subscribe and create a list of new ConsumerImpl, add them to the
> >     // `consumers` map in `TopicsConsumerImpl`.
> >     void onTopicsAdded(Collection<String> topics);
> > }
> > ```
> > Add a method `void registerListener(TopicsChangeListener listener)` to
> > `TopicsConsumerImpl`.
> >
> > 2. Based on the above work, use a timer to periodically call
> > `List<String> getList(String namespace)`, then compare the filtered
> > fresh sub-topics-list with the current topics held in
> > `TopicsConsumerImpl` to get 2 lists: `newAddedTopicsList` and
> > `removedTopicsList`.
> > 3. If the 2 lists are not empty, call
> > `TopicsChangeListener.onTopicsAdded(newAddedTopicsList)` and
> > `TopicsChangeListener.onTopicsRemoved(removedTopicsList)` to do the
> > subscribe and unsubscribe, and update the `consumers` map in
> > `TopicsConsumerImpl`.
> >
> > # Changes
> > The changes will be mostly on the surface and on the client side:
> > 1. add and implement a series of new methods in
> > `org.apache.pulsar.client.api.PulsarClient.java`
> > ```java
> > Consumer subscribe(String namespace, Collection<String> topicsNameWithoutNs, String subscription);
> > Consumer subscribe(String namespace, Pattern topicsPattern, String subscription);
> > ```
> > 2. add an implementation of the new `Consumer`, `TopicsConsumerImpl`,
> > which is returned by the above `subscribe` methods
> >
> >
> > -------------------------------
> >
> > Thanks a lot.
> > -Jia
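
For reference, the pattern filtering and the added/removed comparison described in the auto-discovery steps of the proposal could be sketched roughly as below. This is only a sketch under assumptions: `TopicsDiffSketch` and its methods `filter`, `added` and `removed` are hypothetical names that do not exist in Pulsar, the topic names are made up, and a hard-coded list stands in for the result of `getList(String namespace)`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Pulsar client.
class TopicsDiffSketch {

    // Keep only the topic names that fully match the subscription pattern.
    static Set<String> filter(Pattern topicsPattern, Collection<String> allTopics) {
        Set<String> matched = new TreeSet<>();
        for (String topic : allTopics) {
            if (topicsPattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    // Topics in the fresh list that are not subscribed yet (newAddedTopicsList).
    static Set<String> added(Set<String> current, Set<String> fresh) {
        Set<String> result = new TreeSet<>(fresh);
        result.removeAll(current);
        return result;
    }

    // Topics currently subscribed that no longer appear in the fresh list (removedTopicsList).
    static Set<String> removed(Set<String> current, Set<String> fresh) {
        Set<String> result = new TreeSet<>(current);
        result.removeAll(fresh);
        return result;
    }

    public static void main(String[] args) {
        // Stand-in for the list a periodic getList(namespace) call would return.
        List<String> allTopics = Arrays.asList(
                "persistent://prop/cluster/ns/job-1",
                "persistent://prop/cluster/ns/job-3",
                "persistent://prop/cluster/ns/audit");
        Pattern topicsPattern = Pattern.compile("persistent://prop/cluster/ns/job-.*");

        // Topics the consumer is assumed to hold before this refresh.
        Set<String> current = new TreeSet<>(Arrays.asList(
                "persistent://prop/cluster/ns/job-1",
                "persistent://prop/cluster/ns/job-2"));

        Set<String> fresh = filter(topicsPattern, allTopics);
        System.out.println("added:   " + added(current, fresh));
        System.out.println("removed: " + removed(current, fresh));
    }
}
```

In a real client the two resulting sets would be handed to `onTopicsAdded` and `onTopicsRemoved`. Sorted sets are used here only to keep the printed order stable.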