Hi all,

Are there any other comments about this design? :)

Thanks,
Xiaoyu Hou

houxiaoyu <anonhx...@gmail.com> wrote on Wed, Mar 8, 2023 at 16:22:

> Hi Michael,
>
> > is there a reason that we couldn't also use this to improve PIP 145?
>
> The protocol described in this PIP could also be used to improve PIP-145.
> However, I don't think that is a good reason to use the regex sub watcher
> to implement the partitioned update watcher, because of the other reasons
> we mentioned above.
>
> > Since we know we're using a TCP connection, is it possible to rely on
> > Pulsar's keep-alive timeout (the broker and the client each have their
> > own) to close a connection that isn't responsive?
>
> I think it could still fail at the application layer; for example, the
> partition update listener could fail unexpectedly. Currently, another task
> will be scheduled if the poll task encounters an error in the partition
> auto-update timer task. [0]
>
> > Regarding the connection, which connection should the client use to
> > send the watch requests?
>
> The `PartitionUpdateWatcher` will call `connectionHandler.grabCnx()` to
> open a connection, which is analogous to `TopicListWatcher`. [1]
>
> > do we plan on using metadata store notifications to trigger the
> > callbacks that trigger notifications sent to the clients
>
> Yes, we will just look up the metadataStore to fetch the partition count
> and register a watcher with the metadataStore to trigger updates to that
> count.
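>
> For illustration, the broker-side wiring could look roughly like this
> sketch (the service and its method are placeholders from the PIP, not
> existing APIs):
>
> ```java
> // Hypothetical: listen for partitioned-topic metadata changes and fan the
> // new partition count out to the registered watchers.
> metadataStore.registerListener(notification -> {
>     String path = notification.getPath();
>     // Partitioned-topic metadata lives under this prefix in the store.
>     if (path.startsWith("/admin/partitioned-topics/")) {
>         // Placeholder from the PIP: re-reads the partition count and
>         // notifies every watcher concerned with the changed topic.
>         partitionUpdateWatcherService.handleMetadataChanged(path);
>     }
> });
> ```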
>
> > One nit on the protobuf for CommandWatchPartitionUpdateSuccess:
> >
> >    repeated string topics         = 3;
> >    repeated uint32 partitions     = 4;
> >
> > What do you think about using a repeated message that represents a
> > pair of a topic and its partition count instead of using two lists?
>
> Great. Using a repeated message looks better; I will update the protobuf,
> e.g. as sketched below.
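>
> A minimal sketch of what that could look like (field names and numbers are
> illustrative, not final):
>
> ```protobuf
> // Hypothetical pair message replacing the two parallel lists.
> message TopicPartitions {
>     required string topic      = 1;
>     required uint32 partitions = 2;
> }
>
> // In CommandWatchPartitionUpdateSuccess:
> // repeated TopicPartitions topic_partitions = 3;
> ```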
>
> > How will we handle the case where a watched topic does not exist?
>
> 1. When `PulsarClient` calls `create()` to create a producer or calls
> `subscribe()` to create a consumer, the client will first get the
> partitioned-topic metadata from the broker [2]. If the topic doesn't exist
> and `isAllowAutoTopicCreation=true` in the broker, the partitioned-topic
> zk node will be auto-created with the default partition count.
> 2. After the client gets the partitioned-topic metadata successfully, the
> `PartitionedProducerImpl` will be created if `meta.partition > 0`.
> `PartitionUpdateWatcher` will be initialized in the
> `PartitionedProducerImpl` constructor. The `PartitionUpdateWatcher` sends
> a command to the broker to register a watcher. If any topic in the
> topicList doesn't exist, the broker will send an error to the client and
> the `PartitionedProducerImpl` will fail to start (see the sketch below).
> `MultiTopicsConsumerImpl` will work in the same way.
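>
> A simplified sketch of that fail-fast start (`watchAsync` and `startFuture`
> are illustrative names, not final code):
>
> ```java
> // Hypothetical wiring inside the PartitionedProducerImpl constructor:
> // watchAsync() stands in for the new watch-registration round trip.
> partitionUpdateWatcher.watchAsync(topicList).whenComplete((resp, error) -> {
>     if (error != null) {
>         // An unknown topic (or any broker-side error) fails the whole
>         // producer instead of silently falling back to polling.
>         startFuture.completeExceptionally(error);
>     } else {
>         startFuture.complete(null);
>     }
> });
> ```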
>
> > I want to touch on authorization. A role should have "lookup"
> > permission to watch for updates on each partitioned topic that it
> > watches. As a result, if we allow for a request to watch multiple
> > topics, some might succeed while others fail. How do we handle partial
> > success?
>
> If authorization fails for any topic in the topicList, the broker will
> send an error to the client. The following reasons support this behavior:
> 1. Before sending the command to register a partition update watcher, the
> client should already have sent `CommandPartitionedTopicMetadata` and thus
> should have the `lookup` permission [3] [4].
> 2. Currently, if any topic fails to subscribe, the consumer fails to
> start. [5]
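>
> For illustration, the broker-side check could be roughly like the sketch
> below (it assumes the existing `AuthorizationService`; the method name
> `authorizeWatch` is hypothetical):
>
> ```java
> // Hypothetical: require the "lookup" permission on every topic in the
> // watch request, and reject the whole command if any check fails.
> CompletableFuture<Boolean> authorizeWatch(List<TopicName> topics,
>                                           String role,
>                                           AuthenticationDataSource authData) {
>     List<CompletableFuture<Boolean>> checks = topics.stream()
>             .map(t -> authorizationService.allowTopicOperationAsync(
>                     t, TopicOperation.LOOKUP, role, authData))
>             .collect(Collectors.toList());
>     return CompletableFuture.allOf(checks.toArray(new CompletableFuture[0]))
>             .thenApply(ignore -> checks.stream()
>                     .allMatch(CompletableFuture::join));
> }
> ```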
>
>
> [0]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L1453-L1461
>
> [1]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java#L67-L81
>
> [2]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L365-L371
>
> [3]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L903-L923
>
> [4]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L558-L560
>
> [5]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L171-L193
>
> Thanks,
> Xiaoyu Hou
>
> Michael Marshall <mmarsh...@apache.org> wrote on Tue, Mar 7, 2023 at 15:43:
>
>> Thanks for the context Xiaoyu Hou and Asaf. I appreciate the
>> efficiencies that we can gain by creating a specific implementation
>> for the partitioned topic use case. I agree that this new notification
>> system makes sense based on Pulsar's current features, and I have some
>> implementation questions.
>>
>> > - If the broker sends a notification and it's lost due to network
>> > issues, you'll only know about it due to the client doing constant
>> > polling, using its hash to minimize the response.
>>
>> I see that we implemented an ack mechanism to get around this. I
>> haven't looked closely, but is there a reason that we couldn't also
>> use this to improve PIP 145?
>>
>> Since we know we're using a TCP connection, is it possible to rely on
>> Pulsar's keep-alive timeout (the broker and the client each have their
>> own) to close a connection that isn't responsive? Then, when the
>> connection is re-established, the client would get the latest topic
>> partition count.
>>
>> Regarding the connection, which connection should the client use to
>> send the watch requests? At the moment, the "parent" partitioned topic
>> does not have an owner, but perhaps it would help this design to make
>> a single owner for a given partitioned topic. This could trivially be
>> done using the existing bundle mapping. Then, all watchers for a given
>> partitioned topic would be hosted on the same broker, which should be
>> more efficient. I don't think we currently redirect clients to any
>> specific bundle when creating the metadata for a partitioned topic,
>> but if we did, then we might be able to remove some edge cases for
>> notification delivery because a single broker would update the
>> metadata store and then trigger the notifications to the clients. If
>> we don't use this implementation, do we plan on using metadata store
>> notifications to trigger the callbacks that trigger notifications sent
>> to the clients?
>>
>> > - Each time the metadata updates, you'll need to run it through a
>> > regular expression, on all topics hosted on the broker, for any given
>> > client. That's a lot of CPU.
>> > - Suggested mechanism mainly cares about the count of partitions, so
>> > it's a lot more efficient.
>>
>> I forgot the partition count was its own piece of metadata that the
>> broker can watch for. That part definitely makes sense to me.
>>
>> One nit on the protobuf for CommandWatchPartitionUpdateSuccess:
>>
>>     repeated string topics         = 3;
>>     repeated uint32 partitions     = 4;
>>
>> What do you think about using a repeated message that represents a
>> pair of a topic and its partition count instead of using two lists?
>>
>> How will we handle the case where a watched topic does not exist?
>>
>> I want to touch on authorization. A role should have "lookup"
>> permission to watch for updates on each partitioned topic that it
>> watches. As a result, if we allow for a request to watch multiple
>> topics, some might succeed while others fail. How do we handle partial
>> success?
>>
>> One interesting detail is that this PIP is essentially aligned with
>> notifying clients when topic metadata changes while PIP 145 was
>> related to topic creation itself. An analogous proposal could request
>> a notification for any topic that gets a new metadata label. I do not
>> think it is worth considering that case in this design.
>>
>> Thanks,
>> Michael
>>
>> [0] https://lists.apache.org/thread/t4cwht08d4mhp3qzoxmqh6tht8l0728r
>>
>> On Sun, Mar 5, 2023 at 8:01 PM houxiaoyu <anonhx...@gmail.com> wrote:
>> >
>> > Bump. Are there any other concerns or suggestions about this PIP? :)
>> > Ping @Michael @Joe @Enrico
>> >
>> > Thanks
>> > Xiaoyu Hou
>> >
>> > houxiaoyu <anonhx...@gmail.com> wrote on Mon, Feb 27, 2023 at 14:10:
>> >
>> > > Hi Joe and Michael,
>> > >
>> > > I think I misunderstood what you replied before. Now I understand, and
>> > > I will explain it again.
>> > >
>> > > Besides the reasons Asaf mentioned above, there are also some limits to
>> > > using the topic list watcher. For example, the `topicsPattern.pattern`
>> > > must be shorter than `maxSubscriptionPatternLength` [0]. If the consumer
>> > > subscribes to multiple partitioned topics, the `topicsPattern.pattern`
>> > > may be very long.
>> > >
>> > > So I think that it's better to have a separate notification
>> > > implementation for partition update.
>> > >
>> > > [0]
>> > >
>> > > https://github.com/apache/pulsar/blob/5d6932137d76d544f939bef27df25f61b4a4d00d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java#L115-L126
>> > >
>> > > Thanks,
>> > > Xiaoyu Hou
>> > >
>> > > houxiaoyu <anonhx...@gmail.com> wrote on Mon, Feb 27, 2023 at 10:56:
>> > >
>> > >> Hi Michael,
>> > >>
>> > >> > I think we just need the client to "subscribe" to a topic
>> > >> > notification for "<topic-name>-partition-[0-9]+" to eliminate
>> > >> > the polling
>> > >>
>> > >> If Pulsar users want to pub/sub a partitioned topic, I think most of
>> > >> the users would like to create a simple producer or consumer like the
>> > >> following:
>> > >> ```
>> > >> Producer<byte[]> producer = client.newProducer().topic(topic).create();
>> > >> producer.sendAsync(msg);
>> > >> ```
>> > >> ```
>> > >> client.newConsumer()
>> > >>         .topic(topic)
>> > >>         .subscriptionName(subscription)
>> > >>         .subscribe();
>> > >> ```
>> > >> I think there is no reason for users to use `topicsPattern` if a
>> > >> Pulsar user just wants to subscribe to a partitioned topic. In
>> > >> addition, `topicsPattern` couldn't be used for producers.
>> > >>
>> > >> So I think PIP-145 [0] will benefit regex subscriptions, and this
>> > >> PIP [1] will benefit the common partitioned-topic pub/sub scenario.
>> > >>
>> > >> [0] https://github.com/apache/pulsar/issues/14505
>> > >> [1] https://github.com/apache/pulsar/issues/19596
>> > >>
>> > >> Thanks
>> > >> Xiaoyu Hou
>> > >>
>> > >> Michael Marshall <mmarsh...@apache.org> wrote on Sat, Feb 25, 2023 at 01:29:
>> > >>
>> > >>> > Just the way to implement the partitioned-topic metadata
>> > >>> > notification mechanism is much like notifications on regex sub
>> > >>> > changes
>> > >>>
>> > >>> Why do we need a separate notification implementation? The regex
>> > >>> subscription feature is about discovering topics (not subscriptions)
>> > >>> that match a regular expression. As Joe mentioned, I think we just
>> > >>> need the client to "subscribe" to a topic notification for
>> > >>> "<topic-name>-partition-[0-9]+" to eliminate the polling.
>> > >>>
>> > >>> Building on PIP 145, the work for this PIP would be in implementing
>> > >>> a different `TopicsChangedListener` [1] so that the result of an added
>> > >>> topic is to add a producer/consumer to the new partition.
>> > >>>
>> > >>> I support removing polling in our streaming platform, but I'd prefer
>> > >>> to limit the number of notification systems we implement.
>> > >>>
>> > >>> Thanks,
>> > >>> Michael
>> > >>>
>> > >>> [0] https://github.com/apache/pulsar/pull/16062
>> > >>> [1]
>> > >>>
>> https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175
>> > >>>
>> > >>>
>> > >>>
>> > >>> On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <anonhx...@gmail.com> wrote:
>> > >>> >
>> > >>> > Hi Joe,
>> > >>> >
>> > >>> > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl,
>> > >>> > there is a poll task that regularly fetches the metadata of the
>> > >>> > partitioned topic to check whether the number of partitions has been
>> > >>> > updated. This PIP wants to use a notification mechanism to replace
>> > >>> > that metadata poll task.
>> > >>> >
>> > >>> > Just the way to implement the partitioned-topic metadata
>> > >>> > notification mechanism is much like notifications on regex sub
>> > >>> > changes
>> > >>> >
>> > >>> > Joe F <joefranc...@gmail.com> wrote on Fri, Feb 24, 2023 at 13:37:
>> > >>> >
>> > >>> > > Why is this needed when we have notifications on regex sub
>> > >>> > > changes? Aren't the partition names a well-defined regex?
>> > >>> > >
>> > >>> > > Joe
>> > >>> > >
>> > >>> > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com> wrote:
>> > >>> > >
>> > >>> > > > Hi Asaf,
>> > >>> > > > Thanks for your reminder.
>> > >>> > > >
>> > >>> > > > ## Changing
>> > >>> > > > I have updated the following changes to make sure the
>> > >>> > > > notification arrives successfully:
>> > >>> > > > 1. The watch success response `CommandWatchPartitionUpdateSuccess`
>> > >>> > > > will contain all the concerned topics of this watcher.
>> > >>> > > > 2. The notification `CommandPartitionUpdate` will always contain
>> > >>> > > > all the concerned topics of this watcher.
>> > >>> > > > 3. The notification `CommandPartitionUpdate` contains a
>> > >>> > > > monotonically increasing version.
>> > >>> > > > 4. A map
>> > >>> > > > `PartitionUpdateWatcherService#inFlightUpdate<long/*watchID*/,
>> > >>> > > > Pair<version, long/*timestamp*/>>` will keep track of the updates
>> > >>> > > > in flight.
>> > >>> > > > 5. A timer will check for update timeouts through `inFlightUpdate`.
>> > >>> > > > 6. The client acks `CommandPartitionUpdateResult` to the broker
>> > >>> > > > when it finishes updating.
>> > >>> > > >
>> > >>> > > > ## Details
>> > >>> > > >
>> > >>> > > > The following mechanism could make sure the newest notification
>> > >>> > > > arrives successfully (copying the description from GH):
>> > >>> > > >
>> > >>> > > > A new class, `org.apache.pulsar.PartitionUpdateWatcherService`,
>> > >>> > > > will keep track of watchers and will listen for changes in the
>> > >>> > > > metadata. Whenever a topic's partitions update, it checks whether
>> > >>> > > > any watchers should be notified and sends an update for all the
>> > >>> > > > topics the watcher is concerned with through the ServerCnx. Then
>> > >>> > > > we will record this request in a map,
>> > >>> > > > `PartitionUpdateWatcherService#inFlightUpdate<long/*watchID*/,
>> > >>> > > > Pair<version, long/*timestamp*/>>`. A timer will check for update
>> > >>> > > > timeouts through `inFlightUpdate`. If a watcher's update has timed
>> > >>> > > > out, we will query the partitions of all its concerned topics and
>> > >>> > > > resend.
>> > >>> > > >
>> > >>> > > > The client acks `CommandPartitionUpdateResult` to the broker when
>> > >>> > > > it finishes updating. The broker handles the
>> > >>> > > > `CommandPartitionUpdateResult` request as follows (see the sketch
>> > >>> > > > below):
>> > >>> > > > - If CommandPartitionUpdateResult#version <
>> > >>> > > > PartitionUpdateWatcherService#inFlightUpdate.get(watcherID).version,
>> > >>> > > > the broker ignores this ack.
>> > >>> > > > - If CommandPartitionUpdateResult#version ==
>> > >>> > > > PartitionUpdateWatcherService#inFlightUpdate.get(watcherID).version:
>> > >>> > > >     - If CommandPartitionUpdateResult#success is true, the broker
>> > >>> > > > just removes the watcherID from inFlightUpdate.
>> > >>> > > >     - If CommandPartitionUpdateResult#success is false, the broker
>> > >>> > > > removes the watcherID from inFlightUpdate, queries the partitions
>> > >>> > > > of all the concerned topics, and resends.
>> > >>> > > > - If CommandPartitionUpdateResult#version >
>> > >>> > > > PartitionUpdateWatcherService#inFlightUpdate.get(watcherID).version,
>> > >>> > > > this should not happen.
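>> > >>> > > >
>> > >>> > > > A minimal Java sketch of that broker-side ack handling (class and
>> > >>> > > > field names follow the PIP's naming but are illustrative, and
>> > >>> > > > `inFlightUpdate` is assumed to be a Map<Long, Pair<Long, Long>>):
>> > >>> > > >
>> > >>> > > > ```java
>> > >>> > > > // Hypothetical handler in PartitionUpdateWatcherService.
>> > >>> > > > void handleAck(long watcherId, long ackedVersion, boolean success) {
>> > >>> > > >     Pair<Long, Long> inFlight = inFlightUpdate.get(watcherId);
>> > >>> > > >     if (inFlight == null || ackedVersion < inFlight.getLeft()) {
>> > >>> > > >         return; // stale ack: a newer update is already in flight
>> > >>> > > >     }
>> > >>> > > >     if (success) {
>> > >>> > > >         // The client applied the latest version; stop tracking it.
>> > >>> > > >         inFlightUpdate.remove(watcherId);
>> > >>> > > >     } else {
>> > >>> > > >         // The client failed to apply the update: re-read the
>> > >>> > > >         // partition counts and resend with a new, higher version.
>> > >>> > > >         inFlightUpdate.remove(watcherId);
>> > >>> > > >         resendPartitions(watcherId);
>> > >>> > > >     }
>> > >>> > > > }
>> > >>> > > > ```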
>> > >>> > > >
>> > >>> > > > ## Edge cases
>> > >>> > > > - Broker restarts or crashes
>> > >>> > > > The client will reconnect to another broker, and the broker
>> > >>> > > > responds with `CommandWatchPartitionUpdateSuccess` containing the
>> > >>> > > > partitions of the watcher's concerned topics. We will call
>> > >>> > > > `PartitionsUpdateListener` when the connection opens.
>> > >>> > > > - Client acks fail or time out
>> > >>> > > > The broker will resend the partitions of the watcher's concerned
>> > >>> > > > topics whenever the client's ack fails or times out.
>> > >>> > > > - Partitions update before the client acks
>> > >>> > > > `CommandPartitionUpdate#version` monotonically increases every
>> > >>> > > > time it is updated. If the partitions update before the client
>> > >>> > > > acks, a greater version will be put into
>> > >>> > > > `PartitionUpdateWatcherService#inFlightUpdate`. The previous acks
>> > >>> > > > will be ignored because their version is less than the current
>> > >>> > > > version.
>> > >>> > > >
>> > >>> > > >
>> > >>> > > > Asaf Mesika <asaf.mes...@gmail.com> wrote on Wed, Feb 22, 2023 at 21:33:
>> > >>> > > >
>> > >>> > > > > How about edge cases?
>> > >>> > > > > In Andra's PIP he took into account cases where updates were
>> > >>> > > > > lost, so he created a secondary poll. Not saying it's the best
>> > >>> > > > > fit for your case, of course.
>> > >>> > > > > I'm saying that when a broker sends a CommandPartitionUpdate,
>> > >>> > > > > how do you know it arrived successfully? From my memory, there
>> > >>> > > > > is no ACK in the protocol saying "I'm the client, I got the
>> > >>> > > > > update successfully", after which it would remove the "dirty"
>> > >>> > > > > flag for that topic, for this watcher ID.
>> > >>> > > > >
>> > >>> > > > > Are there any other edge cases we can have? Let's be
>> > >>> > > > > exhaustive.
>> > >>> > > > >
>> > >>> > > > >
>> > >>> > > > >
>> > >>> > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com> wrote:
>> > >>> > > > >
>> > >>> > > > > > Thanks for your great suggestion, Enrico.
>> > >>> > > > > >
>> > >>> > > > > > I agree with you. It's more reasonable to add a
>> > >>> > > > > > `supports_partition_update_watchers` flag in `FeatureFlags` to
>> > >>> > > > > > detect that the connected broker supports this feature, and to
>> > >>> > > > > > add a new broker configuration property
>> > >>> > > > > > `enableNotificationForPartitionUpdate` with a default value of
>> > >>> > > > > > true, much like PIP-145.
>> > >>> > > > > >
>> > >>> > > > > > I have updated the descriptions.
>> > >>> > > > > >
>> > >>> > > > > > Enrico Olivelli <eolive...@gmail.com> wrote on Wed, Feb 22, 2023 at 17:26:
>> > >>> > > > > >
>> > >>> > > > > > > I support this proposal.
>> > >>> > > > > > > Copying here my comments from GH:
>> > >>> > > > > > >
>> > >>> > > > > > > Can't we enable this by default in case we detect that the
>> > >>> > > > > > > connected broker supports it? I can't find any reason for
>> > >>> > > > > > > not using this mechanism if it is available.
>> > >>> > > > > > >
>> > >>> > > > > > > Maybe we can set the default to "true" and allow users to
>> > >>> > > > > > > disable it in case it impacts their systems in an unwanted
>> > >>> > > > > > > way.
>> > >>> > > > > > >
>> > >>> > > > > > > Maybe it would be useful to have a way to disable the
>> > >>> > > > > > > mechanism on the broker side as well.
>> > >>> > > > > > >
>> > >>> > > > > > > Enrico
>> > >>> > > > > > >
>> > >>> > > > > > > On Wed, Feb 22, 2023 at 10:22, houxiaoyu
>> > >>> > > > > > > <anonhx...@gmail.com> wrote:
>> > >>> > > > > > > >
>> > >>> > > > > > > > Hi Pulsar community:
>> > >>> > > > > > > >
>> > >>> > > > > > > > I opened a PIP to discuss "Notifications for partitions
>> > >>> > > > > > > > update"
>> > >>> > > > > > > >
>> > >>> > > > > > > > ### Motivation
>> > >>> > > > > > > >
>> > >>> > > > > > > > The Pulsar client polls brokers at a fixed interval to
>> > >>> > > > > > > > check for partition updates if we publish/subscribe to
>> > >>> > > > > > > > partitioned topics with `autoUpdatePartitions` set to
>> > >>> > > > > > > > true. This causes unnecessary load for both clients and
>> > >>> > > > > > > > brokers, since most of the time the number of partitions
>> > >>> > > > > > > > will not change. In addition, polling introduces latency
>> > >>> > > > > > > > in partition updates, which is bounded by
>> > >>> > > > > > > > `autoUpdatePartitionsInterval`.
>> > >>> > > > > > > > This PIP would like to introduce a notification mechanism
>> > >>> > > > > > > > for partition updates, much like PIP-145 for regex
>> > >>> > > > > > > > subscriptions:
>> > >>> > > > > > > > https://github.com/apache/pulsar/issues/14505.
>> > >>> > > > > > > >
>> > >>> > > > > > > > For more details, please read the PIP at:
>> > >>> > > > > > > > https://github.com/apache/pulsar/issues/19596
>> > >>> > > > > > > > Looking forward to hearing your thoughts.
>> > >>> > > > > > > >
>> > >>> > > > > > > > Thanks,
>> > >>> > > > > > > > Xiaoyu Hou
>> > >>> > > > > > > > ----
>> > >>> > > > > > >
>> > >>> > > > > >
>> > >>> > > > >
>> > >>> > > >
>> > >>> > >
>> > >>>
>> > >>
>>
>
