coderzc commented on code in PR #25113:
URL: https://github.com/apache/pulsar/pull/25113#discussion_r2659649685


##########
pip/pip-451.md:
##########
@@ -0,0 +1,255 @@
+# PIP-451: Support Label-based Topic Subscription
+
+# Motivation
+
+Currently, Apache Pulsar supports Pattern Subscription, which allows consumers 
to subscribe to multiple topics using Regular Expressions (Regex). While 
powerful, Regex-based subscription has several structural limitations in 
complex microservice architectures:
+
+* Coupling: It couples the consumption logic with the topic naming convention. 
Changing business requirements often forces topic renaming, which is 
operationally expensive and risky.S
+
+* Flexibility: It is difficult to group semantically related but differently 
named topics (e.g., persistent://public/default/payment-core and 
persistent://public/legacy/billing-v1) into a single subscription without 
complex regex wizardry.
+
+* Complexity: complex Regex can be hard to maintain and error-prone.
+
+Label-based Subscription solves these issues by decoupling "Identity" (Topic 
Name) from "Attributes" (Labels). Users can attach Key-Value metadata (e.g., 
env=prod, dept=finance) to topics and subscribe by specifying a label selector.
+
+# Goals
+
+## In Scope
+* Management: Allow attaching, updating, and removing Key-Value labels to/from 
Topics via the Admin API.
+* Subscription: Allow Consumers to subscribe to topics matching specific 
Labels within specified Namespaces. Support cross-namespace subscription via an 
explicit namespace list in the Client API, avoiding the complexity of 
background metadata polling.
+
+
+
+# High-Level Design
+The design introduces a metadata-driven approach where labels are stored in 
TopicPolicies. 
+The Broker maintains an in-memory index to map labels to topics. The client 
utilizes a "Watch" mechanism to receive real-time updates when topics matching 
the labels are created or updated.
+## Key points
+* Storage: Labels are stored as Map<String, String> inside TopicPolicies.
+* Indexing: The Broker maintains an In-Memory Inverted Index (LabelKey -> 
LabelValue -> Set<Topic>) per Namespace. This hierarchical structure ensures 
efficient lookups for Key-Value pairs without iterating through all topics.
+* Discovery Protocol: We extend the CommandWatchTopicList protocol (PIP-179) 
to accept a label_selector.
+* Client Implementation: The Client accepts a list of target namespaces and 
manages multiple watchers (one per namespace) to aggregate matching topics.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Storage 
+
+#### Topic Labels in TopicPolicies
+
+Add a labels field to the TopicPolicies class. Since Topic Policies are 
propagated via the __change_events system topic, this ensures durability and 
consistency across brokers.
+```java
+public class TopicPolicies {
+    // New field: Key-Value labels
+    private Map<String, String> customLabels;
+}
+```
+
+#### In-Memory Inverted Index for Labels-Topic Mapping
+
+The SystemTopicBasedTopicPoliciesService will maintain a nested map structure 
per Namespace to support efficient Key-Value lookups.
+
+**Data Structure**:
+
+```java
+// Map<Namespace, Map<LabelKey, Map<LabelValue, Set<TopicName>>>>
+Map<String, Map<String, Map<String, Set<String>>>> labelTopicInvertedIndex;
+```
+
+### Topic Labels Observation and Notification Mechanism
+
+### TopicPoliciesService Interface Changes
+
+```java
+public interface TopicPoliciesService {
+    // ... Existing methods ...
+
+    /**
+     *  Register a label change listener with the service.
+     */
+    void registerTopicLabelsListener(NamespaceName namespace, 
TopicLabelsListener listener);
+
+    /**
+     * Unregister a label change listener with the service.
+     */
+    void unregisterTopicLabelsListener(NamespaceName namespace, 
TopicLabelsListener listener);
+
+    /**
+     * Query the list of all matching Topics under the specified Namespace 
based on the label selector.
+     */
+    CompletableFuture<List<TopicName>> getTopicsByLabels(NamespaceName 
namespace, Map<String, String> labels);
+    
+    /**
+     * Query all existing labels within a Namespace.
+     */
+    CompletableFuture<Map<String, Set<String>>> getAllLabels(NamespaceName 
namespace);
+}
+```
+To support real-time label change notifications, we will introduce a listener 
mechanism in TopicPoliciesService.
+
+#### Add TopicLabelsListener Interface
+
+```java
+public interface TopicLabelsListener {
+    /**
+     * Triggered when topic labels are substantially changed.
+     * @param topicName The affected topic.
+     * @param allLabels The latest snapshot of all labels.
+     */
+    void onLabelsUpdate(TopicName topicName, Map<String, String> allLabels);
+}
+```
+
+#### Listener Management in TopicPoliciesService
+
+The TopicPoliciesService will maintain a list of TopicLabelsListener and 
notify them when a policy update includes label changes.
+
+#### Notification Logic in Broker
+* Change Detection: When SystemTopicBasedTopicPoliciesService consumes a 
policy event, it compares oldLabels from the cache with newLabels from the 
event, then update in-memory inverted index (labelTopicInvertedIndex).
+
+* Global Dispatch: If `!Objects.equals(oldLabels, newLabels)`, the service 
invokes `onLabelsUpdate(topicName, labels)` for all registered listeners.
+
+* Watcher Evaluation: The TopicListService (registered as a listener) iterates 
through its TopicListWatcher instances.
+
+#### TopicListWatcher State Machine
+
+Each TopicListWatcher maintains a set of currently matched topics to perform 
state-based updates:
+
+```java
+public void onLabelsUpdate(TopicName topicName, @Nullable Map<String, String> 
allLabels) {
+    // allLabels can be null if the topic is deleted
+    boolean wasMatching = matchedTopics.contains(topicName);
+    boolean matchesNow = allLabels != null && labelMatcher.matches(allLabels);
+
+    List<String> newTopics = Collections.emptyList();
+    List<String> deletedTopics = Collections.emptyList();
+
+    if (!wasMatching && matchesNow) {
+        // It did not match before, but now it matches (matches after adding 
or modifying labels).
+        newTopics = Collections.singletonList(topicName.toString());
+        matchedTopics.add(topicName.toString());
+    } else if (wasMatching && !matchesNow) {
+        //  It matched before, but now it no longer matches (fails to match 
after the Topic is deleted or its labels are modified).
+        deletedTopics = Collections.singletonList(topicName.toString());
+        matchedTopics.remove(topicName.toString());
+    } else {
+        // If the state remains unchanged
+        return;
+    }
+
+    String hash = TopicList.calculateHash(matchedTopics);
+    topicListService.sendTopicListUpdate(id, hash, deletedTopics, newTopics);
+}
+```
+
+### Client Implementation Details
+The Client implementation acts as an orchestrator to support the 
"Cross-Namespace" requirement defined in topicsByLabel.
+
+* Multi-Watcher Orchestration:
+
+When `subscribe()` is called, the Client iterates over the provided namespaces.
+For each namespace, it initiates a CommandWatchTopicList request. 
+
+* Aggregation:
+
+The Client maintains a unified view of matching topics.
+New Topic Event: If any of the namespace watchers receives a NEW_TOPIC update 
(meaning a topic in that namespace matched the labels), the Client adds it to 
the list and creates a child consumer.
+Deleted Topic Event: If a watcher receives a DELETED_TOPIC update (topic 
deleted or label removed), the Client closes the corresponding child consumer.
+
+* Deterministic Scope:
+
+Unlike Regex subscription which might require scanning metadata for matching 
namespaces, this design relies on the user providing the `Set<String>` 
namespaces.
+### Public API & CLI Changes
+
+#### Set Custom Labels:
+
+* CLI: pulsar-admin topics set-custom-labels <topic-name> --labels 
"key1=value1,key2=value2"
+* REST API: POST /admin/v2/topics/{tenant}/{namespace}/{topic}/custom-labels 
with a JSON payload {"labels": {"key1":"value1", "key2":"value2"}}
+* Action: Sets or updates custom labels for the specified topic. The broker 
(or admin client before sending) will validate that all provided keys (e.g., 
key1, key2) are present in the allowedCustomLabelKeys list defined in 
broker.conf. Invalid keys will result in an error. This operation will update 
the topic's policy and publish a change event to the system topic 
(__change_events) for that namespace.
+
+#### Get Custom Labels:
+
+* CLI: pulsar-admin topics get-custom-labels <topic-name>
+* REST API: GET /admin/v2/topics/{tenant}/{namespace}/{topic}/custom-labels
+* Action: Retrieves the currently set custom labels for the topic.
+
+#### Remove Custom Labels:
+
+* CLI: 
+  - pulsar-admin topics remove-custom-labels <topic-name> --labels "key1,key2" 
(to remove specific labels)
+  - pulsar-admin topics remove-custom-labels <topic-name> --all (to remove all 
custom labels from the topic)
+* pulsar-admin topics remove-custom-labels <topic-name> --labels "key1,key2" 
(to remove specific labels)
+* pulsar-admin topics remove-custom-labels <topic-name> --all (to remove all 
custom labels from the topic)
+* REST API: DELETE /admin/v2/topics/{tenant}/{namespace}/{topic}/custom-labels 
with a query params keys=k1&keys=k2 or all=true.
+* Action: Removes the specified custom labels or all custom labels from the 
topic. This also updates the topic policy.
+
+#### Query topic associated with specific labels:
+
+* CLI: pulsar-admin topics list <namespace> --custome-labels "k1:v1,k2:v2"

Review Comment:
   I think the topic policy under a namespace can be loaded onto any broker, 
since we call `prepareInitPoliciesCacheAsync(NamespaceName namespace)`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to