andrasbeni commented on code in PR #16062:
URL: https://github.com/apache/pulsar/pull/16062#discussion_r906179524


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2513,6 +2519,53 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
                 }));
     }
 
+    protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) {
+        final long requestId = commandWatchTopicList.getRequestId();
+        final long watcherId = commandWatchTopicList.getWatcherId();
+        final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace());
+
+        final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
+        if (lookupSemaphore.tryAcquire()) {
+            if (invalidOriginalPrincipal(originalPrincipal)) {
+                final String msg = "Valid Proxy Client role should be provided for watchTopicListRequest ";
+                log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}", remoteAddress, msg,
+                        authRole, originalPrincipal, namespaceName);
+                commandSender.sendErrorResponse(watcherId, ServerError.AuthorizationError, msg);
+                lookupSemaphore.release();
+                return;
+            }
+            isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
+                if (isAuthorized) {
+                    topicListService.handleWatchTopicList(commandWatchTopicList, lookupSemaphore);
+                } else {
+                    final String msg = "Proxy Client is not authorized to watchTopicList";
+                    log.warn("[{}] {} with role {} on namespace {}", remoteAddress, msg, getPrincipal(), namespaceName);
+                    commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
+                    lookupSemaphore.release();
+                }
+                return null;
+            }).exceptionally(ex -> {
+                logNamespaceNameAuthException(remoteAddress, "watchTopicList", getPrincipal(),
+                        Optional.of(namespaceName), ex);
+                final String msg = "Exception occurred while trying to handle command WatchTopicList";
+                commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
+                lookupSemaphore.release();
+                return null;
+            });
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed WatchTopicList due to too many lookup-requests {}", remoteAddress,
+                        namespaceName);
+            }
+            commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests,
+                    "Failed due to too many pending lookup requests");
+        }
+    }
+
+    protected void handleCommandUnwatchTopicList(CommandUnwatchTopicList commandUnwatchTopicList) {
+        topicListService.handleUnwatchTopicList(commandUnwatchTopicList);

Review Comment:
   We don't need to check authorization here, because executing this command 
only alters the state of `TopicListService`, which is in a 1:1 relation with 
`ServerCnx`. That state belongs to a single client connection, which was 
already authorized when it created the watcher. (A client that lacked the 
authz to create a watcher has nothing to delete.)
   This is similar to `handleCloseConsumer`.
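
   To illustrate the argument, here is a minimal sketch of per-connection 
watcher state. `ConnectionWatchers` and its methods are hypothetical names 
made up for this example; this is not the actual `TopicListService` code, and 
it assumes the authorization check has already happened before `watch` is 
called.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for state owned by exactly one connection.
// It is NOT Pulsar's TopicListService; it only illustrates the scoping argument.
final class ConnectionWatchers {
    // watcherId -> namespace being watched, visible to this connection only.
    private final Map<Long, String> watchers = new ConcurrentHashMap<>();

    // Registers a watcher. Assumption: the caller already passed the authz check.
    void watch(long watcherId, String namespace) {
        watchers.put(watcherId, namespace);
    }

    // Removes a watcher from this connection's map. Returns true if one existed.
    // No authz check: the only entries reachable here were created by this connection.
    boolean unwatch(long watcherId) {
        return watchers.remove(watcherId) != null;
    }

    // Number of watchers currently registered on this connection.
    int size() {
        return watchers.size();
    }
}
```

   Because each connection holds its own instance, an unwatch request can 
only remove watchers that the same connection registered earlier, so a second 
connection reusing the same watcher id has no effect on the first.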



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2513,6 +2519,53 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
                 }));
     }
 
+    protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) {
+        final long requestId = commandWatchTopicList.getRequestId();
+        final long watcherId = commandWatchTopicList.getWatcherId();
+        final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace());
+
+        final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
+        if (lookupSemaphore.tryAcquire()) {
+            if (invalidOriginalPrincipal(originalPrincipal)) {
+                final String msg = "Valid Proxy Client role should be provided for watchTopicListRequest ";
+                log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}", remoteAddress, msg,
+                        authRole, originalPrincipal, namespaceName);
+                commandSender.sendErrorResponse(watcherId, ServerError.AuthorizationError, msg);
+                lookupSemaphore.release();
+                return;
+            }
+            isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
+                if (isAuthorized) {
+                    topicListService.handleWatchTopicList(commandWatchTopicList, lookupSemaphore);

Review Comment:
   Thanks for pointing this out. I hadn't thought of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.