merlimat commented on code in PR #16062:
URL: https://github.com/apache/pulsar/pull/16062#discussion_r905309249


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java:
##########
@@ -110,4 +123,34 @@ public CompletableFuture<Void> 
clearTenantPersistence(String tenant) {
                     }
                 });
     }
+
+    void handleNotification(Notification notification) {
+        if (notification.getPath().startsWith(MANAGED_LEDGER_PATH)
+                && EnumSet.of(NotificationType.Created, 
NotificationType.Deleted).contains(notification.getType())) {
+            for (Map.Entry<BiConsumer<String, NotificationType>, Pattern> 
entry :
+                    new HashMap<>(topicListeners).entrySet()) {
+                Matcher matcher = 
entry.getValue().matcher(notification.getPath());
+                if (matcher.matches()) {
+                    TopicName topicName = TopicName.get(
+                            matcher.group(2), 
NamespaceName.get(matcher.group(1)), matcher.group(3));
+                    entry.getKey().accept(topicName.toString(), 
notification.getType());
+                }
+            }
+        }
+    }
+
+    Pattern namespaceNameToTopicNamePattern(NamespaceName namespaceName) {

Review Comment:
   We could have this as part of `NamespaceName` where we can store the 
compiled pattern.



##########
pulsar-common/src/main/proto/PulsarApi.proto:
##########
@@ -987,6 +1017,11 @@ message BaseCommand {
         TC_CLIENT_CONNECT_REQUEST = 62;
         TC_CLIENT_CONNECT_RESPONSE = 63;
 
+        WATCH_TOPIC_LIST = 64;
+        WATCH_TOPIC_LIST_SUCCESS = 65;
+        WATCH_TOPIC_UPDATE = 66;
+        UNWATCH_TOPIC_LIST = 67;

Review Comment:
   nit: maybe we could rename to `WATCH_TOPIC_CLOSE` to have the same prefix



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2513,6 +2519,53 @@ protected void 
handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
                 }));
     }
 
+    protected void handleCommandWatchTopicList(CommandWatchTopicList 
commandWatchTopicList) {
+        final long requestId = commandWatchTopicList.getRequestId();
+        final long watcherId = commandWatchTopicList.getWatcherId();
+        final NamespaceName namespaceName = 
NamespaceName.get(commandWatchTopicList.getNamespace());
+
+        final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
+        if (lookupSemaphore.tryAcquire()) {
+            if (invalidOriginalPrincipal(originalPrincipal)) {
+                final String msg = "Valid Proxy Client role should be provided 
for watchTopicListRequest ";
+                log.warn("[{}] {} with role {} and proxyClientAuthRole {} on 
namespace {}", remoteAddress, msg,
+                        authRole, originalPrincipal, namespaceName);
+                commandSender.sendErrorResponse(watcherId, 
ServerError.AuthorizationError, msg);
+                lookupSemaphore.release();
+                return;
+            }
+            isNamespaceOperationAllowed(namespaceName, 
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
+                if (isAuthorized) {
+                    
topicListService.handleWatchTopicList(commandWatchTopicList, lookupSemaphore);

Review Comment:
   This method will potentially get called from a different thread (if the 
authz is not already cached), therefore we cannot pass on the 
`commandWatchTopicList` object because it will get reused when the io-thread is 
processing the next call. 
   
   We need to extract the fields that we are interested in into local variables.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2513,6 +2519,53 @@ protected void 
handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
                 }));
     }
 
+    protected void handleCommandWatchTopicList(CommandWatchTopicList 
commandWatchTopicList) {
+        final long requestId = commandWatchTopicList.getRequestId();
+        final long watcherId = commandWatchTopicList.getWatcherId();
+        final NamespaceName namespaceName = 
NamespaceName.get(commandWatchTopicList.getNamespace());
+
+        final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
+        if (lookupSemaphore.tryAcquire()) {
+            if (invalidOriginalPrincipal(originalPrincipal)) {
+                final String msg = "Valid Proxy Client role should be provided 
for watchTopicListRequest ";
+                log.warn("[{}] {} with role {} and proxyClientAuthRole {} on 
namespace {}", remoteAddress, msg,
+                        authRole, originalPrincipal, namespaceName);
+                commandSender.sendErrorResponse(watcherId, 
ServerError.AuthorizationError, msg);
+                lookupSemaphore.release();
+                return;
+            }
+            isNamespaceOperationAllowed(namespaceName, 
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
+                if (isAuthorized) {
+                    
topicListService.handleWatchTopicList(commandWatchTopicList, lookupSemaphore);
+                } else {
+                    final String msg = "Proxy Client is not authorized to 
watchTopicList";
+                    log.warn("[{}] {} with role {} on namespace {}", 
remoteAddress, msg, getPrincipal(), namespaceName);
+                    commandSender.sendErrorResponse(requestId, 
ServerError.AuthorizationError, msg);
+                    lookupSemaphore.release();
+                }
+                return null;
+            }).exceptionally(ex -> {
+                logNamespaceNameAuthException(remoteAddress, "watchTopicList", 
getPrincipal(),
+                        Optional.of(namespaceName), ex);
+                final String msg = "Exception occurred while trying to handle 
command WatchTopicList";
+                commandSender.sendErrorResponse(requestId, 
ServerError.AuthorizationError, msg);
+                lookupSemaphore.release();
+                return null;
+            });
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed WatchTopicList due to too many 
lookup-requests {}", remoteAddress,
+                        namespaceName);
+            }
+            commandSender.sendErrorResponse(requestId, 
ServerError.TooManyRequests,
+                    "Failed due to too many pending lookup requests");
+        }
+    }
+
+    protected void handleCommandUnwatchTopicList(CommandUnwatchTopicList 
commandUnwatchTopicList) {
+        topicListService.handleUnwatchTopicList(commandUnwatchTopicList);

Review Comment:
   Should we check authorization here or it will be not necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to