Hi Dave,
Yes, it is the idea.
The scope is `namespace`, so they should be in same namespace.

As @Matteo suggested, also changed the interfaces to:
```

Consumer subscribe(Collection<String> topics, String
subscription);Consumer subscribe(Pattern topicsPattern, String
subscription);

```
And should do `namespace` checking inside the methods.

https://gist.github.com/zhaijack/dc0ef5e94febdc2acada9dd4223667cf

On Thu, Jan 18, 2018 at 3:35 AM, Dave Fisher <dave2w...@comcast.net> wrote:

> Hi -
>
> I really like this feature. Let me confirm that I understand it correctly.
>
> Suppose I have a system where I want to monitor the logs of an account,
> batch and/or job I have a topic path like:
>
> /account/${account-key}/batch/${batchid}/job/${jobid}
>
> I can use this feature to monitor the batch with:
>
> /account/${account-key}/batch/${batchid}/job/*
>
> I can use this feature to monitor the account with:
>
> /account/${account-key}/batch/*/job/*
>
> In either case I would add a listener that would notice new ${jobid} and
> ${batchid} as these are introduced.
>
> Regards,
> Dave
>
> > On Jan 16, 2018, at 9:49 PM, Jia Zhai <zhaiji...@gmail.com> wrote:
> >
> > Hi All,
> > Here is an brief idea of how to implementation "Subscribe to topics
> > represented by regular expressions",  Would you please help go over and
> > provide your feedback/comments?
> >
> > -----------------------------
> > https://gist.github.com/zhaijack/dc0ef5e94febdc2acada9dd4223667cf
> > [Above it a gist link, for easy reading]
> > ------------------------------
> >
> > # PIP-12: Subscribe to topics represented by regular expressions
> >
> > * **Status**: Proposal
> > * **Author**: Jia Zhai - Streamlio
> > * **Pull Request**: []
> > * **Mailing List discussion**:
> >
> >
> > ## Motivation
> > The consumer needs to handle subscription to topics represented by
> > regular expressions. The scope is `namespace` in first stage, all
> > topics/patten should be targeted in same namespace, This will make
> > easy authentication and authorization control.
> >
> > At last, we should add and implementation a serials of new methods in
> > `PulsarClient.java`
> > ```java
> > Consumer subscribe(String namespace, Collection<String>
> > topicsNameWithoutNs, String subscription);
> > Consumer subscribe(String namespace, Pattern topicsPattern, String
> > subscription);
> > ```
> >
> > The goals the should be achieved are these below, we could achieve it
> > one by one:
> > - support subscription to multiple topics in the same namespace (no
> > guarantee on ordering between topics)
> > - support regex based subscription
> > - auto-discover topic addition/deletion
> >
> > ## Design
> >
> > ### support subscription to multiple topics
> > This will need a new implementation of `ConsumerBase` which wrapper
> > over multiple single-topic-consumers, let’s name it as
> > `TopicsConsumerImpl`.
> > When user call new method
> > `Consumer subscribe(String namespace, Collection<String>
> > topicsNameWithoutNs, String subscription);`
> > It will iteratively new a `ConsumerImpl` for each topic, and return a
> > `TopicsConsumerImpl`. The main work is:
> >
> > 1. This `TopicsConsumerImpl` class should provide implementation of
> > abstract methods in `ConsumerBase`, Should also provide some specific
> > methods such as:
> > ```java
> > // maintain a map for all the <Topic, Consumer>, after we subscribe
> > all the topics.
> > private final ConcurrentMap<String, ConsumerImpl> consumers = new
> > ConcurrentHashMap<>();
> > // get topics
> > Set<String> getTopics();
> > // get consumers
> > List<ConsumerImpl> getConsumers();
> >
> > // subscribe a topic
> > void subscribeTopic(String topic);
> > // unSubscribe a topic
> > void unSubscribeTopic(String topic);
> > ```
> >
> > 2. While Message receive/ack, the message identify is needed. In the
> > implementation, we need handle Message identify(MessageId) differently
> > for some of the abstract methods in `ConsumerBase`, because we have to
> > add `MessageId` with additional `String topic` or `consumer id`, Or we
> > may need to change `MessageIdData` in `PulsarApi.proto`.
> >
> >
> >
> > ### support regex based subscription.
> > As mentioned before, the scope is `namespace`. The main work is:
> >
> > 1. In above `TopicsConsumerImpl` class, need to keep the `Pattern`,
> > which was passed in from api for subscription.
> > 2. leverage currently pulsar admin API of `getList` to get a list of
> Topics.
> > In `interface PersistentTopics `:
> > ```java
> > List<String> getList(String namespace) throws PulsarAdminException;
> > List<String> getPartitionedTopicList(String namespace) throws
> > PulsarAdminException;
> > ```
> >
> > 3. The process of new method `Consumer subscribe(String namespace,
> > Pattern topicsPattern, String subscription)` should be like this:
> > - call method `List<String> getList(String namespace)` to get all the
> topics;
> > - Use `topicsPattern` to filter out the matched sub-topics-list.
> > - construct the `TopicsConsumerImpl` with the the sub-topics-list.
> >
> > ### auto-discover topic addition/deletion
> > The main work is:
> > 1. provide a listener, which based on topics changes, to do subscribe
> > and unsubscribe on individual topic when target topic been
> > changed(remove/add).
> > ```java
> > Interface TopicsChangeListener {
> >       // unsubscribe and delete ConsumerImpl in the `consumers` map in
> > `TopicsConsumerImpl` based on added topics.
> >       void onTopicsRemoved(Collection<String> topics);
> >       // subscribe and create a list of new ConsumerImpl, added them to
> the
> > `consumers` map in `TopicsConsumerImpl`.
> >       void onTopicsAdded(Collection<String> topics);
> > }
> > ```
> > Add a method `void registerListener(TopicsChangeListener listener)` to
> > `TopicsConsumerImpl`
> >
> > 2. Based on above work, using a timer, periodically call `List<String>
> > getList(String namespace)`. And comparing the filtered fresh
> > sub-topics-list with current topics holden in `TopicsConsumerImpl`,
> > try to get 2 lists: `newAddedTopicsList` and  `removedTopicsList`.
> > 3. If the 2 lists not empty, call
> > `TopicsChangeListener.onTopicsAdded(newAddedTopicsList)`, and
> > `TopicsChangeListener.onTopicsRemoved(removedTopicsList)` to do
> > subscribe and unsubscribe, and update `consumers` map in
> > `TopicsConsumerImpl`.
> >
> > # Changes
> > The changes will be mostly on the surface and on client side:
> > 1. add and implementation a serials of new methods in
> > `org.apache.pulsar.client.api.PulsarClient.java`
> > ```java
> > Consumer subscribe(String namespace, Collection<String>
> > topicsNameWithoutNs, String subscription);
> > Consumer subscribe(String namespace, Pattern topicsPattern, String
> > subscription);
> > ```
> > 2. add and implenentation of new `Consumer`, which is
> > `TopicsConsumerImpl` , returned by above `subscribe` method
> >
> >
> > -------------------------------
> >
> > Thanks a lot.
> > -Jia
>
>

Reply via email to