andrasbeni commented on code in PR #16062:
URL: https://github.com/apache/pulsar/pull/16062#discussion_r906179524
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2513,6 +2519,53 @@ protected void
handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
}));
}
+ protected void handleCommandWatchTopicList(CommandWatchTopicList
commandWatchTopicList) {
+ final long requestId = commandWatchTopicList.getRequestId();
+ final long watcherId = commandWatchTopicList.getWatcherId();
+ final NamespaceName namespaceName =
NamespaceName.get(commandWatchTopicList.getNamespace());
+
+ final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
+ if (lookupSemaphore.tryAcquire()) {
+ if (invalidOriginalPrincipal(originalPrincipal)) {
+ final String msg = "Valid Proxy Client role should be provided
for watchTopicListRequest ";
+ log.warn("[{}] {} with role {} and proxyClientAuthRole {} on
namespace {}", remoteAddress, msg,
+ authRole, originalPrincipal, namespaceName);
+ commandSender.sendErrorResponse(watcherId,
ServerError.AuthorizationError, msg);
+ lookupSemaphore.release();
+ return;
+ }
+ isNamespaceOperationAllowed(namespaceName,
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
+ if (isAuthorized) {
+
topicListService.handleWatchTopicList(commandWatchTopicList, lookupSemaphore);
+ } else {
+ final String msg = "Proxy Client is not authorized to
watchTopicList";
+ log.warn("[{}] {} with role {} on namespace {}",
remoteAddress, msg, getPrincipal(), namespaceName);
+ commandSender.sendErrorResponse(requestId,
ServerError.AuthorizationError, msg);
+ lookupSemaphore.release();
+ }
+ return null;
+ }).exceptionally(ex -> {
+ logNamespaceNameAuthException(remoteAddress, "watchTopicList",
getPrincipal(),
+ Optional.of(namespaceName), ex);
+ final String msg = "Exception occurred while trying to handle
command WatchTopicList";
+ commandSender.sendErrorResponse(requestId,
ServerError.AuthorizationError, msg);
+ lookupSemaphore.release();
+ return null;
+ });
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed WatchTopicList due to too many
lookup-requests {}", remoteAddress,
+ namespaceName);
+ }
+ commandSender.sendErrorResponse(requestId,
ServerError.TooManyRequests,
+ "Failed due to too many pending lookup requests");
+ }
+ }
+
+ protected void handleCommandUnwatchTopicList(CommandUnwatchTopicList
commandUnwatchTopicList) {
+ topicListService.handleUnwatchTopicList(commandUnwatchTopicList);
Review Comment:
We don't need to check authorization here, because executing this command only
alters the state of `TopicListService`, which is in a 1:1 relation with
`ServerCnx`. That state therefore belongs to a single client, whose
authorization was already checked when it created the watcher. (And if it was
not authorized to create one, it will be unable to delete anything.)
This is similar to `handleCloseConsumer`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2513,6 +2519,53 @@ protected void
handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
}));
}
+ protected void handleCommandWatchTopicList(CommandWatchTopicList
commandWatchTopicList) {
+ final long requestId = commandWatchTopicList.getRequestId();
+ final long watcherId = commandWatchTopicList.getWatcherId();
+ final NamespaceName namespaceName =
NamespaceName.get(commandWatchTopicList.getNamespace());
+
+ final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
+ if (lookupSemaphore.tryAcquire()) {
+ if (invalidOriginalPrincipal(originalPrincipal)) {
+ final String msg = "Valid Proxy Client role should be provided
for watchTopicListRequest ";
+ log.warn("[{}] {} with role {} and proxyClientAuthRole {} on
namespace {}", remoteAddress, msg,
+ authRole, originalPrincipal, namespaceName);
+ commandSender.sendErrorResponse(watcherId,
ServerError.AuthorizationError, msg);
+ lookupSemaphore.release();
+ return;
+ }
+ isNamespaceOperationAllowed(namespaceName,
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
+ if (isAuthorized) {
+
topicListService.handleWatchTopicList(commandWatchTopicList, lookupSemaphore);
Review Comment:
Thanks for pointing this out. I hadn't thought of it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]