michaeljmarshall commented on code in PR #16792:
URL: https://github.com/apache/pulsar/pull/16792#discussion_r960227112


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -324,32 +342,41 @@ protected void internalGrantPermissionsOnTopic(final 
AsyncResponse asyncResponse
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
         validateAdminAccessForTenantAsync(namespaceName.getTenant())
                 .thenCompose(__ -> 
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
-             getPartitionedTopicMetadataAsync(topicName, true, false)
-                  .thenCompose(metadata -> {
-                      int numPartitions = metadata.partitions;
-                      CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
-                      if (numPartitions > 0) {
-                          for (int i = 0; i < numPartitions; i++) {
-                              TopicName topicNamePartition = 
topicName.getPartition(i);
-                              future = future.thenCompose(unused -> 
grantPermissionsAsync(topicNamePartition, role,
-                                      actions));
-                          }
-                      }

Review Comment:
   To keep this change backwards compatible, I think we need to leave this 
block in place (at least in the PR that is focused on fixing the historical 
bug). Otherwise, custom implementations of the `AuthorizationProvider` 
interface would see their behavior change, which shouldn't happen in a patch 
release.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -271,6 +271,24 @@ protected CompletableFuture<Map<String, Set<AuthAction>>> 
internalGetPermissions
                         }
                     }
                 }
+
+                // If topic is partitioned, add based topic permission
+                if (topicName.isPartitioned() && 
auth.getTopicAuthentication().containsKey(
+                        topicName.getPartitionedTopicName())) {
+                    for (Map.Entry<String, Set<AuthAction>> entry :
+                            
auth.getTopicAuthentication().get(topicName.getPartitionedTopicName()).entrySet())
 {
+                        String role = entry.getKey();
+                        Set<AuthAction> topicPermissions = entry.getValue();
+
+                        if (!permissions.containsKey(role)) {
+                            permissions.put(role, topicPermissions);
+                        } else {
+                            // Do the union between namespace and topic level
+                            Set<AuthAction> union = 
Sets.union(permissions.get(role), topicPermissions);
+                            permissions.put(role, union);
+                        }
+                    }
+                }

Review Comment:
   I am guessing this is meant to grant permission based on a topic's partition 
name. This code block alone will be insufficient. We'll need to look at the 
`AuthorizationProvider` to get a complete implementation.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -324,32 +342,41 @@ protected void internalGrantPermissionsOnTopic(final 
AsyncResponse asyncResponse
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
         validateAdminAccessForTenantAsync(namespaceName.getTenant())
                 .thenCompose(__ -> 
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
-             getPartitionedTopicMetadataAsync(topicName, true, false)
-                  .thenCompose(metadata -> {
-                      int numPartitions = metadata.partitions;
-                      CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
-                      if (numPartitions > 0) {
-                          for (int i = 0; i < numPartitions; i++) {
-                              TopicName topicNamePartition = 
topicName.getPartition(i);
-                              future = future.thenCompose(unused -> 
grantPermissionsAsync(topicNamePartition, role,
-                                      actions));
-                          }
-                      }
-                      return future.thenCompose(unused -> 
grantPermissionsAsync(topicName, role, actions))
-                              .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
-                  }))).exceptionally(ex -> {
+                        
grantPermissionsAsync(TopicName.get(topicName.getPartitionedTopicName()), role, 
actions)

Review Comment:
   Instead of passing the topic name without any of the partition information, 
I think we should grant permission on the resulting `topicName` and leave it 
up to the `AuthorizationProvider` to determine how to handle partitioned 
topics. The current method already calls `grantPermissionAsync` with that 
`topicName`, so we might be able to simply deprecate passing each partition to 
the `AuthorizationProvider`.
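
To illustrate what "leave it up to the provider" could look like, here is a minimal sketch. It is not Pulsar's actual `AuthorizationProvider`: the class, the in-memory map, and the method names are all hypothetical. The only thing taken from Pulsar is the `-partition-N` naming of partitions. A grant is stored under whatever topic name is passed in, and the check consults the exact name first and then the base (partitioned) topic name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a provider; not the real Pulsar interface.
class PartitionAwareAuthSketch {
    private static final String PARTITION_SUFFIX = "-partition-";

    // topic name -> role -> granted actions (simplified to strings)
    private final Map<String, Map<String, Set<String>>> topicAuth = new HashMap<>();

    // Strip a trailing "-partition-<digits>" suffix, if there is one.
    public static String baseTopicName(String topic) {
        int idx = topic.lastIndexOf(PARTITION_SUFFIX);
        if (idx < 0) {
            return topic;
        }
        String tail = topic.substring(idx + PARTITION_SUFFIX.length());
        boolean numeric = !tail.isEmpty() && tail.chars().allMatch(Character::isDigit);
        return numeric ? topic.substring(0, idx) : topic;
    }

    // Store the grant under the name exactly as given by the caller.
    public void grant(String topic, String role, Set<String> actions) {
        topicAuth.computeIfAbsent(topic, k -> new HashMap<>())
                .computeIfAbsent(role, k -> new HashSet<>())
                .addAll(actions);
    }

    // A partition is allowed if either it or its base topic carries the grant.
    public boolean isAllowed(String topic, String role, String action) {
        return hasAction(topic, role, action)
                || hasAction(baseTopicName(topic), role, action);
    }

    private boolean hasAction(String topic, String role, String action) {
        return topicAuth.getOrDefault(topic, Collections.emptyMap())
                .getOrDefault(role, Collections.emptySet())
                .contains(action);
    }
}
```

With this shape, the admin layer would not need to loop over partitions when granting; whether existing custom providers can tolerate that is the compatibility question raised in the first comment.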



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -324,32 +342,41 @@ protected void internalGrantPermissionsOnTopic(final 
AsyncResponse asyncResponse
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
         validateAdminAccessForTenantAsync(namespaceName.getTenant())
                 .thenCompose(__ -> 
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
-             getPartitionedTopicMetadataAsync(topicName, true, false)
-                  .thenCompose(metadata -> {
-                      int numPartitions = metadata.partitions;
-                      CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
-                      if (numPartitions > 0) {
-                          for (int i = 0; i < numPartitions; i++) {
-                              TopicName topicNamePartition = 
topicName.getPartition(i);
-                              future = future.thenCompose(unused -> 
grantPermissionsAsync(topicNamePartition, role,
-                                      actions));
-                          }
-                      }
-                      return future.thenCompose(unused -> 
grantPermissionsAsync(topicName, role, actions))
-                              .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
-                  }))).exceptionally(ex -> {
+                        
grantPermissionsAsync(TopicName.get(topicName.getPartitionedTopicName()), role, 
actions)
+                                .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()))))
+                .exceptionally(ex -> {
                     Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
                     log.error("[{}] Failed to get permissions for topic {}", 
clientAppId(), topicName, realCause);
                     resumeAsyncResponseExceptionally(asyncResponse, realCause);
                     return null;
                 });
     }
 
-    private CompletableFuture<Void> revokePermissionsAsync(String topicUri, 
String role) {
+    private CompletableFuture<Void> revokePermissionsAsync(TopicName 
topicName, String role, int numPartitions) {
+        String topicUri = topicName.toString();
         return 
namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
                 policiesOptional -> {
                     Policies policies = policiesOptional.orElseThrow(() ->
                             new RestException(Status.NOT_FOUND, "Namespace 
does not exist"));
+                    CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+                    // do compatible with previous pulsar version
+                    // revoke all the partition permissions granted in 
previous version
+                    future = future.thenComposeAsync(unused ->
+                        namespaceResources().setPoliciesAsync(namespaceName, p 
-> {
+                            if (numPartitions > 0) {
+                                for (int i = 0; i < numPartitions; i++) {
+                                    
p.auth_policies.getTopicAuthentication().computeIfPresent(
+                                            
topicName.getPartition(i).toString(), (k, roles) -> {
+                                                roles.remove(role);
+                                                if (roles.isEmpty()) {
+                                                    return null;
+                                                }
+                                                return roles;
+                                            });
+                                }
+                            }
+                            return p;
+                        }));
                     if 
(!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
                             || 
!policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role))
 {

Review Comment:
   I wonder if, instead of failing here, we should just attempt to remove the 
permission from every partition, even if that partition isn't in the map? I 
think that would be the expected behavior. This would also be backwards 
compatible.
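
A minimal sketch of that idea, under simplifying assumptions: the policies are modeled as a plain nested map rather than Pulsar's `Policies` object, and the class and method names are hypothetical. Each removal goes through `Map.computeIfPresent`, so a base topic or partition with no entry is skipped rather than treated as an error.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; the nested map stands in for the topic authentication policies.
class RevokeAllPartitionsSketch {

    // Remove the role from the base topic and from every partition, ignoring missing entries.
    public static void revoke(Map<String, Map<String, Set<String>>> topicAuth,
                              String topic, String role, int numPartitions) {
        removeRole(topicAuth, topic, role);
        for (int i = 0; i < numPartitions; i++) {
            removeRole(topicAuth, topic + "-partition-" + i, role);
        }
    }

    private static void removeRole(Map<String, Map<String, Set<String>>> topicAuth,
                                   String topicKey, String role) {
        // Returning null from computeIfPresent drops the topic entry once no roles remain.
        topicAuth.computeIfPresent(topicKey, (k, roles) -> {
            roles.remove(role);
            return roles.isEmpty() ? null : roles;
        });
    }
}
```

Because a missing entry is simply skipped, calling `revoke` twice, or on a topic that only ever had a base-level grant, leaves the map in the same state.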


