merlimat commented on code in PR #16062:
URL: https://github.com/apache/pulsar/pull/16062#discussion_r905309249
##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java:
##########
@@ -110,4 +123,34 @@ public CompletableFuture<Void>
clearTenantPersistence(String tenant) {
}
});
}
+
+ void handleNotification(Notification notification) {
+ if (notification.getPath().startsWith(MANAGED_LEDGER_PATH)
+ && EnumSet.of(NotificationType.Created,
NotificationType.Deleted).contains(notification.getType())) {
+ for (Map.Entry<BiConsumer<String, NotificationType>, Pattern>
entry :
+ new HashMap<>(topicListeners).entrySet()) {
+ Matcher matcher =
entry.getValue().matcher(notification.getPath());
+ if (matcher.matches()) {
+ TopicName topicName = TopicName.get(
+ matcher.group(2),
NamespaceName.get(matcher.group(1)), matcher.group(3));
+ entry.getKey().accept(topicName.toString(),
notification.getType());
+ }
+ }
+ }
+ }
+
+ Pattern namespaceNameToTopicNamePattern(NamespaceName namespaceName) {
Review Comment:
We could have this as part of `NamespaceName` where we can store the
compiled pattern.
##########
pulsar-common/src/main/proto/PulsarApi.proto:
##########
@@ -987,6 +1017,11 @@ message BaseCommand {
TC_CLIENT_CONNECT_REQUEST = 62;
TC_CLIENT_CONNECT_RESPONSE = 63;
+ WATCH_TOPIC_LIST = 64;
+ WATCH_TOPIC_LIST_SUCCESS = 65;
+ WATCH_TOPIC_UPDATE = 66;
+ UNWATCH_TOPIC_LIST = 67;
Review Comment:
nit: maybe we could rename to `WATCH_TOPIC_CLOSE` to have the same prefix
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2513,6 +2519,53 @@ protected void
handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
}));
}
+ protected void handleCommandWatchTopicList(CommandWatchTopicList
commandWatchTopicList) {
+ final long requestId = commandWatchTopicList.getRequestId();
+ final long watcherId = commandWatchTopicList.getWatcherId();
+ final NamespaceName namespaceName =
NamespaceName.get(commandWatchTopicList.getNamespace());
+
+ final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
+ if (lookupSemaphore.tryAcquire()) {
+ if (invalidOriginalPrincipal(originalPrincipal)) {
+ final String msg = "Valid Proxy Client role should be provided
for watchTopicListRequest ";
+ log.warn("[{}] {} with role {} and proxyClientAuthRole {} on
namespace {}", remoteAddress, msg,
+ authRole, originalPrincipal, namespaceName);
+ commandSender.sendErrorResponse(watcherId,
ServerError.AuthorizationError, msg);
+ lookupSemaphore.release();
+ return;
+ }
+ isNamespaceOperationAllowed(namespaceName,
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
+ if (isAuthorized) {
+
topicListService.handleWatchTopicList(commandWatchTopicList, lookupSemaphore);
Review Comment:
This method will potentially get called from a different thread (if the
authz is not already cached), therefore we cannot pass on the
`commandWatchTopicList` object because it will get reused when the io-thread is
processing the next call.
We need to extract the fields that we are interested in into local variables.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2513,6 +2519,53 @@ protected void
handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
}));
}
+ protected void handleCommandWatchTopicList(CommandWatchTopicList
commandWatchTopicList) {
+ final long requestId = commandWatchTopicList.getRequestId();
+ final long watcherId = commandWatchTopicList.getWatcherId();
+ final NamespaceName namespaceName =
NamespaceName.get(commandWatchTopicList.getNamespace());
+
+ final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
+ if (lookupSemaphore.tryAcquire()) {
+ if (invalidOriginalPrincipal(originalPrincipal)) {
+ final String msg = "Valid Proxy Client role should be provided
for watchTopicListRequest ";
+ log.warn("[{}] {} with role {} and proxyClientAuthRole {} on
namespace {}", remoteAddress, msg,
+ authRole, originalPrincipal, namespaceName);
+ commandSender.sendErrorResponse(watcherId,
ServerError.AuthorizationError, msg);
+ lookupSemaphore.release();
+ return;
+ }
+ isNamespaceOperationAllowed(namespaceName,
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
+ if (isAuthorized) {
+
topicListService.handleWatchTopicList(commandWatchTopicList, lookupSemaphore);
+ } else {
+ final String msg = "Proxy Client is not authorized to
watchTopicList";
+ log.warn("[{}] {} with role {} on namespace {}",
remoteAddress, msg, getPrincipal(), namespaceName);
+ commandSender.sendErrorResponse(requestId,
ServerError.AuthorizationError, msg);
+ lookupSemaphore.release();
+ }
+ return null;
+ }).exceptionally(ex -> {
+ logNamespaceNameAuthException(remoteAddress, "watchTopicList",
getPrincipal(),
+ Optional.of(namespaceName), ex);
+ final String msg = "Exception occurred while trying to handle
command WatchTopicList";
+ commandSender.sendErrorResponse(requestId,
ServerError.AuthorizationError, msg);
+ lookupSemaphore.release();
+ return null;
+ });
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed WatchTopicList due to too many
lookup-requests {}", remoteAddress,
+ namespaceName);
+ }
+ commandSender.sendErrorResponse(requestId,
ServerError.TooManyRequests,
+ "Failed due to too many pending lookup requests");
+ }
+ }
+
+ protected void handleCommandUnwatchTopicList(CommandUnwatchTopicList
commandUnwatchTopicList) {
+ topicListService.handleUnwatchTopicList(commandUnwatchTopicList);
Review Comment:
Should we check authorization here or it will be not necessary?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]