zhaohaidao commented on a change in pull request #5767: Support batch 
authorization of partitioned topic
URL: https://github.com/apache/pulsar/pull/5767#discussion_r352316028
 
 

 ##########
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 ##########
 @@ -312,4 +316,82 @@ public void testGetPartitionedTopicsList() throws 
KeeperException, InterruptedEx
         Assert.assertEquals(nonPersistentPartitionedTopics.size(), 1);
         
Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(),
 TopicDomain.non_persistent.value());
     }
+
+    @Test
+    public void testGrantNonPartitionedTopic() {
+        final String topicName = "non-partitioned-topic";
+        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, 
topicName, true);
+        String role = "role";
+        Set<AuthAction> expectActions = new HashSet<>();
+        expectActions.add(AuthAction.produce);
+        persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, 
topicName, role, expectActions);
+        Map<String, Set<AuthAction>> permissions = 
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
+        Assert.assertEquals(permissions.get(role), expectActions);
+    }
+
+    @Test
+    public void testGrantPartitionedTopic() {
+        final String partitionedTopicName = "partitioned-topic";
+        final int numPartitions = 5;
+        LocalZooKeeperCacheService mockLocalZooKeeperCacheService = 
mock(LocalZooKeeperCacheService.class);
+        ZooKeeperChildrenCache mockZooKeeperChildrenCache = 
mock(ZooKeeperChildrenCache.class);
+        
doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
+        
doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
 
 Review comment:
   Hi, I went through the grant and get logic for permissions and It seems 
current logic support get permissions for partitions of a topic. Pls help me 
check if my understanding is right.
   The permissions for partitions will be stored in a map named 
destination_auth, the same as the parent topic of partitions. 
   ```java
       private void grantPermissions(String topicUri, String role, 
Set<AuthAction> actions) {
           try {
               ...
               Policies policies = jsonMapper().readValue(content, 
Policies.class);
   
               if 
(!policies.auth_policies.destination_auth.containsKey(topicUri)) {
                   policies.auth_policies.destination_auth.put(topicUri, new 
TreeMap<String, Set<AuthAction>>());
               }
               policies.auth_policies.destination_auth.get(topicUri).put(role, 
actions);
   
               // Write the new policies to zookeeper
               globalZk().setData(path(POLICIES, namespaceName.toString()), 
jsonMapper().writeValueAsBytes(policies),
                       nodeStat.getVersion());
           ...
           }
   ```
   Then get permissions logic for a partition: try to get permissions from  
auth_policies.destination_auth directly by topicUri.
   ```java
   protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
           // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
           validateAdminAccessForTenant(namespaceName.getTenant());
   
           String topicUri = topicName.toString();
   
           try {
               ...
               // Then add topic level permissions
               if (auth.destination_auth.containsKey(topicUri)) {
                   for (Map.Entry<String, Set<AuthAction>> entry : 
auth.destination_auth.get(topicUri).entrySet()) {
                       String role = entry.getKey();
                       Set<AuthAction> topicPermissions = entry.getValue();
   
                       if (!permissions.containsKey(role)) {
                           permissions.put(role, topicPermissions);
                       } else {
                           // Do the union between namespace and topic level
                           Set<AuthAction> union = 
Sets.union(permissions.get(role), topicPermissions);
                           permissions.put(role, union);
                       }
                   }
               }
   
               return permissions;
           } catch (Exception e) {
               ...
           }
       }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to