This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d72e6bd847 [improve][test] Add topic operation checker for topic API 
(#22468)
9d72e6bd847 is described below

commit 9d72e6bd847df85a7d18f1827274df96a446798f
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Apr 15 16:15:59 2024 +0800

    [improve][test] Add topic operation checker for topic API (#22468)
---
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 156 ++++++++++++++++++---
 1 file changed, 135 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
index d09bc0a3ffd..e6ff0ce2bb4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
@@ -20,9 +20,18 @@
 package org.apache.pulsar.broker.admin;
 
 import io.jsonwebtoken.Jwts;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -37,22 +46,21 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 
 @Test(groups = "broker-admin")
 public class TopicAuthZTest extends MockedPulsarStandalone {
@@ -61,13 +69,17 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
 
     private PulsarAdmin tenantManagerAdmin;
 
+    private AuthorizationService authorizationService;
+
+    private AuthorizationService orignalAuthorizationService;
+
     private static final String TENANT_ADMIN_SUBJECT =  
UUID.randomUUID().toString();
     private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
             .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
 
     @SneakyThrows
     @BeforeClass(alwaysRun = true)
-    public void before() {
+    public void setup() {
         configureTokenAuthentication();
         configureDefaultAuthorization();
         enableTransaction();
@@ -99,7 +111,7 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
 
     @SneakyThrows
     @AfterClass(alwaysRun = true)
-    public void after() {
+    public void cleanup() {
         if (superUserAdmin != null) {
             superUserAdmin.close();
         }
@@ -109,6 +121,51 @@ public class TopicAuthZTest extends MockedPulsarStandalone 
{
         close();
     }
 
+    @BeforeMethod
+    public void before() throws IllegalAccessException {
+        orignalAuthorizationService = 
getPulsarService().getBrokerService().getAuthorizationService();
+        authorizationService = Mockito.spy(orignalAuthorizationService);
+        FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
+                authorizationService, true);
+    }
+
+    @AfterMethod
+    public void after() throws IllegalAccessException {
+        FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
+                orignalAuthorizationService, true);
+    }
+
+    private AtomicBoolean setAuthorizationTopicOperationChecker(String role, 
Object operation) {
+        AtomicBoolean execFlag = new AtomicBoolean(false);
+        if (operation instanceof TopicOperation) {
+            Mockito.doAnswer(invocationOnMock -> {
+            String role_ = invocationOnMock.getArgument(2);
+            if (role.equals(role_)) {
+                TopicOperation operation_ = invocationOnMock.getArgument(1);
+                Assert.assertEquals(operation_, operation);
+            }
+            execFlag.set(true);
+            return invocationOnMock.callRealMethod();
+        }).when(authorizationService).allowTopicOperationAsync(Mockito.any(), 
Mockito.any(), Mockito.any(),
+                Mockito.any(), Mockito.any());
+        } else if (operation instanceof NamespaceOperation) {
+            Mockito.doAnswer(invocationOnMock -> {
+            String role_ = invocationOnMock.getArgument(2);
+            if (role.equals(role_)) {
+                TopicOperation operation_ = invocationOnMock.getArgument(1);
+                Assert.assertEquals(operation_, operation);
+            }
+            execFlag.set(true);
+            return invocationOnMock.callRealMethod();
+        
}).when(authorizationService).allowNamespaceOperationAsync(Mockito.any(), 
Mockito.any(), Mockito.any(),
+                Mockito.any(), Mockito.any());
+        } else {
+            throw new IllegalArgumentException("");
+        }
+
+        return execFlag;
+    }
+
     @DataProvider(name = "partitioned")
     public static Object[][] partitioned() {
         return new Object[][] {
@@ -204,6 +261,8 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().getInternalInfo(topic));
 
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.GET_STATS);
+
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
             if (action == AuthAction.produce || action == AuthAction.consume) {
@@ -214,6 +273,9 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
             }
             superUserAdmin.topics().revokePermissions(topic, subject);
         }
+
+        Assert.assertTrue(execFlag.get());
+
         superUserAdmin.topics().deletePartitionedTopic(topic, true);
     }
 
@@ -244,8 +306,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone 
{
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().getPartitionedStats(topic, false));
 
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.GET_STATS);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().getPartitionedInternalStats(topic));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -313,14 +377,20 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         
tenantManagerAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
 "test-sub", Optional.empty());
 
         // test nobody
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.CONSUME);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().updateSubscriptionProperties(topic, 
"test-sub", properties));
+        Assert.assertTrue(execFlag.get());
 
+        execFlag = setAuthorizationTopicOperationChecker(subject, 
TopicOperation.CONSUME);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().getSubscriptionProperties(topic, 
"test-sub"));
+        Assert.assertTrue(execFlag.get());
 
+        execFlag = setAuthorizationTopicOperationChecker(subject, 
TopicOperation.CONSUME);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> 
subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
 "test-sub", Optional.empty()));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -362,10 +432,15 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         superUserAdmin.topics().createMissedPartitions(topic);
 
         // test tenant manager
+
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, NamespaceOperation.CREATE_TOPIC);
         tenantManagerAdmin.topics().createMissedPartitions(topic);
+        Assert.assertTrue(execFlag.get());
 
+        execFlag = setAuthorizationTopicOperationChecker(subject, 
TopicOperation.LOOKUP);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().createMissedPartitions(topic));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -396,8 +471,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone 
{
         // test tenant manager
         tenantManagerAdmin.topics().getPartitionedTopicMetadata(topic);
 
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.LOOKUP);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().getPartitionedTopicMetadata(topic));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -434,16 +511,18 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         // test tenant manager
         tenantManagerAdmin.topics().getProperties(topic);
 
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.GET_METADATA);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().getProperties(topic));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
             if (AuthAction.produce == action || AuthAction.consume == action) {
-                subAdmin.topics().getPartitionedTopicMetadata(topic);
+                subAdmin.topics().getProperties(topic);
             } else {
                 
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                        () -> 
subAdmin.topics().getPartitionedTopicMetadata(topic));
+                        () -> subAdmin.topics().getProperties(topic));
             }
             superUserAdmin.topics().revokePermissions(topic, subject);
         }
@@ -472,8 +551,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone 
{
         // test tenant manager
         tenantManagerAdmin.topics().updateProperties(topic, properties);
 
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.UPDATE_METADATA);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().updateProperties(topic, properties));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -504,8 +585,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone 
{
         // test tenant manager
         tenantManagerAdmin.topics().removeProperties(topic, "key1");
 
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.DELETE_METADATA);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().removeProperties(topic, "key1"));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -539,8 +622,11 @@ public class TopicAuthZTest extends MockedPulsarStandalone 
{
         tenantManagerAdmin.topics().deletePartitionedTopic(topic);
 
         createTopic(topic, true);
+
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, NamespaceOperation.DELETE_TOPIC);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().deletePartitionedTopic(topic));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(ns, 
subject, Set.of(action));
@@ -548,7 +634,6 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
                     () -> subAdmin.topics().deletePartitionedTopic(topic));
             superUserAdmin.namespaces().revokePermissionsOnNamespace(ns, 
subject);
         }
-        deleteTopic(topic, true);
     }
 
     @Test(dataProvider = "partitioned")
@@ -571,8 +656,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone 
{
         // test tenant manager
         tenantManagerAdmin.topics().getSubscriptions(topic);
 
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, 
TopicOperation.GET_SUBSCRIPTIONS);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().getSubscriptions(topic));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -616,6 +703,7 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
 
         }
 
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.GET_STATS);
         if (partitioned) {
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                     () -> 
subAdmin.topics().getPartitionedInternalStats(topic));
@@ -623,6 +711,7 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                     () -> subAdmin.topics().getInternalStats(topic));
         }
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -671,8 +760,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone 
{
         tenantManagerAdmin.topics().deleteSubscription(topic, subName);
 
         superUserAdmin.topics().createSubscription(topic, subName, 
MessageId.latest);
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.UNSUBSCRIBE);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().deleteSubscription(topic, subName));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -708,9 +799,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone 
{
 
         // test tenant manager
         tenantManagerAdmin.topics().skipAllMessages(topic, subName);
-
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.SKIP);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().skipAllMessages(topic, subName));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -746,10 +838,10 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
 
         // test tenant manager
         tenantManagerAdmin.topics().skipMessages(topic, subName, 1);
-
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.SKIP);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().skipMessages(topic, subName, 1));
-
+        Assert.assertTrue(execFlag.get());
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
             if (AuthAction.consume == action) {
@@ -782,10 +874,10 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
 
         // test tenant manager
         tenantManagerAdmin.topics().expireMessagesForAllSubscriptions(topic, 
1);
-
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.EXPIRE_MESSAGES);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> 
subAdmin.topics().expireMessagesForAllSubscriptions(topic, 1));
-
+        Assert.assertTrue(execFlag.get());
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
             if (AuthAction.consume == action) {
@@ -820,10 +912,10 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
 
         // test tenant manager
         tenantManagerAdmin.topics().resetCursor(topic, subName, 
System.currentTimeMillis());
-
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.RESET_CURSOR);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().resetCursor(topic, subName, 
System.currentTimeMillis()));
-
+        Assert.assertTrue(execFlag.get());
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
             if (AuthAction.consume == action) {
@@ -858,10 +950,10 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
 
         // test tenant manager
         tenantManagerAdmin.topics().resetCursor(topic, subName, 
MessageId.latest);
-
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.RESET_CURSOR);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().resetCursor(topic, subName, 
MessageId.latest));
-
+        Assert.assertTrue(execFlag.get());
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
             if (AuthAction.consume == action) {
@@ -903,8 +995,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone 
{
         // test tenant manager
         tenantManagerAdmin.topics().getMessagesById(topic, 
messageId.getLedgerId(), messageId.getEntryId());
 
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.PEEK_MESSAGES);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().getMessagesById(topic, 
messageId.getLedgerId(), messageId.getEntryId()));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -947,9 +1041,10 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
 
         // test tenant manager
         tenantManagerAdmin.topics().peekMessages(topic, subName, 1);
-
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.PEEK_MESSAGES);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().peekMessages(topic, subName, 1));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -992,8 +1087,10 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         // test tenant manager
         tenantManagerAdmin.topics().examineMessage(topic, "latest", 1);
 
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.PEEK_MESSAGES);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().examineMessage(topic, "latest", 1));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -1039,7 +1136,9 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         superUserAdmin.topics().expireMessages(topic, subName, 1);
 
         // test tenant manager
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.EXPIRE_MESSAGES);
         tenantManagerAdmin.topics().expireMessages(topic, subName, 1);
+        Assert.assertTrue(execFlag.get());
 
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().expireMessages(topic, subName, 1));
@@ -1090,8 +1189,10 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         // test tenant manager
         tenantManagerAdmin.topics().expireMessages(topic, subName, 
MessageId.earliest, false);
 
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, TopicOperation.EXPIRE_MESSAGES);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topics().expireMessages(topic, subName, 
MessageId.earliest, false));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -1294,6 +1395,15 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
                     () -> adminConsumer.accept(subAdmin));
         }
 
+        AtomicBoolean execFlag = null;
+        if (topicOpType == OperationAuthType.Lookup) {
+            execFlag = setAuthorizationTopicOperationChecker(subject, 
TopicOperation.LOOKUP);
+        } else if (topicOpType == OperationAuthType.Produce) {
+            execFlag = setAuthorizationTopicOperationChecker(subject, 
TopicOperation.PRODUCE);
+        } else if (topicOpType == OperationAuthType.Consume) {
+            execFlag = setAuthorizationTopicOperationChecker(subject, 
TopicOperation.CONSUME);
+        }
+
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(testTopic, subject, 
Set.of(action));
 
@@ -1305,6 +1415,10 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
             }
             superUserAdmin.topics().revokePermissions(testTopic, subject);
         }
+
+        if (execFlag != null) {
+            Assert.assertTrue(execFlag.get());
+        }
     }
 
 

Reply via email to