Hi all,

Are there any other comments about this design? :)
Thanks,
Xiaoyu Hou

On Wed, Mar 8, 2023 at 4:22 PM houxiaoyu <anonhx...@gmail.com> wrote:
> Hi Michael,
>
> > is there a reason that we couldn't also use this to improve PIP 145?
>
> The protocol described in this PIP could also be used to improve PIP-145.
> However, I don't think that is a good reason to implement the partition
> update watcher on top of the regex subscription watcher, because of the
> other reasons we mentioned above.
>
> > Since we know we're using a TCP connection, is it possible to rely on
> > pulsar's keep alive timeout (the broker and the client each have their
> > own) to close a connection that isn't responsive?
>
> I think it could still fail at the application layer; for example, the
> partition update listener may fail unexpectedly. Currently, another task
> is scheduled if the poll task encounters an error in the partition
> auto-update timer task. [0]
>
> > Regarding the connection, which connection should the client use to
> > send the watch requests?
>
> The `PartitionUpdateWatcher` will call `connectionHandler.grabCnx()` to
> open a connection, which is analogous to `TopicListWatcher`. [1]
>
> > do we plan on using metadata store notifications to trigger the
> > callbacks that trigger notifications sent to the clients
>
> Yes, we will just look up the metadata store to fetch the partition count
> and register a watcher on the metadata store to trigger the count update.
>
> > One nit on the protobuf for CommandWatchPartitionUpdateSuccess:
> >
> > repeated string topics = 3;
> > repeated uint32 partitions = 4;
> >
> > What do you think about using a repeated message that represents a
> > pair of a topic and its partition count instead of using two lists?
>
> Great. It does look better with a repeated message; I will update the
> protobuf.
>
> > How will we handle the case where a watched topic does not exist?
>
> 1. When `PulsarClient` calls `create()` to create a producer or
> `subscribe()` to create a consumer, the client first fetches the
> partitioned-topic metadata from the broker [2]. If the topic doesn't
> exist and `isAllowAutoTopicCreation=true` on the broker, the
> partitioned-topic ZooKeeper node is auto-created with the default number
> of partitions.
> 2. After the client gets the partitioned-topic metadata successfully, a
> `PartitionedProducerImpl` is created if `meta.partitions > 0`, and the
> `PartitionUpdateWatcher` is initialized in the `PartitionedProducerImpl`
> constructor. The `PartitionUpdateWatcher` sends a command to the broker
> to register a watcher. If any topic in the topic list doesn't exist, the
> broker sends an error to the client and the `PartitionedProducerImpl`
> fails to start. `MultiTopicsConsumerImpl` works the same way.
>
> > I want to touch on authorization. A role should have "lookup"
> > permission to watch for updates on each partitioned topic that it
> > watches. As a result, if we allow for a request to watch multiple
> > topics, some might succeed while others fail. How do we handle partial
> > success?
>
> If authorization fails for any topic in the topic list, the broker will
> send an error to the client. The following reasons support this behavior:
> 1. Before sending the command to register a partition update watcher, the
> client should already have sent `CommandPartitionedTopicMetadata` and
> should therefore have the `lookup` permission [3] [4].
> 2. Currently, if subscribing to any topic fails, the consumer fails to
> start. [5]
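>
> To make the all-or-nothing behavior concrete, here is a rough sketch of
> the broker-side validation. This only illustrates the idea and is not
> the final implementation: the helper `validateWatchRequest` is made up,
> while `AuthorizationService#canLookupAsync` and `TopicName` come from
> the existing broker code.
>
> ```
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.CompletionException;
> import org.apache.pulsar.broker.authorization.AuthorizationService;
> import org.apache.pulsar.common.naming.TopicName;
>
> // Check every topic in the watch request before registering the
> // watcher; if any check fails, the combined future fails and the
> // broker sends a single error response instead of a partial success.
> CompletableFuture<Void> validateWatchRequest(List<String> topics, String role,
>         AuthorizationService authorizationService) {
>     List<CompletableFuture<Void>> checks = new ArrayList<>();
>     for (String topic : topics) {
>         TopicName topicName = TopicName.get(topic);
>         checks.add(authorizationService.canLookupAsync(topicName, role, null)
>                 .thenAccept(allowed -> {
>                     if (!allowed) {
>                         throw new CompletionException(new IllegalStateException(
>                                 "No lookup permission on " + topic));
>                     }
>                 }));
>     }
>     return CompletableFuture.allOf(checks.toArray(new CompletableFuture[0]));
> }
> ```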
>
> [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L1453-L1461
> [1] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java#L67-L81
> [2] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L365-L371
> [3] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L903-L923
> [4] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L558-L560
> [5] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L171-L193
>
> Thanks,
> Xiaoyu Hou
>
> On Tue, Mar 7, 2023 at 3:43 PM Michael Marshall <mmarsh...@apache.org> wrote:
>> Thanks for the context, Xiaoyu Hou and Asaf. I appreciate the
>> efficiencies that we can gain by creating a specific implementation
>> for the partitioned topic use case. I agree that this new notification
>> system makes sense based on Pulsar's current features, and I have some
>> implementation questions.
>>
>> > - If the broker sends a notification and it's lost due to network
>> > issues, you'll only know about it due to the client doing constant
>> > polling, using its hash to minimize response.
>>
>> I see that we implemented an ack mechanism to get around this. I
>> haven't looked closely, but is there a reason that we couldn't also
>> use this to improve PIP 145?
>>
>> Since we know we're using a TCP connection, is it possible to rely on
>> pulsar's keep alive timeout (the broker and the client each have their
>> own) to close a connection that isn't responsive? Then, when the
>> connection is re-established, the client would get the latest topic
>> partition count.
>>
>> Regarding the connection, which connection should the client use to
>> send the watch requests? At the moment, the "parent" partitioned topic
>> does not have an owner, but perhaps it would help this design to make
>> a single owner for a given partitioned topic. This could trivially be
>> done using the existing bundle mapping. Then, all watchers for a given
>> partitioned topic would be hosted on the same broker, which should be
>> more efficient. I don't think we currently redirect clients to any
>> specific bundle when creating the metadata for a partitioned topic,
>> but if we did, then we might be able to remove some edge cases for
>> notification delivery because a single broker would update the
>> metadata store and then trigger the notifications to the clients. If
>> we don't use this implementation, do we plan on using metadata store
>> notifications to trigger the callbacks that trigger notifications sent
>> to the clients?
>>
>> > - Each time meta-update you'll need to run it through regular
>> > expression, on all topics hosted on the broker, for any given client.
>> > That's a lot of CPU.
>> > - Suggested mechanism mainly cares about the count of partitions, so
>> > it's a lot more efficient.
>>
>> I forgot the partition count was its own piece of metadata that the
>> broker can watch for. That part definitely makes sense to me.
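>>
>> For example, I imagine something roughly like the following on the
>> broker side. This is just a sketch against the `MetadataStore` listener
>> API; the path prefix and the bookkeeping around it are assumptions on
>> my part:
>>
>> ```
>> import org.apache.pulsar.metadata.api.MetadataStore;
>> import org.apache.pulsar.metadata.api.Notification;
>>
>> void watchPartitionMetadata(MetadataStore metadataStore) {
>>     metadataStore.registerListener((Notification n) -> {
>>         // Partitioned-topic metadata lives under this prefix; on any
>>         // change, re-read the node to get the new partition count.
>>         if (n.getPath().startsWith("/admin/partitioned-topics/")) {
>>             metadataStore.get(n.getPath()).thenAccept(optResult ->
>>                     optResult.ifPresent(result -> {
>>                         // deserialize PartitionedTopicMetadata here and
>>                         // notify the watchers registered for this topic
>>                     }));
>>         }
>>     });
>> }
>> ```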
>>
>> One nit on the protobuf for CommandWatchPartitionUpdateSuccess:
>>
>> repeated string topics = 3;
>> repeated uint32 partitions = 4;
>>
>> What do you think about using a repeated message that represents a
>> pair of a topic and its partition count instead of using two lists?
>>
>> How will we handle the case where a watched topic does not exist?
>>
>> I want to touch on authorization. A role should have "lookup"
>> permission to watch for updates on each partitioned topic that it
>> watches. As a result, if we allow for a request to watch multiple
>> topics, some might succeed while others fail. How do we handle partial
>> success?
>>
>> One interesting detail is that this PIP is essentially aligned with
>> notifying clients when topic metadata changes, while PIP 145 was
>> related to topic creation itself. An analogous proposal could request
>> a notification for any topic that gets a new metadata label. I do not
>> think it is worth considering that case in this design.
>>
>> Thanks,
>> Michael
>>
>> [0] https://lists.apache.org/thread/t4cwht08d4mhp3qzoxmqh6tht8l0728r
>>
>> On Sun, Mar 5, 2023 at 8:01 PM houxiaoyu <anonhx...@gmail.com> wrote:
>> >
>> > Bump. Are there other concerns or suggestions about this PIP? :)
>> > Ping @Michael @Joe @Enrico
>> >
>> > Thanks,
>> > Xiaoyu Hou
>> >
>> > On Mon, Feb 27, 2023 at 2:10 PM houxiaoyu <anonhx...@gmail.com> wrote:
>> >
>> > > Hi Joe and Michael,
>> > >
>> > > I think I misunderstood your earlier replies. Now I understand, so
>> > > let me explain again.
>> > >
>> > > Besides the reasons Asaf mentioned above, there are also some
>> > > limits to using the topic list watcher. For example, the
>> > > `topicsPattern.pattern` must be shorter than
>> > > `maxSubscriptionPatternLength` [0]. If the consumer subscribes to
>> > > multiple partitioned topics, the `topicsPattern.pattern` may become
>> > > very long.
>> > >
>> > > So I think it's better to have a separate notification
>> > > implementation for partition updates.
>> > >
>> > > [0] https://github.com/apache/pulsar/blob/5d6932137d76d544f939bef27df25f61b4a4d00d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java#L115-L126
>> > >
>> > > Thanks,
>> > > Xiaoyu Hou
>> > >
>> > > On Mon, Feb 27, 2023 at 10:56 AM houxiaoyu <anonhx...@gmail.com> wrote:
>> > >
>> > >> Hi Michael,
>> > >>
>> > >> > I think we just need the client to "subscribe" to a topic
>> > >> > notification for "<topic-name>-partition-[0-9]+" to eliminate
>> > >> > the polling
>> > >>
>> > >> If Pulsar users want to pub/sub to a partitioned topic, I think
>> > >> most users would like to create a simple producer or consumer like
>> > >> the following:
>> > >> ```
>> > >> Producer<byte[]> producer = client.newProducer().topic(topic).create();
>> > >> producer.sendAsync(msg);
>> > >> ```
>> > >> ```
>> > >> client.newConsumer()
>> > >>     .topic(topic)
>> > >>     .subscriptionName(subscription)
>> > >>     .subscribe();
>> > >> ```
>> > >> I think there is no reason for users to use `topicsPattern` if they
>> > >> just want to subscribe to a partitioned topic. In addition,
>> > >> `topicsPattern` cannot be used for producers.
>> > >>
>> > >> So I think PIP-145 [0] will benefit regex subscriptions, and this
>> > >> PIP [1] will benefit the common partitioned-topic pub/sub scenario.
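>> > >>
>> > >> For comparison, the regex-based workaround would look roughly like
>> > >> this (a sketch only; the tenant/namespace and topic names are
>> > >> placeholders). Note that the pattern has to enumerate every
>> > >> partitioned topic the client cares about, and there is no producer
>> > >> equivalent at all:
>> > >> ```
>> > >> Consumer<byte[]> consumer = client.newConsumer()
>> > >>     // one alternation per watched partitioned topic
>> > >>     .topicsPattern("persistent://public/default/(topic-a|topic-b)-partition-[0-9]+")
>> > >>     .subscriptionName(subscription)
>> > >>     .subscribe();
>> > >> ```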
>> > >> >> > >> [0] https://github.com/apache/pulsar/issues/14505 >> > >> [1] https://github.com/apache/pulsar/issues/19596 >> > >> >> > >> Thanks >> > >> Xiaoyu Hou >> > >> >> > >> Michael Marshall <mmarsh...@apache.org> 于2023年2月25日周六 01:29写道: >> > >> >> > >>> > Just the way to implements partitioned-topic metadata >> > >>> > notification mechanism is much like notifications on regex sub >> changes >> > >>> >> > >>> Why do we need a separate notification implementation? The regex >> > >>> subscription feature is about discovering topics (not subscriptions) >> > >>> that match a regular expression. As Joe mentioned, I think we just >> > >>> need the client to "subscribe" to a topic notification for >> > >>> "<topic-name>-partition-[0-9]+" to eliminate the polling. >> > >>> >> > >>> Building on PIP 145, the work for this PIP would be in implementing >> a >> > >>> different `TopicsChangedListener` [1] so that the result of an added >> > >>> topic is to add a producer/consumer to the new partition. >> > >>> >> > >>> I support removing polling in our streaming platform, but I'd prefer >> > >>> to limit the number of notification systems we implement. >> > >>> >> > >>> Thanks, >> > >>> Michael >> > >>> >> > >>> [0] https://github.com/apache/pulsar/pull/16062 >> > >>> [1] >> > >>> >> https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175 >> > >>> >> > >>> >> > >>> >> > >>> On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <anonhx...@gmail.com> >> wrote: >> > >>> > >> > >>> > Hi Joe, >> > >>> > >> > >>> > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl, >> there >> > >>> is a >> > >>> > poll task to fetch the metadata of the partitioned-topic >> regularly for >> > >>> the >> > >>> > number of partitions updated. This PIP wants to use a >> > >>> > notification mechanism to replace the metadata poll task. >> > >>> > >> > >>> > Just the way to implements partitioned-topic metadata >> > >>> > notification mechanism is much like notifications on regex sub >> changes >> > >>> > >> > >>> > Joe F <joefranc...@gmail.com> 于2023年2月24日周五 13:37写道: >> > >>> > >> > >>> > > Why is this needed when we have notifications on regex sub >> changes? >> > >>> Aren't >> > >>> > > the partition names a well-defined regex? >> > >>> > > >> > >>> > > Joe >> > >>> > > >> > >>> > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com> >> > >>> wrote: >> > >>> > > >> > >>> > > > Hi Asaf, >> > >>> > > > thanks for your reminder. >> > >>> > > > >> > >>> > > > ## Changing >> > >>> > > > I have updated the following changes to make sure the >> notification >> > >>> > > arrived >> > >>> > > > successfully: >> > >>> > > > 1. The watch success response >> `CommandWatchPartitionUpdateSuccess` >> > >>> will >> > >>> > > > contain all the concerned topics of this watcher >> > >>> > > > 2. The notification `CommandPartitionUpdate` will always >> contain >> > >>> all the >> > >>> > > > concerned topics of this watcher. >> > >>> > > > 3. The notification `CommandPartitionUpdate`contains a >> > >>> monotonically >> > >>> > > > increased version. >> > >>> > > > 4. A map >> > >>> `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/, >> > >>> > > > Pair<version, long/*timestamp*/>>` will keep track of the >> updating >> > >>> > > > 5. A timer will check the updating timeout through >> `inFlightUpdate` >> > >>> > > > 6. 
>> > >>> > > >
>> > >>> > > > ## Details
>> > >>> > > >
>> > >>> > > > The following mechanism ensures that the newest notification
>> > >>> > > > arrives successfully (copying the description from GH):
>> > >>> > > >
>> > >>> > > > A new class, `org.apache.pulsar.PartitionUpdateWatcherService`,
>> > >>> > > > will keep track of watchers and will listen for changes in
>> > >>> > > > the metadata. Whenever a topic's partitions update, it checks
>> > >>> > > > whether any watchers should be notified and sends an update
>> > >>> > > > for all the topics the watcher is concerned with through the
>> > >>> > > > ServerCnx. Then we record this request in a map,
>> > >>> > > > `PartitionUpdateWatcherService#inFlightUpdate<long /*watchID*/,
>> > >>> > > > Pair<version, long /*timestamp*/>>`. A timer checks for
>> > >>> > > > update timeouts through `inFlightUpdate`. If a watcher's
>> > >>> > > > update has timed out, we query the partitions of all the
>> > >>> > > > topics it is concerned with and resend.
>> > >>> > > >
>> > >>> > > > The client acks `CommandPartitionUpdateResult` to the broker
>> > >>> > > > when it finishes updating. The broker handles the
>> > >>> > > > `CommandPartitionUpdateResult` request as follows (a sketch
>> > >>> > > > follows after the edge cases below):
>> > >>> > > > - If CommandPartitionUpdateResult#version <
>> > >>> > > > PartitionUpdateWatcherService#inFlightUpdate.get(watcherID).version,
>> > >>> > > > the broker ignores this ack.
>> > >>> > > > - If CommandPartitionUpdateResult#version ==
>> > >>> > > > PartitionUpdateWatcherService#inFlightUpdate.get(watcherID).version:
>> > >>> > > >   - If CommandPartitionUpdateResult#success is true, the
>> > >>> > > > broker just removes the watcherID from inFlightUpdate.
>> > >>> > > >   - If CommandPartitionUpdateResult#success is false, the
>> > >>> > > > broker removes the watcherID from inFlightUpdate, queries the
>> > >>> > > > partitions of all the concerned topics, and resends.
>> > >>> > > > - If CommandPartitionUpdateResult#version >
>> > >>> > > > PartitionUpdateWatcherService#inFlightUpdate.get(watcherID).version,
>> > >>> > > > this should not happen.
>> > >>> > > >
>> > >>> > > > ## Edge cases
>> > >>> > > > - Broker restarts or crashes:
>> > >>> > > > the client will reconnect to another broker, and the broker
>> > >>> > > > responds with `CommandWatchPartitionUpdateSuccess` containing
>> > >>> > > > the partitions of the watcher's concerned topics. We will
>> > >>> > > > call `PartitionsUpdateListener` when the connection opens.
>> > >>> > > > - Client acks fail or time out:
>> > >>> > > > the broker will resend the partitions of the watcher's
>> > >>> > > > concerned topics whether the client's ack fails or times out.
>> > >>> > > > - Partition updates before the client acks:
>> > >>> > > > `CommandPartitionUpdate#version` increases monotonically on
>> > >>> > > > every update. If the partitions update before the client
>> > >>> > > > acks, a greater version is put into
>> > >>> > > > `PartitionUpdateWatcherService#inFlightUpdate`. The earlier
>> > >>> > > > ack will be ignored because its version is less than the
>> > >>> > > > current version.
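>> > >>> > > >
>> > >>> > > > As a sketch, the broker-side ack handling described above
>> > >>> > > > could look roughly like this (illustrative only; the class
>> > >>> > > > and field names follow the draft, but the exact signatures
>> > >>> > > > are made up):
>> > >>> > > > ```
>> > >>> > > > import java.util.Map;
>> > >>> > > > import java.util.concurrent.ConcurrentHashMap;
>> > >>> > > > import org.apache.commons.lang3.tuple.Pair;
>> > >>> > > >
>> > >>> > > > // Inside PartitionUpdateWatcherService:
>> > >>> > > > // watcherID -> (version, timestamp) of unacked notifications
>> > >>> > > > private final Map<Long, Pair<Long, Long>> inFlightUpdate =
>> > >>> > > >         new ConcurrentHashMap<>();
>> > >>> > > >
>> > >>> > > > void handlePartitionUpdateResult(long watcherId, long version,
>> > >>> > > >         boolean success) {
>> > >>> > > >     Pair<Long, Long> inFlight = inFlightUpdate.get(watcherId);
>> > >>> > > >     if (inFlight == null || version < inFlight.getLeft()) {
>> > >>> > > >         return; // stale ack for an older notification: ignore
>> > >>> > > >     }
>> > >>> > > >     if (version == inFlight.getLeft()) {
>> > >>> > > >         inFlightUpdate.remove(watcherId);
>> > >>> > > >         if (!success) {
>> > >>> > > >             // the client failed to apply the update: re-query
>> > >>> > > >             // the concerned topics' partitions and resend with
>> > >>> > > >             // a new, greater version
>> > >>> > > >             resendPartitionUpdate(watcherId);
>> > >>> > > >         }
>> > >>> > > >     }
>> > >>> > > >     // version > inFlight.getLeft() should never happen
>> > >>> > > > }
>> > >>> > > > ```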
>> > >>> > > >
>> > >>> > > > On Wed, Feb 22, 2023 at 9:33 PM Asaf Mesika <asaf.mes...@gmail.com> wrote:
>> > >>> > > > >
>> > >>> > > > > How about edge cases?
>> > >>> > > > > In Andras' PIP he took into account cases where updates
>> > >>> > > > > were lost, so he created a secondary poll. I'm not saying
>> > >>> > > > > it's the best fit for your case, of course.
>> > >>> > > > > I'm saying that when a broker sends a
>> > >>> > > > > CommandPartitionUpdate, how do you know it arrived
>> > >>> > > > > successfully? From my memory, there is no ACK in the
>> > >>> > > > > protocol saying "I'm the client, I got the update
>> > >>> > > > > successfully", after which the "dirty" flag for that topic
>> > >>> > > > > would be removed for this watcher ID.
>> > >>> > > > >
>> > >>> > > > > Are there any other edge cases we can have? Let's be
>> > >>> > > > > exhaustive.
>> > >>> > > > >
>> > >>> > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com> wrote:
>> > >>> > > > >
>> > >>> > > > > > Thanks for your great suggestion, Enrico.
>> > >>> > > > > >
>> > >>> > > > > > I agree with you. It's more reasonable to add a
>> > >>> > > > > > `supports_partition_update_watchers` flag in
>> > >>> > > > > > `FeatureFlags` to detect that the connected broker
>> > >>> > > > > > supports this feature, and to add a new broker
>> > >>> > > > > > configuration property
>> > >>> > > > > > `enableNotificationForPartitionUpdate` with default
>> > >>> > > > > > value true, much like PIP-145.
>> > >>> > > > > >
>> > >>> > > > > > I have updated the description.
>> > >>> > > > > >
>> > >>> > > > > > On Wed, Feb 22, 2023 at 5:26 PM Enrico Olivelli <eolive...@gmail.com> wrote:
>> > >>> > > > > >
>> > >>> > > > > > > I support this proposal.
>> > >>> > > > > > > Copying here my comments from GH:
>> > >>> > > > > > >
>> > >>> > > > > > > Can't we enable this by default in case we detect that
>> > >>> > > > > > > the connected broker supports it? I can't find any
>> > >>> > > > > > > reason for not using this mechanism if it is
>> > >>> > > > > > > available.
>> > >>> > > > > > >
>> > >>> > > > > > > Maybe we can set the default to "true" and allow users
>> > >>> > > > > > > to disable it in case it impacts their systems in an
>> > >>> > > > > > > unwanted way.
>> > >>> > > > > > >
>> > >>> > > > > > > Maybe it would be useful to have a way to disable the
>> > >>> > > > > > > mechanism on the broker side as well.
>> > >>> > > > > > >
>> > >>> > > > > > > Enrico
>> > >>> > > > > > >
>> > >>> > > > > > > On Wed, Feb 22, 2023 at 10:22 AM houxiaoyu <anonhx...@gmail.com> wrote:
>> > >>> > > > > > > >
>> > >>> > > > > > > > Hi Pulsar community:
>> > >>> > > > > > > >
>> > >>> > > > > > > > I opened a PIP to discuss "Notifications for
>> > >>> > > > > > > > partitions update".
>> > >>> > > > > > > >
>> > >>> > > > > > > > ### Motivation
>> > >>> > > > > > > >
>> > >>> > > > > > > > The Pulsar client polls brokers at a fixed interval
>> > >>> > > > > > > > to check for partition updates when we
>> > >>> > > > > > > > publish/subscribe to partitioned topics with
>> > >>> > > > > > > > `autoUpdatePartitions` set to true. This causes
>> > >>> > > > > > > > unnecessary load for both clients and brokers, since
>> > >>> > > > > > > > most of the time the number of partitions does not
>> > >>> > > > > > > > change. In addition, polling introduces latency in
>> > >>> > > > > > > > partition updates, determined by
>> > >>> > > > > > > > `autoUpdatePartitionsInterval`.
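>> > >>> > > > > > > >
>> > >>> > > > > > > > For reference, here is the polling configuration in
>> > >>> > > > > > > > question on the consumer side (the producer builder
>> > >>> > > > > > > > exposes the same two options):
>> > >>> > > > > > > > ```
>> > >>> > > > > > > > Consumer<byte[]> consumer = client.newConsumer()
>> > >>> > > > > > > >     .topic(topic)
>> > >>> > > > > > > >     .subscriptionName(subscription)
>> > >>> > > > > > > >     // enables the periodic partition-metadata poll
>> > >>> > > > > > > >     .autoUpdatePartitions(true)
>> > >>> > > > > > > >     // poll period, i.e. the worst-case update latency
>> > >>> > > > > > > >     .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS)
>> > >>> > > > > > > >     .subscribe();
>> > >>> > > > > > > > ```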
In addition polling introduces latency in >> > >>> partitions >> > >>> > > > > update >> > >>> > > > > > > > which is specified by `autoUpdatePartitionsInterval`. >> > >>> > > > > > > > This PIP would like to introduce a notification >> mechanism >> > >>> for >> > >>> > > > > partition >> > >>> > > > > > > > update, which is much like PIP-145 for regex >> subscriptions >> > >>> > > > > > > > https://github.com/apache/pulsar/issues/14505. >> > >>> > > > > > > > >> > >>> > > > > > > > For more details, please read the PIP at: >> > >>> > > > > > > > https://github.com/apache/pulsar/issues/19596 >> > >>> > > > > > > > Looking forward to hearing your thoughts. >> > >>> > > > > > > > >> > >>> > > > > > > > Thanks, >> > >>> > > > > > > > Xiaoyu Hou >> > >>> > > > > > > > ---- >> > >>> > > > > > > >> > >>> > > > > > >> > >>> > > > > >> > >>> > > > >> > >>> > > >> > >>> >> > >> >> >