Michael,

I think the currently suggested protocol is more efficient than the
existing regex mechanism:

   - If the broker sends a notification and it's lost due to network issues,
   you'll only know about it because the client is constantly polling, using
   its hash to minimize the response.
      - In the suggested mechanism, there's no constant polling at all.
      Just think of introducing polling to *all* multi-partition producers and
      consumers (which is likely all Pulsar users). The suggested mechanism has a
      configured timeout that lets it resend, but only when a notification
      fails, as opposed to a constant polling mechanism.
   - On every metadata update you'll need to run it through the regular
   expression, across all topics hosted on the broker, for any given client.
   That's a lot of CPU.
      - The suggested mechanism mainly cares about the partition count, so
      it's a lot more efficient.
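
To make the CPU argument concrete, here is a rough sketch. It is not Pulsar
code: the function names, the in-memory topic list, and the topic names are
all made up for illustration. It only contrasts the shape of the work: a
regex watch tests every topic on the broker against the pattern, while a
partition-count watch compares one integer per watched topic.

```python
import re


def regex_watch_matches(all_topics, pattern):
    """Regex-style watch: every topic on the broker is tested against the pattern."""
    compiled = re.compile(pattern)
    return [topic for topic in all_topics if compiled.fullmatch(topic)]


def partition_count_changed(known_count, current_count):
    """Partition-count watch: a single integer comparison per watched topic."""
    return current_count != known_count


# Hypothetical broker state: one 4-partition topic among 1000 unrelated topics.
topics = [f"persistent://public/default/orders-partition-{i}" for i in range(4)]
topics += [f"persistent://public/default/other-{i}" for i in range(1000)]

pattern = r"persistent://public/default/orders-partition-[0-9]+"
matched = regex_watch_matches(topics, pattern)  # scans all 1004 topic names
changed = partition_count_changed(known_count=3, current_count=len(matched))
```

The regex path touches all 1004 names to find 4 matches, and that repeats per
client and per update. The count path does constant work per watched topic.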


Xiaoyu,

I don't like the "onTopicExtend" method name. Prefer something more
explicit: onTopicPartitionsCountChanged.

Let's think about the case of endless retries - you know, the client is
acting out and never acknowledges. We need to think about it.
60 seconds seems like *a lot* to miss an update. Maybe 5-10 sec?
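
One possible shape for bounding the retries, purely as a sketch and not the
PIP's design: keep the proposed in-flight map keyed by watcher ID, but add a
resend counter and give up after a cap. The class name, the `max_resends`
parameter, the 10-second default, and the returned status strings are all my
assumptions. Unlike the PIP text, a failed ack here keeps the entry and bumps
its counter instead of removing and re-adding it. What the broker should do
after giving up is left open.

```python
from dataclasses import dataclass


@dataclass
class InFlight:
    version: int      # version of the update that was sent
    sent_at: float    # time of the last send, in seconds
    resends: int = 0  # how many times this update has been resent


class UpdateTracker:
    """Hypothetical broker-side tracker for unacknowledged partition updates."""

    def __init__(self, timeout_sec=10.0, max_resends=3):
        self.timeout_sec = timeout_sec
        self.max_resends = max_resends
        self.in_flight = {}  # watcher_id -> InFlight

    def record_send(self, watcher_id, version, now):
        # A newer version replaces the older entry and resets the counter.
        self.in_flight[watcher_id] = InFlight(version, now)

    def on_ack(self, watcher_id, version, success, now):
        entry = self.in_flight.get(watcher_id)
        if entry is None or version < entry.version:
            return "ignored"      # stale ack for an older update
        if version > entry.version:
            return "unexpected"   # should not happen
        if success:
            del self.in_flight[watcher_id]
            return "done"
        return self._resend(watcher_id, entry, now)

    def on_timer(self, now):
        # Resend (or give up on) every update whose ack has timed out.
        results = {}
        for watcher_id, entry in list(self.in_flight.items()):
            if now - entry.sent_at >= self.timeout_sec:
                results[watcher_id] = self._resend(watcher_id, entry, now)
        return results

    def _resend(self, watcher_id, entry, now):
        if entry.resends >= self.max_resends:
            del self.in_flight[watcher_id]  # cap reached: stop retrying
            return "gave_up"
        entry.resends += 1
        entry.sent_at = now
        return "resend"
```

With `max_resends=2` and a 10-second timeout, a client that never acks gets
two resends and is dropped from the map on the third timer hit, so the broker
never loops forever on it.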

Can you elaborate on which broker you'll be having that conversation with?
Partitioned topics are spread across several brokers, so how can one broker
know the partition count of the partitioned topic?

Maybe we should mark "PartitionsChangedListener" deprecated? Having two
interfaces is confusing.



On Fri, Feb 24, 2023 at 7:29 PM Michael Marshall <mmarsh...@apache.org>
wrote:

> > It's just that the way to implement the partitioned-topic metadata
> > notification mechanism is much like notifications on regex sub changes
>
> Why do we need a separate notification implementation? The regex
> subscription feature is about discovering topics (not subscriptions)
> that match a regular expression. As Joe mentioned, I think we just
> need the client to "subscribe" to a topic notification for
> "<topic-name>-partition-[0-9]+" to eliminate the polling.
>
> Building on PIP 145, the work for this PIP would be in implementing a
> different `TopicsChangedListener` [1] so that the result of an added
> topic is to add a producer/consumer to the new partition.
>
> I support removing polling in our streaming platform, but I'd prefer
> to limit the number of notification systems we implement.
>
> Thanks,
> Michael
>
> [0] https://github.com/apache/pulsar/pull/16062
> [1]
> https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175
>
>
>
> On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <anonhx...@gmail.com> wrote:
> >
> > Hi Joe,
> >
> > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl, there is a
> > poll task that regularly fetches the metadata of the partitioned topic to
> > detect changes in the number of partitions. This PIP wants to use a
> > notification mechanism to replace the metadata poll task.
> >
> > It's just that the way to implement the partitioned-topic metadata
> > notification mechanism is much like notifications on regex sub changes
> >
> > On Fri, Feb 24, 2023 at 13:37, Joe F <joefranc...@gmail.com> wrote:
> >
> > > Why is this needed when we have notifications on regex sub changes? Aren't
> > > the partition names a well-defined regex?
> > >
> > > Joe
> > >
> > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com> wrote:
> > >
> > > > Hi Asaf,
> > > > thanks for your reminder.
> > > >
> > > > ## Changing
> > > > I have made the following changes to make sure the notification arrives
> > > > successfully:
> > > > 1. The watch success response `CommandWatchPartitionUpdateSuccess` will
> > > > contain all the concerned topics of this watcher.
> > > > 2. The notification `CommandPartitionUpdate` will always contain all the
> > > > concerned topics of this watcher.
> > > > 3. The notification `CommandPartitionUpdate` contains a monotonically
> > > > increasing version.
> > > > 4. A map `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > > > Pair<version, long/*timestamp*/>>` will keep track of in-flight updates.
> > > > 5. A timer will check for update timeouts through `inFlightUpdate`.
> > > > 6. The client acks with `CommandPartitionUpdateResult` to the broker when
> > > > it finishes updating.
> > > >
> > > > ## Details
> > > >
> > > > The following mechanism could make sure the newest notification arrives
> > > > successfully, copying the description from GH:
> > > >
> > > > A new class, `org.apache.pulsar.PartitonUpdateWatcherService`, will keep
> > > > track of watchers and will listen to changes in the metadata. Whenever
> > > > a topic's partitions are updated, it checks whether any watchers should be
> > > > notified and sends an update for all topics the watcher is concerned with
> > > > through the ServerCnx. Then we will record this request in a map,
> > > > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > > > Pair<version, long/*timestamp*/>>`. A timer will check for update timeouts
> > > > through `inFlightUpdate`. If an update sent to a watcher has timed out, we
> > > > will query the partitions of all its concerned topics and resend the update.
> > > >
> > > > The client acks with `CommandPartitionUpdateResult` to the broker when it
> > > > finishes updating. The broker handles the `CommandPartitionUpdateResult`
> > > > request as follows:
> > > >  - If CommandPartitionUpdateResult#version <
> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, the
> > > > broker ignores this ack.
> > > >  - If CommandPartitionUpdateResult#version ==
> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version:
> > > >     - If CommandPartitionUpdateResult#success is true, the broker just
> > > > removes the watcherID from inFlightUpdate.
> > > >     - If CommandPartitionUpdateResult#success is false, the broker removes
> > > > the watcherID from inFlightUpdate, queries the partitions of all the
> > > > concerned topics, and resends.
> > > >  - If CommandPartitionUpdateResult#version >
> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, this
> > > > should not happen.
> > > >
> > > >  ## Edge cases
> > > > - Broker restarts or crashes
> > > > The client will reconnect to another broker, and the broker responds with
> > > > `CommandWatchPartitionUpdateSuccess` containing the partitions of the
> > > > watcher's concerned topics. We will call `PartitionsUpdateListener` when
> > > > the connection opens.
> > > > - Client acks fail or time out
> > > > The broker will resend the partitions of the watcher's concerned topics if
> > > > the client's ack reports failure or times out.
> > > > - Partitions update before the client acks
> > > > `CommandPartitionUpdate#version` monotonically increases every time it is
> > > > updated. If the partitions are updated before the client acks, a greater
> > > > version will be put into `PartitonUpdateWatcherService#inFlightUpdate`. The
> > > > previous acks will be ignored because their version is less than the
> > > > current version.
> > > >
> > > >
> > > > On Wed, Feb 22, 2023 at 21:33, Asaf Mesika <asaf.mes...@gmail.com> wrote:
> > > >
> > > > > How about edge cases?
> > > > > In Andra's PIP he took into account cases where updates were lost, so he
> > > > > created a secondary poll. Not saying it's the best situation for your
> > > > > case, of course.
> > > > > I'm saying that when a broker sends a CommandPartitionUpdate, how
> > > > > do you know it arrived successfully? From my memory, there is no ACK in
> > > > > the protocol saying "I'm the client, I got the update successfully",
> > > > > after which the broker removes the "dirty" flag for that topic, for this
> > > > > watcher ID.
> > > > >
> > > > > Are there any other edge cases we can have? Let's be exhaustive.
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com> wrote:
> > > > >
> > > > > > Thanks for your great suggestion Enrico.
> > > > > >
> > > > > > I agree with you. It's more reasonable to add a
> > > > > > `supports_partition_update_watchers` flag in `FeatureFlags` to detect
> > > > > > that the connected broker supports this feature, and to add a new broker
> > > > > > configuration property `enableNotificationForPartitionUpdate` with a
> > > > > > default value of true, which is much like PIP-145.
> > > > > >
> > > > > > I have updated the descriptions.
> > > > > >
> > > > > > On Wed, Feb 22, 2023 at 17:26, Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > >
> > > > > > > I support this proposal.
> > > > > > > Copying here my comments from GH:
> > > > > > >
> > > > > > > Can't we enable this by default in case we detect that the connected
> > > > > > > broker supports it?
> > > > > > > I can't find any reason for not using this mechanism if it is available.
> > > > > > >
> > > > > > > Maybe we can set the default to "true" and allow users to disable it
> > > > > > > in case it impacts their systems in an unwanted way.
> > > > > > >
> > > > > > > Maybe it would be useful to have a way to disable the mechanism on the
> > > > > > > broker side as well.
> > > > > > >
> > > > > > > Enrico
> > > > > > >
> > > > > > > On Wed, Feb 22, 2023 at 10:22 houxiaoyu
> > > > > > > <anonhx...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi Pulsar community:
> > > > > > > >
> > > > > > > > I opened a PIP to discuss "Notifications for partitions update".
> > > > > > > >
> > > > > > > > ### Motivation
> > > > > > > >
> > > > > > > > The Pulsar client polls brokers at a fixed interval to check for
> > > > > > > > partition updates if we publish to or subscribe to partitioned topics
> > > > > > > > with `autoUpdatePartitions` set to true. This causes unnecessary load
> > > > > > > > for both clients and brokers, since most of the time the number of
> > > > > > > > partitions will not change. In addition, polling introduces latency in
> > > > > > > > partition updates, which is determined by `autoUpdatePartitionsInterval`.
> > > > > > > > This PIP would like to introduce a notification mechanism for partition
> > > > > > > > updates, which is much like PIP-145 for regex subscriptions:
> > > > > > > > https://github.com/apache/pulsar/issues/14505.
> > > > > > > >
> > > > > > > > For more details, please read the PIP at:
> > > > > > > > https://github.com/apache/pulsar/issues/19596
> > > > > > > > Looking forward to hearing your thoughts.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Xiaoyu Hou
> > > > > > > > ----
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
>
