zhaohaidao commented on a change in pull request #5767: Support batch authorization of partitioned topic
URL: https://github.com/apache/pulsar/pull/5767#discussion_r352316028
########## File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java ########## 
@@ -312,4 +316,82 @@ public void testGetPartitionedTopicsList() throws KeeperException, InterruptedEx Assert.assertEquals(nonPersistentPartitionedTopics.size(), 1); Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.non_persistent.value()); } + + @Test + public void testGrantNonPartitionedTopic() { + final String topicName = "non-partitioned-topic"; + persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true); + String role = "role"; + Set<AuthAction> expectActions = new HashSet<>(); + expectActions.add(AuthAction.produce); + persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, topicName, role, expectActions); + Map<String, Set<AuthAction>> permissions = persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName); + Assert.assertEquals(permissions.get(role), expectActions); + } + + @Test + public void testGrantPartitionedTopic() { + final String partitionedTopicName = "partitioned-topic"; + final int numPartitions = 5; + LocalZooKeeperCacheService mockLocalZooKeeperCacheService = mock(LocalZooKeeperCacheService.class); + ZooKeeperChildrenCache mockZooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class); + doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService(); + doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();

Review comment:

Hi, I went through the grant and get logic for permissions, and it seems the current logic supports getting permissions for partitions of a topic. Please help me check if my understanding is right.

The permissions for partitions will be stored in a map named destination_auth, the same as the parent topic of partitions.

```java
private void grantPermissions(String topicUri, String role, Set<AuthAction> actions) {
    try {
        ...
        Policies policies = jsonMapper().readValue(content, Policies.class);
        if (!policies.auth_policies.destination_auth.containsKey(topicUri)) {
            policies.auth_policies.destination_auth.put(topicUri, new TreeMap<String, Set<AuthAction>>());
        }
        policies.auth_policies.destination_auth.get(topicUri).put(role, actions);

        // Write the new policies to zookeeper
        globalZk().setData(path(POLICIES, namespaceName.toString()), jsonMapper().writeValueAsBytes(policies),
                nodeStat.getVersion());
    ...
}
```

Then the get-permissions logic for a partition: it tries to get permissions from auth_policies.destination_auth directly by topicUri.

```java
protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
    // This operation should be reading from zookeeper and it should be allowed without having admin privileges
    validateAdminAccessForTenant(namespaceName.getTenant());

    String topicUri = topicName.toString();

    try {
        ...
        // Then add topic level permissions
        if (auth.destination_auth.containsKey(topicUri)) {
            for (Map.Entry<String, Set<AuthAction>> entry : auth.destination_auth.get(topicUri).entrySet()) {
                String role = entry.getKey();
                Set<AuthAction> topicPermissions = entry.getValue();

                if (!permissions.containsKey(role)) {
                    permissions.put(role, topicPermissions);
                } else {
                    // Do the union between namespace and topic level
                    Set<AuthAction> union = Sets.union(permissions.get(role), topicPermissions);
                    permissions.put(role, union);
                }
            }
        }

        return permissions;
    } catch (Exception e) {
        ...
    }
}
```
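
Review bot: testGrantNonPartitionedTopic asserts only the positive case, that permissions.get(role) equals the granted set. Since this change touches authorization, also assert that a role which was never granted is absent from the map returned by getPermissionsOnTopic, so a grant that leaks to other roles fails the test. In testGrantPartitionedTopic, the doReturn(...).when(pulsar).getLocalZkCacheService() stub replaces the cache service on the pulsar object; if that object is reused by other tests in this class, reset the stub at the end of the test so later tests do not run against the mock.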