andrasbeni opened a new issue #14505:
URL: https://github.com/apache/pulsar/issues/14505


   
   ## Motivation
   
   Pulsar allows consumers to subscribe to multiple topics by a pattern. When 
using this feature, consumers poll brokers for the list of all topics in a 
namespace and filter the list on the client side based on the pattern. This 
causes unnecessary network load, since most of the time only a small fraction 
of the returned topics match the pattern. In addition, polling introduces 
latency in processing messages produced to a newly created topic.
   
   ## Goal
   
   This PIP proposes three changes to improve performance and decrease network 
utilization:
   
   - Server side filtering of topic list: clients will supply the broker with a 
regex pattern when requesting the list of topics for a namespace. Brokers will 
apply this pattern when present and only include matching topics in the 
response.
   - Hashing topic list to skip updates: the feature will introduce a hash 
similar in usage to the HTTP ETag header. The broker computes a hash of the 
topic list and compares it with the hash the client sends along with its 
request. This allows for short responses that simply mean "Nothing has 
changed".
   - Notifications for faster discovery: clients will be able to register with 
brokers as observers of the topic list. Brokers will send events to clients 
whenever there's a change in the list of matching topics.
   
   To keep new clients compatible with older brokers, a new feature flag will 
be introduced. Brokers will return FeatureFlags as part of the 
CommandConnected message to let clients know which features they support.
   
   First, the feature will be implemented in the broker and the Java client, 
but later other clients can also make use of the capability.
   
   ## API Changes
   
   ### Protocol Changes
   
   New fields will be added to existing commands CommandGetTopicsOfNamespace 
and CommandGetTopicsOfNamespaceResponse.
   
   ```
   message CommandGetTopicsOfNamespace {
       enum Mode {
           PERSISTENT = 0;
           NON_PERSISTENT = 1;
           ALL = 2;
       }
       required uint64 request_id    = 1;
       required string namespace    = 2;
       optional Mode mode = 3 [default = PERSISTENT];
       optional string topics_pattern = 4;
       optional string topics_hash    = 5;
   }
   
   message CommandGetTopicsOfNamespaceResponse {
       required uint64 request_id    = 1;
       repeated string topics         = 2;
       // true iff the topic list was filtered by the pattern supplied by the client
       optional bool   filtered       = 3 [default = false];
       // hash computed from the names of matching topics
       optional string topics_hash    = 4;
       // if false, topics is empty and the list of matching topics has not changed
       optional bool   changed        = 5 [default = true];
   }
   ```
   
   Clients can register as topic list observers by sending the command 
CommandWatchTopicList:
   
   ```
   message CommandWatchTopicList {
       required uint64 watcher_id     = 1;
       required string namespace      = 2;
       required string topics_pattern = 3;
       // Only present when the client reconnects:
       optional string topics_hash    = 4;
   }
   ```
   
   Brokers will respond with a success message containing the watcher ID and 
the initial list of topics.
   
   ```
   message CommandWatchTopicListSuccess {
       required uint64 watcher_id     = 1;
       repeated string topic          = 2;
       required string topics_hash    = 3;
   }
   ```
   
   When matching topics are created or deleted, the broker sends an update 
along with the hash computed from the whole list of matching topics (i.e. not 
just those that are listed in this message).
   
   ```
   message CommandWatchTopicUpdate {
       required uint64 watcher_id     = 1;
       repeated string new_topics     = 2;
       repeated string deleted_topics = 3;
       required string topics_hash    = 4;
   }
   ```
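
   As an illustration of how a client might consume these messages, the 
following is a minimal sketch, not the proposed implementation. The class 
WatchedTopicsSketch and its method names are hypothetical. It assumes that 
deletions are applied before additions and that the client keeps the latest 
hash so it can send it in CommandWatchTopicList when it reconnects.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical client-side view of a watched topic list (not Pulsar code).
class WatchedTopicsSketch {
    private final Set<String> topics = new TreeSet<>();
    private String topicsHash; // hash of the whole matching list, as sent by the broker

    // Handles CommandWatchTopicListSuccess: replace local state with the initial list.
    void onSuccess(List<String> initialTopics, String hash) {
        topics.clear();
        topics.addAll(initialTopics);
        topicsHash = hash;
    }

    // Handles CommandWatchTopicUpdate: apply the delta, then remember the new hash.
    void onUpdate(List<String> newTopics, List<String> deletedTopics, String hash) {
        topics.removeAll(deletedTopics); // assumption: deletions are applied first
        topics.addAll(newTopics);
        topicsHash = hash;
    }

    // Returns a copy so callers cannot modify the internal state.
    Set<String> topics() {
        return new TreeSet<>(topics);
    }

    // Value to place in topics_hash when re-sending CommandWatchTopicList.
    String hashForReconnect() {
        return topicsHash;
    }
}
```

   Because every update carries the hash of the full list, the client never 
has to compute the hash itself; it only stores the most recent value.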
   
   Clients can unsubscribe the watcher by sending a CommandUnwatchTopicList 
message, to which the response is a CommandWatchTopicListSuccess without any 
topics.
   
   ```
   message CommandUnwatchTopicList {
       required uint64 watcher_id     = 1;
   }
   ```
   
   When a client connects to a broker, it is notified whether the broker 
supports topic watchers. If not, the client will not send CommandWatchTopicList 
messages and will continue to rely on polling.
   
   ```
   message FeatureFlags {
     optional bool supports_auth_refresh = 1 [default = false];
     optional bool supports_broker_entry_metadata = 2 [default = false];
     optional bool supports_partial_producer = 3 [default = false];
     optional bool supports_topic_watchers = 4 [default = false];
   }
   
   message CommandConnected {
       required string server_version = 1;
       optional int32 protocol_version = 2 [default = 0];
       optional int32 max_message_size = 3;
       optional FeatureFlags feature_flags = 4;
   }
   ```
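
   The client-side decision described above can be sketched as follows. This 
is only an illustration: DiscoveryModeSketch, Mode and choose are hypothetical 
names, and the two boolean parameters stand in for whether feature_flags was 
present in CommandConnected and the value of supports_topic_watchers.

```java
// Hypothetical helper showing how a client might pick its discovery mode.
class DiscoveryModeSketch {
    enum Mode { WATCH, POLL }

    // featureFlagsPresent == false models an older broker whose
    // CommandConnected message carries no feature_flags field at all.
    static Mode choose(boolean featureFlagsPresent, boolean supportsTopicWatchers) {
        if (featureFlagsPresent && supportsTopicWatchers) {
            return Mode.WATCH; // register a watcher via CommandWatchTopicList
        }
        return Mode.POLL; // keep polling with CommandGetTopicsOfNamespace
    }
}
```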
   
   HTTP clients will not be changed and will continue using the current polling 
behaviour.
   
   
   
   ### Configuration Changes
   
   A new broker configuration property enableBrokerSideTopicFiltering will be 
added with default value true. Setting this to false will disable the feature.
   
   
   ## Implementation
   
   ### Polling with pattern
   
   Pulsar clients will poll for topics with the pattern included in the 
command. Initially the client doesn't have a topicsHash, but once the 
broker has responded, clients will retain the hash and send it with the next 
command. If the response from the server contains no hash, the client will 
perform client side filtering. Otherwise, clients will consider the returned 
list as already filtered.
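
   The client behaviour described above could look roughly like the sketch 
below. PatternPollerSketch and its methods are hypothetical names used only 
for illustration; a null responseHash models a response without topics_hash, 
and full-string matching via Matcher.matches() is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical client-side poller state (not the actual Java client code).
class PatternPollerSketch {
    private final Pattern pattern;
    private List<String> topics = new ArrayList<>();
    private String topicsHash; // null until the broker has returned a hash

    PatternPollerSketch(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    // Hash to send with the next CommandGetTopicsOfNamespace (may be null).
    String hashForNextRequest() {
        return topicsHash;
    }

    List<String> currentTopics() {
        return topics;
    }

    void onResponse(List<String> responseTopics, String responseHash, boolean changed) {
        if (responseHash == null) {
            // No hash in the response: the list is unfiltered, so filter locally.
            List<String> filtered = new ArrayList<>();
            for (String topic : responseTopics) {
                if (pattern.matcher(topic).matches()) {
                    filtered.add(topic);
                }
            }
            topics = filtered;
            topicsHash = null;
            return;
        }
        topicsHash = responseHash;
        if (changed) {
            // The broker already applied the pattern; accept the list as is.
            topics = new ArrayList<>(responseTopics);
        }
        // changed == false: the response carries no topics, keep the current list.
    }
}
```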
   
   The Pulsar broker will check the command for topicsPattern. If there's no 
pattern in the message, the broker will respond with all topics of the 
namespace. If a pattern is present, the list of topics is filtered and a hash 
is computed from the filtered list. If the request contains a topicsHash that 
equals the current hash, the response will not contain the list of topics and 
the changed flag will be set to false. The pattern in topicsPattern will be 
evaluated using java.util.regex.Pattern.
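
   The broker-side steps above can be sketched as follows. This is an 
illustration under stated assumptions rather than the proposed code: 
TopicListFilterSketch, Response, hash and handle are hypothetical names, the 
proposal does not specify a hash function (SHA-256 over the sorted topic names 
is assumed here), and full-string matching via Matcher.matches() is likewise 
an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical broker-side handler (not part of the Pulsar code base).
class TopicListFilterSketch {

    // Mirrors the fields of CommandGetTopicsOfNamespaceResponse.
    static final class Response {
        final List<String> topics;
        final boolean filtered;
        final String topicsHash; // null when no pattern was supplied
        final boolean changed;

        Response(List<String> topics, boolean filtered, String topicsHash, boolean changed) {
            this.topics = topics;
            this.filtered = filtered;
            this.topicsHash = topicsHash;
            this.changed = changed;
        }
    }

    // Assumed hash: hex-encoded SHA-256 over the sorted topic names.
    static String hash(List<String> topics) {
        List<String> sorted = new ArrayList<>(topics);
        Collections.sort(sorted); // make the hash independent of listing order
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (String topic : sorted) {
                digest.update(topic.getBytes(StandardCharsets.UTF_8));
                digest.update((byte) '\n'); // separator between names
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    static Response handle(List<String> allTopics, Optional<String> topicsPattern,
                           Optional<String> topicsHash) {
        if (!topicsPattern.isPresent()) {
            // No pattern: respond with all topics, unfiltered and without a hash.
            return new Response(allTopics, false, null, true);
        }
        Pattern pattern = Pattern.compile(topicsPattern.get());
        List<String> matching = new ArrayList<>();
        for (String topic : allTopics) {
            if (pattern.matcher(topic).matches()) {
                matching.add(topic);
            }
        }
        String currentHash = hash(matching);
        if (topicsHash.isPresent() && topicsHash.get().equals(currentHash)) {
            // Client is up to date: send no topics, only changed = false.
            return new Response(Collections.<String>emptyList(), true, currentHash, false);
        }
        return new Response(matching, true, currentHash, true);
    }
}
```

   Sorting before hashing is a design choice of this sketch: it keeps the hash 
stable when the metadata store returns the same topics in a different order.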
   
   ### Notifications
   
   If the broker supports topic list watchers, the client will create such a 
watcher by sending CommandWatchTopicList. A new class, 
org.apache.pulsar.TopicsListService, will keep track of watchers and will 
listen to changes in the metadata. Whenever a topic is created or deleted, the 
service checks whether any watchers should be notified and sends an update 
through the ServerCnx. To prevent memory leaks, all watchers will be removed 
from the TopicsListService when the ServerCnx's channel becomes inactive.
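
   One possible shape of such a watcher registry is sketched below. It is not 
the proposed TopicsListService: TopicWatcherRegistrySketch, the string 
connection ID and the Consumer callback are hypothetical stand-ins for the 
ServerCnx and the update command, and only topic creation is shown (deletion 
would follow the same pattern).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.regex.Pattern;

// Hypothetical registry of topic list watchers, grouped by connection.
class TopicWatcherRegistrySketch {

    private static final class Watcher {
        final Pattern pattern;
        final Consumer<String> sink; // stands in for sending an update via ServerCnx

        Watcher(Pattern pattern, Consumer<String> sink) {
            this.pattern = pattern;
            this.sink = sink;
        }
    }

    // connectionId -> (watcherId -> watcher)
    private final Map<String, Map<Long, Watcher>> watchers = new ConcurrentHashMap<>();

    void watch(String connectionId, long watcherId, String regex, Consumer<String> sink) {
        watchers.computeIfAbsent(connectionId, id -> new ConcurrentHashMap<>())
                .put(watcherId, new Watcher(Pattern.compile(regex), sink));
    }

    void unwatch(String connectionId, long watcherId) {
        Map<Long, Watcher> perConnection = watchers.get(connectionId);
        if (perConnection != null) {
            perConnection.remove(watcherId);
        }
    }

    // Called when the metadata store reports a new topic.
    void onTopicCreated(String topic) {
        for (Map<Long, Watcher> perConnection : watchers.values()) {
            for (Watcher watcher : perConnection.values()) {
                if (watcher.pattern.matcher(topic).matches()) {
                    watcher.sink.accept(topic);
                }
            }
        }
    }

    // Called when a connection's channel becomes inactive: drop all its watchers.
    void onConnectionInactive(String connectionId) {
        watchers.remove(connectionId);
    }

    int watcherCount() {
        int count = 0;
        for (Map<Long, Watcher> perConnection : watchers.values()) {
            count += perConnection.size();
        }
        return count;
    }
}
```

   Grouping watchers by connection makes the cleanup on channel inactivity a 
single map removal, which is what guards against the leak mentioned above.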
   
   ### Compatibility
   
   #### Old clients with new servers
   
   When the server receives a message without the new fields, it will not 
filter the topics but will send the whole list. These clients will ignore the 
new fields in the response.
   
   #### New clients with old servers
   
   New fields in the client request will be ignored by the protobuf parser. The 
server will send the unfiltered list and omit the new fields (as it does not 
know about them). The client will check the response for the new fields. If the 
hash is not present, the client filters the result as it does now.
   
   The introduction of FeatureFlags in CommandConnected will prevent the client 
from sending CommandWatchTopicList messages to brokers that don't yet 
support it.
   
   The same is true when broker-side topic filtering is switched off.
   
   

