This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9740408b364 [feat][pip] PIP-452: Customizable topic listing of 
namespace with properties (#25134)
9740408b364 is described below

commit 9740408b364b372b95939fa63980dff593492b2b
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Feb 9 21:55:31 2026 +0800

    [feat][pip] PIP-452: Customizable topic listing of namespace with 
properties (#25134)
---
 pip/pip-452.md | 280 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 280 insertions(+)

diff --git a/pip/pip-452.md b/pip/pip-452.md
new file mode 100644
index 00000000000..6871a7576c5
--- /dev/null
+++ b/pip/pip-452.md
@@ -0,0 +1,280 @@
+# PIP-452: Customizable topic listing of namespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is 
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under 
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant 
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics 
or why. It cannot filter topics based on client properties (these client 
properties may correspond to, or be derived from topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must 
fetch the full list into memory before applying the topics_pattern regex. There 
is no way to "push down" the filtering to the data source (e.g., a database 
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable 
and extending the protocol to accept client properties.
+
+# Goals
+
+## In Scope
+
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry 
client-side context.
+
+Broker: Introduce a new pluggable interface `PulsarResourcesExtended` to allow 
customization of resource management logic, starting with the "Get Topics" 
request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup 
service when using Regex subscriptions.
+
+Admin API & CLI: Update the REST API and CLI to accept properties for listing 
topics in a namespace.
+
+Configuration: Add a broker configuration to enable/disable topic list 
watching.
+
+## Out of Scope
+
+Not support topic list watching for customized topic listing in this PIP. It 
can be considered in future work.
+If you want to use this feature, you need to disable topic list watcher by set 
`enableBrokerTopicListWatcher = false`.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker 
side, we will add a new pluggable interface `PulsarResourcesExtended`.
+The default implementation will preserve the existing behavior (delegating to 
`NamespaceService`). The Broker's connection handler will delegate the request 
to `NamespaceService`, which in turn delegates to the `PulsarResourcesExtended` 
interface.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Configuration
+
+broker.conf
+```properties
+# Enables watching topic add/remove events on broker side. It is separated 
from enableBrokerSideSubscriptionPatternEvaluation.
+enableBrokerTopicListWatcher = true
+
+# Class name for the extended Pulsar resources.
+# This class must implement org.apache.pulsar.broker.PulsarResourcesExtended.
+pulsarResourcesExtendedClassName = 
org.apache.pulsar.broker.DefaultPulsarResourcesExtended
+```
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+    required uint64 request_id = 1;
+    required string namespace = 2;
+    optional Mode mode = 3 [default = PERSISTENT];
+    
+    // Existing fields for filtering and hash optimization
+    optional string topics_pattern = 4;
+    optional string topics_hash = 5;
+
+    // New field: Context properties from the client
+    repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add `properties` parameter to the REST API endpoint for listing topics in a 
namespace to list topic with specific properties for customizable topic listing.
+
+REST API (URL encoding is required for values):
+```
+GET /admin/v2/persistent/{tenant}/{namespace}?properties=k1=v1,k2=v2
+```
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2
+```
+
+## Design & Implementation Details
+### Broker Changes
+
+Introduce `PulsarResourcesExtended` Interface
+
+This interface is designed to be pluggable, allowing custom implementations to 
provide extended capabilities such as custom topic listing strategies.
+
+```java
+package org.apache.pulsar.broker;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Extended PulsarResources that provides additional functionality beyond 
PulsarResources.
+ *
+ * <p>This interface is designed to be pluggable, allowing custom 
implementations
+ * to provide extended PulsarResources capabilities such as custom topic 
listing strategies</p>
+ *
+ * <p>Implementations of this interface can be registered with PulsarService to
+ * provide extended resource management capabilities.</p>
+ */
[email protected]
+public interface PulsarResourcesExtended {
+
+    /**
+     * Lists topics in a namespace with optional property-based filtering.
+     *
+     * <p>This method provides a flexible way to list topics in a namespace,
+     * supporting property-based filtering through the properties 
parameter.</p>
+     *
+     * @param namespaceName the namespace to list topics from
+     * @param mode the listing mode (ALL, PERSISTENT, NON_PERSISTENT)
+     * @param properties optional property filters for topic listing, if null 
or empty, no filtering is applied
+     * @return a CompletableFuture containing the list of topic names
+     */
+    CompletableFuture<List<String>> listTopicOfNamespace(NamespaceName 
namespaceName,
+                                                         
CommandGetTopicsOfNamespace.Mode mode,
+                                                         @Nullable Map<String, 
String> properties);
+
+    /**
+     * Initializes this extended resources instance with the PulsarService.
+     *
+     * <p>This method is called during broker startup to provide the 
implementation
+     * with access to the PulsarService and its dependencies.</p>
+     *
+     * @param pulsarService the PulsarService instance
+     */
+    void initialize(PulsarService pulsarService);
+
+    /**
+     * Closes this extended resources instance and releases any resources.
+     *
+     * <p>This method should be called during broker shutdown to clean up
+     * any resources held by the implementation.</p>
+     */
+    void close();
+}
+```
+
+Add `DefaultPulsarResourcesExtended`
+
+The default implementation delegates to the existing `NamespaceService`.
+
+```java
+package org.apache.pulsar.broker;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.naming.NamespaceName;
+
+public class DefaultPulsarResourcesExtended implements PulsarResourcesExtended 
{
+
+    private NamespaceService namespaceService;
+
+    @Override
+    public CompletableFuture<List<String>> listTopicOfNamespace(NamespaceName 
namespaceName,
+                                                                
CommandGetTopicsOfNamespace.Mode mode,
+                                                                Map<String, 
String> properties) {
+        // Delegate to standard NamespaceService methods to preserve existing 
behavior
+        return namespaceService.getListOfTopics(namespaceName, mode);
+    }
+
+    @Override
+    public void initialize(PulsarService pulsarService) {
+        this.namespaceService = pulsarService.getNamespaceService();
+    }
+}
+```
+
+NamespaceService Changes
+
+Add `getListOfTopicsByProperties` and `getListOfUserTopicsByProperties` in 
`NamespaceService` to serve as the entry point for topic listing with 
properties support. This method acts as the hook to call the pluggable resource 
extension.
+
+```java
+// In NamespaceService.java
+
+public CompletableFuture<List<String>> 
getListOfTopicsByProperties(NamespaceName namespaceName, Mode mode,
+                                                                Map<String, 
String> properties) {
+    return pulsarResourcesExtended.listTopicOfNamespace(namespaceName, mode, 
properties);
+}
+
+public CompletableFuture<List<String>> 
getListOfUserTopicsByProperties(NamespaceName namespaceName, Mode mode,
+                                                                       
Map<String, String> properties) {
+    return getListOfUserTopicsInternal(cacheKeyWithProperties(namespaceName, 
mode, properties),
+        () -> getListOfTopicsByProperties(namespaceName, mode, properties));
+}
+
+private CompletableFuture<List<String>> getListOfUserTopicsInternal(
+    String key,
+    Supplier<CompletableFuture<List<String>>> topicsSupplier) {
+    final MutableBoolean initializedByCurrentThread = new MutableBoolean();
+    CompletableFuture<List<String>> queryRes = 
inProgressQueryUserTopics.computeIfAbsent(key, k -> {
+        initializedByCurrentThread.setTrue();
+        return 
topicsSupplier.get().thenApplyAsync(TopicList::filterSystemTopic, 
pulsar.getExecutor());
+    });
+    .....
+    return queryRes;
+}
+```
+
+Update CommandGetTopicsOfNamespace Handling
+
+Use `getListOfUserTopicsByProperties` instead of `getListOfUserTopics` to get 
topic listing with properties support.
+
+```java
+    private void internalHandleGetTopicsOfNamespace(String namespace, 
NamespaceName namespaceName, long requestId,
+                                                    
CommandGetTopicsOfNamespace.Mode mode,
+                                                    Optional<String> 
topicsPattern, Optional<String> topicsHash,
+                                                    Semaphore lookupSemaphore, 
Map<String, String> properties) {
+        listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
+            maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+                    AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, 
isPermitRequestCancelled, initialPermits -> {
+                        return 
getBrokerService().pulsar().getNamespaceService()
+                                
.getListOfUserTopicsByProperties(namespaceName, mode, properties);
+                    ......
+            });
+        });
+    }
+```
+
+### Client Changes
+Update LookupService
+The internal LookupService interface is updated to accept the properties map.
+
+```Java
+CompletableFuture<List<String>> getTopicsUnderNamespace(
+    NamespaceName namespace, 
+    Mode mode, 
+    String topicsPattern, 
+    String topicsHash, 
+    Map<String, String> properties
+);
+```
+Update `PatternMultiTopicsConsumerImpl`
+The regex consumer implementation (PatternMultiTopicsConsumerImpl) will be 
updated to extract the properties from ConsumerConfigurationData and pass them 
to the LookupService.
+
+```Java
+// In PatternMultiTopicsConsumerImpl.java
+ConsumerConfigurationData conf;
+Map<String, String> contextProperties = conf.getProperties();
+
+lookup.getTopicsUnderNamespace(
+    namespace, 
+    mode, 
+    topicsPattern.pattern(), 
+    topicsHash, 
+    contextProperties // Pass consumer properties here
+).thenAccept(topics -> {
+    // ... update subscriptions ...
+});
+```
+
+
+# Backward Compatibility
+Protocol: Adding an optional field (properties) to the Protobuf definition is 
non-breaking. Old clients will not send this field; old brokers will ignore it.
+
+Behavior: The default strategy mimics the current behavior exactly. The Broker 
retains the final filtering logic, ensuring that even custom strategies cannot 
return topics that violate the client's requested pattern.
+
+# Security Considerations
+Input Validation: The properties map is user-supplied input. Implementers of 
the customizable strategy MUST validate and sanitize these inputs before using 
them, especially if they are used to construct database queries.
+
+Authorization: This PIP only controls the discovery (listing) of topics. It 
does not bypass the Authorization Service for consuming or producing to those 
topics.
+
+# Links
+
+* Mailing List discussion thread: 
https://lists.apache.org/thread/nf11qzn6r9wps6l81trxwtdr1m9xszpm
+* Mailing List voting thread: 
https://lists.apache.org/thread/fx9spdm11c1491064gzocfvz7ptlbs4t

Reply via email to