AnonHxy opened a new issue, #19596: URL: https://github.com/apache/pulsar/issues/19596
### Motivation

When producing to or consuming from partitioned topics with `autoUpdatePartitions` set to true, the Pulsar client polls the brokers at a fixed interval to check for partition updates. This creates unnecessary load on both the client and the broker, because most of the time the number of partitions does not change. Polling also delays the detection of a partition update by up to `autoUpdatePartitionsInterval`.

This PIP would like to introduce a notification mechanism for partition updates, similar to PIP-145 for regex subscriptions: https://github.com/apache/pulsar/issues/14505.

### Goal

This PIP proposes the following change to improve performance and reduce network utilization:

- Clients will be able to register with brokers as observers of partition updates for the topics they are interested in. Brokers will send an event to those clients whenever the number of partitions of one of these topics changes.

To keep new clients compatible with older brokers, a new feature flag for this feature will be introduced in the client. This means only new clients will be able to register with brokers as observers.

### API Changes

## Protocol Changes

- Clients can register as partition update observers by sending the command `CommandWatchPartitionUpdate`:

  ```
  message CommandWatchPartitionUpdate {
      required uint64 request_id = 1;
      required uint64 watcher_id = 2;
      repeated string topics = 3;
  }
  ```

- Brokers will respond with a success message containing the watcher ID:

  ```
  message CommandWatchPartitionUpdateSuccess {
      required uint64 request_id = 1;
      required uint64 watcher_id = 2;
  }
  ```

- When the partitions of watched topics are updated, the broker sends each topic name together with its new number of partitions. The broker only sends the topics whose partitions were updated.
  ```
  message CommandPartitionUpdate {
      required uint64 watcher_id = 1;
      repeated string topics = 2;
      repeated uint32 partitions = 3;
  }
  ```

- Clients will send `CommandWatchPartitionUpdateClose` to unregister their observers when closing:

  ```
  message CommandWatchPartitionUpdateClose {
      required uint64 request_id = 1;
      required uint64 watcher_id = 2;
  }
  ```

* Configuration Changes

A new producer configuration property, `ProducerConfigurationData#enableNotificationForPartitionUpdate`, and a new consumer configuration property, `ConsumerConfigurationData#enableNotificationForPartitionUpdate`, will be added with a default value of false. Setting them to true enables the feature.

* Interface Changes

A new client listener will be introduced to notify the partition update action:

```
public interface PartitionsUpdateListener {
    CompletableFuture<Void> onTopicsExtended(List<String> topics, List<Integer> partitions);
}
```

The existing listener, `org.apache.pulsar.client.impl.PartitionsChangedListener` (shown below), is not reused because it only has a single `topics` parameter. The existing behavior will not change when this feature is disabled.

```
public interface PartitionsChangedListener {
    CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtended);
}
```

### Implementation

## Notifications

If this feature is enabled in the client, then when `PartitionedProducerImpl` or `MultiTopicsConsumerImpl` initializes, the client will create a watcher, `org.apache.pulsar.client.impl.PartitionUpdateWatcher`, and send `CommandWatchPartitionUpdate` to the broker to register as an observer.

A new class, `org.apache.pulsar.PartitionUpdateWatcherService`, will keep track of watchers and listen to changes in the metadata. Whenever the partitions of a topic are updated, it checks whether any watchers should be notified and sends them an update through the `ServerCnx`. To prevent memory leaks, all watchers registered through a connection will be removed from the `PartitionUpdateWatcherService` when that `ServerCnx`'s channel becomes inactive.
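The broker-side bookkeeping described above can be sketched as a minimal, single-threaded in-memory model. This is not Pulsar code. `Connection` is a hypothetical stand-in for `ServerCnx`, `WatcherService` is a hypothetical stand-in for the proposed watcher service, and `onPartitionsUpdated` stands in for whatever metadata-store callback delivers the change. The sketch also assumes that the `topics` and `partitions` fields of `CommandPartitionUpdate` are index-aligned. It covers the three lifecycle events (register, close, channel inactive) and shows how one metadata change is filtered into at most one update per watcher.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Minimal single-threaded sketch of broker-side watcher bookkeeping. All names are hypothetical. */
public class PartitionUpdateWatcherSketch {

    /** Hypothetical mirror of CommandPartitionUpdate; topics and partitions assumed index-aligned. */
    record PartitionUpdate(long watcherId, List<String> topics, List<Integer> partitions) {}

    /** Hypothetical stand-in for a broker-side connection such as ServerCnx. */
    interface Connection {
        void send(PartitionUpdate update);
    }

    /** Hypothetical stand-in for the proposed broker-side watcher service. */
    static final class WatcherService {
        // connection -> (watcherId -> watched topics); grouping by connection makes cleanup one removal.
        private final Map<Connection, Map<Long, Set<String>>> watchers = new HashMap<>();

        /** On CommandWatchPartitionUpdate: register a watcher for the given topics. */
        void watch(Connection cnx, long watcherId, Collection<String> topics) {
            watchers.computeIfAbsent(cnx, c -> new HashMap<>()).put(watcherId, Set.copyOf(topics));
        }

        /** On CommandWatchPartitionUpdateClose: unregister a single watcher. */
        void unwatch(Connection cnx, long watcherId) {
            Map<Long, Set<String>> byId = watchers.get(cnx);
            if (byId != null) {
                byId.remove(watcherId);
                if (byId.isEmpty()) {
                    watchers.remove(cnx);
                }
            }
        }

        /** When the connection's channel becomes inactive: drop all of its watchers. */
        void connectionClosed(Connection cnx) {
            watchers.remove(cnx);
        }

        /**
         * On a metadata change carrying new partition counts. Each watcher receives at most one
         * update, listing only the watched topics whose partitions changed.
         */
        void onPartitionsUpdated(Map<String, Integer> newPartitionCounts) {
            watchers.forEach((cnx, byId) -> byId.forEach((watcherId, topics) -> {
                List<String> changedTopics = new ArrayList<>();
                List<Integer> partitions = new ArrayList<>();
                newPartitionCounts.forEach((topic, count) -> {
                    if (topics.contains(topic)) {
                        changedTopics.add(topic);
                        partitions.add(count);
                    }
                });
                if (!changedTopics.isEmpty()) {
                    cnx.send(new PartitionUpdate(watcherId, changedTopics, partitions));
                }
            }));
        }

        /** Total number of registered watchers across all connections. */
        int watcherCount() {
            return watchers.values().stream().mapToInt(Map::size).sum();
        }
    }

    public static void main(String[] args) {
        WatcherService service = new WatcherService();
        Connection cnx = update -> System.out.println("send " + update);

        // A client registers watcher 1 for two partitioned topics.
        service.watch(cnx, 1L, List.of("persistent://public/default/a", "persistent://public/default/b"));

        // Topic "a" grows to 6 partitions; unwatched topic "c" also changes and is filtered out.
        service.onPartitionsUpdated(Map.of(
                "persistent://public/default/a", 6,
                "persistent://public/default/c", 2));

        // The channel goes inactive, so its watcher is dropped.
        service.connectionClosed(cnx);
        System.out.println("watchers remaining: " + service.watcherCount());
    }
}
```

Grouping watchers by connection means that cleanup on channel inactivity is a single map removal. A real broker would likely also need thread-safe collections, because metadata notifications and connection events can arrive on different threads.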
The polling task will not be started when this feature is enabled.

## Compatibility

- Old clients with new servers: old clients will keep using the polling task to fetch partitions at a fixed interval, which matches the behavior of new clients with this feature disabled.
- New clients with old servers: this feature is disabled by default, so new clients will use the polling task to fetch partitions by default.

### Alternatives

_No response_

### Anything else?

_No response_

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
