This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bb9378b50a [improve][test] Add policy authentication test for namespace API (#22593)
1bb9378b50a is described below

commit 1bb9378b50aa891834b64cd39f55ae0e32a055bb
Author: Cong Zhao <[email protected]>
AuthorDate: Sun Apr 28 10:37:37 2024 +0800

    [improve][test] Add policy authentication test for namespace API (#22593)
---
 .../pulsar/broker/admin/NamespaceAuthZTest.java    | 1248 ++++++++++++++++++--
 1 file changed, 1140 insertions(+), 108 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
index 5358295b785..ec6a122f7df 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
@@ -20,9 +20,11 @@
 package org.apache.pulsar.broker.admin;
 
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.deleteNamespaceWithRetry;
+import static 
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
 import io.jsonwebtoken.Jwts;
 import java.io.File;
 import java.util.ArrayList;
@@ -32,6 +34,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.StringUtils;
@@ -44,17 +48,33 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.EntryFilters;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import 
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
 import org.apache.pulsar.packages.management.core.common.PackageMetadata;
 import org.apache.pulsar.security.MockedPulsarStandalone;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -72,7 +92,7 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
 
     private AuthorizationService authorizationService;
 
-    private static final String TENANT_ADMIN_SUBJECT =  
UUID.randomUUID().toString();
+    private static final String TENANT_ADMIN_SUBJECT = 
UUID.randomUUID().toString();
     private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
             .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
 
@@ -122,16 +142,46 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         superUserAdmin.namespaces().createNamespace("public/default");
     }
 
-    private void setAuthorizationOperationChecker(String role, 
NamespaceOperation operation) {
+    private AtomicBoolean setAuthorizationOperationChecker(String role, 
NamespaceOperation operation) {
+        AtomicBoolean execFlag = new AtomicBoolean(false);
         Mockito.doAnswer(invocationOnMock -> {
             String role_ = invocationOnMock.getArgument(2);
             if (role.equals(role_)) {
                 NamespaceOperation operation_ = 
invocationOnMock.getArgument(1);
                 Assert.assertEquals(operation_, operation);
             }
+            execFlag.set(true);
             return invocationOnMock.callRealMethod();
         
}).when(authorizationService).allowNamespaceOperationAsync(Mockito.any(), 
Mockito.any(), Mockito.any(),
                 Mockito.any());
+         return execFlag;
+    }
+
+    private void clearAuthorizationOperationChecker() {
+        
Mockito.doAnswer(InvocationOnMock::callRealMethod).when(authorizationService)
+                .allowNamespaceOperationAsync(Mockito.any(), Mockito.any(), 
Mockito.any(),
+                        Mockito.any());
+    }
+
+    private AtomicBoolean setAuthorizationPolicyOperationChecker(String role, 
Object policyName, Object operation) {
+        AtomicBoolean execFlag = new AtomicBoolean(false);
+        if (operation instanceof PolicyOperation) {
+            Mockito.doAnswer(invocationOnMock -> {
+                String role_ = invocationOnMock.getArgument(3);
+                if (role.equals(role_)) {
+                    PolicyName policyName_ = invocationOnMock.getArgument(1);
+                    PolicyOperation operation_ = 
invocationOnMock.getArgument(2);
+                    assertEquals(operation_, operation);
+                    assertEquals(policyName_, policyName);
+                }
+                execFlag.set(true);
+                return invocationOnMock.callRealMethod();
+            
}).when(authorizationService).allowNamespacePolicyOperationAsync(Mockito.any(), 
Mockito.any(), Mockito.any(),
+                    Mockito.any(), Mockito.any());
+        } else {
+            throw new IllegalArgumentException("");
+        }
+        return execFlag;
     }
 
     @SneakyThrows
@@ -140,13 +190,12 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         final String random = UUID.randomUUID().toString();
         final String namespace = "public/default";
         final String topic = "persistent://public/default/" + random;
-        final String subject =  UUID.randomUUID().toString();
+        final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
         superUserAdmin.topics().createNonPartitionedTopic(topic);
 
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
@@ -214,18 +263,17 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         superUserAdmin.topics().delete(topic, true);
     }
 
-     @Test
+    @Test
     public void testTopics() throws Exception {
         final String random = UUID.randomUUID().toString();
         final String namespace = "public/default";
         final String topic = "persistent://" + namespace + "/" + random;
-        final String subject =  UUID.randomUUID().toString();
+        final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
         superUserAdmin.topics().createNonPartitionedTopic(topic);
 
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
@@ -236,10 +284,10 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         // test tenant manager
         tenantManagerAdmin.namespaces().getTopics(namespace);
 
+        AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, 
NamespaceOperation.GET_TOPICS);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.namespaces().getTopics(namespace));
-
-        setAuthorizationOperationChecker(subject, 
NamespaceOperation.GET_TOPICS);
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
@@ -260,13 +308,12 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         final String random = UUID.randomUUID().toString();
         final String namespace = "public/default";
         final String topic = "persistent://" + namespace + "/" + random;
-        final String subject =  UUID.randomUUID().toString();
+        final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
         superUserAdmin.topics().createNonPartitionedTopic(topic);
 
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
@@ -302,11 +349,11 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                        () -> 
subAdmin.namespaces().setBookieAffinityGroup(namespace, 
bookieAffinityGroupData));
+                    () -> 
subAdmin.namespaces().setBookieAffinityGroup(namespace, 
bookieAffinityGroupData));
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                        () -> 
subAdmin.namespaces().getBookieAffinityGroup(namespace));
+                    () -> 
subAdmin.namespaces().getBookieAffinityGroup(namespace));
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                        () -> 
subAdmin.namespaces().deleteBookieAffinityGroup(namespace));
+                    () -> 
subAdmin.namespaces().deleteBookieAffinityGroup(namespace));
             
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
         }
 
@@ -319,20 +366,19 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         final String random = UUID.randomUUID().toString();
         final String namespace = "public/default";
         final String topic = "persistent://" + namespace + "/" + random;
-        final String subject =  UUID.randomUUID().toString();
+        final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
         superUserAdmin.topics().createNonPartitionedTopic(topic);
 
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
-            .topic(topic)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
+                .topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
         producer.send("message".getBytes());
 
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
@@ -343,10 +389,10 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         tenantManagerAdmin.namespaces().getBundles(namespace);
 
         // test nobody
+        AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, 
NamespaceOperation.GET_BUNDLE);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.namespaces().getBundles(namespace));
-
-        setAuthorizationOperationChecker(subject, 
NamespaceOperation.GET_BUNDLE);
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
@@ -367,20 +413,19 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         final String random = UUID.randomUUID().toString();
         final String namespace = "public/default";
         final String topic = "persistent://" + namespace + "/" + random;
-        final String subject =  UUID.randomUUID().toString();
+        final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
         superUserAdmin.topics().createNonPartitionedTopic(topic);
 
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
-            .topic(topic)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
+                .topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
         producer.send("message".getBytes());
 
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
@@ -401,7 +446,7 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                        () -> 
subAdmin.namespaces().unloadNamespaceBundle(namespace, defaultBundle));
+                    () -> 
subAdmin.namespaces().unloadNamespaceBundle(namespace, defaultBundle));
             
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
         }
 
@@ -413,20 +458,19 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         final String random = UUID.randomUUID().toString();
         final String namespace = "public/default";
         final String topic = "persistent://" + namespace + "/" + random;
-        final String subject =  UUID.randomUUID().toString();
+        final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
         superUserAdmin.topics().createNonPartitionedTopic(topic);
 
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
-            .topic(topic)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
+                .topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
         producer.send("message".getBytes());
 
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
@@ -447,7 +491,7 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                        () -> 
subAdmin.namespaces().splitNamespaceBundle(namespace, defaultBundle, false, 
null));
+                    () -> 
subAdmin.namespaces().splitNamespaceBundle(namespace, defaultBundle, false, 
null));
             
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
         }
 
@@ -459,13 +503,12 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         final String random = UUID.randomUUID().toString();
         final String namespace = "public/default";
         final String topic = "persistent://" + namespace + "/" + random;
-        final String subject =  UUID.randomUUID().toString();
+        final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
         superUserAdmin.topics().createNonPartitionedTopic(topic);
 
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
@@ -478,7 +521,8 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         producer.send("message".getBytes());
 
         for (int i = 0; i < 3; i++) {
-            superUserAdmin.namespaces().splitNamespaceBundle(namespace, 
Policies.BundleType.LARGEST.toString(), false, null);
+            superUserAdmin.namespaces()
+                    .splitNamespaceBundle(namespace, 
Policies.BundleType.LARGEST.toString(), false, null);
         }
 
         BundlesData bundles = 
superUserAdmin.namespaces().getBundles(namespace);
@@ -490,7 +534,7 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         for (int i = 0; i < boundaries.size() - 1; i++) {
             String bundleRange = boundaries.get(i) + "_" + boundaries.get(i + 
1);
             List<Topic> allTopicsFromNamespaceBundle = 
getPulsarService().getBrokerService()
-                            .getAllTopicsFromNamespaceBundle(namespace, 
namespace + "/" + bundleRange);
+                    .getAllTopicsFromNamespaceBundle(namespace, namespace + 
"/" + bundleRange);
             System.out.println(StringUtils.join(allTopicsFromNamespaceBundle));
             if (allTopicsFromNamespaceBundle.isEmpty()) {
                 bundleRanges.add(bundleRange);
@@ -504,15 +548,15 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         tenantManagerAdmin.namespaces().deleteNamespaceBundle(namespace, 
bundleRanges.get(1));
 
         // test nobody
+        AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, 
NamespaceOperation.DELETE_BUNDLE);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.namespaces().deleteNamespaceBundle(namespace, 
bundleRanges.get(1)));
-
-        setAuthorizationOperationChecker(subject, 
NamespaceOperation.DELETE_BUNDLE);
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                () -> subAdmin.namespaces().deleteNamespaceBundle(namespace, 
bundleRanges.get(1)));
+                    () -> 
subAdmin.namespaces().deleteNamespaceBundle(namespace, bundleRanges.get(1)));
             
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
         }
     }
@@ -522,7 +566,7 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         final String random = UUID.randomUUID().toString();
         final String namespace = "public/default";
         final String topic = "persistent://" + namespace + "/" + random;
-        final String subject =  UUID.randomUUID().toString();
+        final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
         superUserAdmin.topics().createNonPartitionedTopic(topic);
@@ -530,13 +574,11 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         final String role = "sub";
         final AuthAction testAction = AuthAction.consume;
 
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
 
-
         // test super admin
         superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
role, Set.of(testAction));
         Map<String, Set<AuthAction>> permissions = 
superUserAdmin.namespaces().getPermissions(namespace);
@@ -554,25 +596,33 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         Assert.assertTrue(permissions.isEmpty());
 
         // test nobody
+        AtomicBoolean execFlag =
+                    setAuthorizationOperationChecker(subject, 
NamespaceOperation.GRANT_PERMISSION);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> 
subAdmin.namespaces().grantPermissionOnNamespace(namespace, role, 
Set.of(testAction)));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationOperationChecker(subject, 
NamespaceOperation.GET_PERMISSION);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.namespaces().getPermissions(namespace));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag =
+                    setAuthorizationOperationChecker(subject, 
NamespaceOperation.REVOKE_PERMISSION);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> 
subAdmin.namespaces().revokePermissionsOnNamespace(namespace, role));
+        Assert.assertTrue(execFlag.get());
 
+        clearAuthorizationOperationChecker();
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
-            setAuthorizationOperationChecker(subject, 
NamespaceOperation.GRANT_PERMISSION);
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                        () -> 
subAdmin.namespaces().grantPermissionOnNamespace(namespace, role, 
Set.of(testAction)));
-            setAuthorizationOperationChecker(subject, 
NamespaceOperation.GET_PERMISSION);
+                    () -> 
subAdmin.namespaces().grantPermissionOnNamespace(namespace, role, 
Set.of(testAction)));
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                        () -> subAdmin.namespaces().getPermissions(namespace));
-            setAuthorizationOperationChecker(subject, 
NamespaceOperation.REVOKE_PERMISSION);
+                    () -> subAdmin.namespaces().getPermissions(namespace));
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                        () -> 
subAdmin.namespaces().revokePermissionsOnNamespace(namespace, role));
+                    () -> 
subAdmin.namespaces().revokePermissionsOnNamespace(namespace, role));
             
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
         }
 
@@ -584,13 +634,12 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         final String random = UUID.randomUUID().toString();
         final String namespace = "public/default";
         final String topic = "persistent://" + namespace + "/" + random;
-        final String subject =  UUID.randomUUID().toString();
+        final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
         superUserAdmin.topics().createNonPartitionedTopic(topic);
 
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
@@ -604,7 +653,8 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
 
         // test super admin
         superUserAdmin.namespaces().grantPermissionOnSubscription(namespace, 
subscription, Set.of(role));
-        Map<String, Set<String>> permissionOnSubscription = 
superUserAdmin.namespaces().getPermissionOnSubscription(namespace);
+        Map<String, Set<String>> permissionOnSubscription =
+                
superUserAdmin.namespaces().getPermissionOnSubscription(namespace);
         Assert.assertEquals(permissionOnSubscription.get(subscription), 
Set.of(role));
         superUserAdmin.namespaces().revokePermissionOnSubscription(namespace, 
subscription, role);
         permissionOnSubscription = 
superUserAdmin.namespaces().getPermissionOnSubscription(namespace);
@@ -619,25 +669,29 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         Assert.assertTrue(permissionOnSubscription.isEmpty());
 
         // test nobody
+        AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, 
NamespaceOperation.GRANT_PERMISSION);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> 
subAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription, 
Set.of(role)));
+        Assert.assertTrue(execFlag.get());
+        execFlag = setAuthorizationOperationChecker(subject, 
NamespaceOperation.GET_PERMISSION);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> 
subAdmin.namespaces().getPermissionOnSubscription(namespace));
+        Assert.assertTrue(execFlag.get());
+        execFlag = setAuthorizationOperationChecker(subject, 
NamespaceOperation.REVOKE_PERMISSION);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> 
subAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription, 
role));
+        Assert.assertTrue(execFlag.get());
 
+        clearAuthorizationOperationChecker();
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
-            setAuthorizationOperationChecker(subject, 
NamespaceOperation.GRANT_PERMISSION);
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                        () -> 
subAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription, 
Set.of(role)));
-            setAuthorizationOperationChecker(subject, 
NamespaceOperation.GET_PERMISSION);
+                    () -> 
subAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription, 
Set.of(role)));
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                        () -> 
subAdmin.namespaces().getPermissionOnSubscription(namespace));
-            setAuthorizationOperationChecker(subject, 
NamespaceOperation.REVOKE_PERMISSION);
+                    () -> 
subAdmin.namespaces().getPermissionOnSubscription(namespace));
             
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                        () -> 
subAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription, 
role));
+                    () -> 
subAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription, 
role));
             
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
         }
 
@@ -649,12 +703,11 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         final String random = UUID.randomUUID().toString();
         final String namespace = "public/default";
         final String topic = "persistent://" + namespace + "/" + random;
-        final String subject =  UUID.randomUUID().toString();
+        final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
         superUserAdmin.topics().createNonPartitionedTopic(topic);
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
@@ -665,10 +718,10 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         tenantManagerAdmin.namespaces().clearNamespaceBacklog(namespace);
 
         // test nobody
+        AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, 
NamespaceOperation.CLEAR_BACKLOG);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.namespaces().clearNamespaceBacklog(namespace));
-
-        setAuthorizationOperationChecker(subject, 
NamespaceOperation.CLEAR_BACKLOG);
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
@@ -684,17 +737,16 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         superUserAdmin.topics().delete(topic, true);
     }
 
-     @Test
+    @Test
     public void testClearNamespaceBundleBacklog() throws Exception {
         final String random = UUID.randomUUID().toString();
         final String namespace = "public/default";
         final String topic = "persistent://" + namespace + "/" + random;
-        final String subject =  UUID.randomUUID().toString();
+        final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
         superUserAdmin.topics().createNonPartitionedTopic(topic);
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
@@ -706,17 +758,17 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
 
         final String defaultBundle = "0x00000000_0xffffffff";
 
-         // test super admin
+        // test super admin
         superUserAdmin.namespaces().clearNamespaceBundleBacklog(namespace, 
defaultBundle);
 
         // test tenant manager
         tenantManagerAdmin.namespaces().clearNamespaceBundleBacklog(namespace, 
defaultBundle);
 
         // test nobody
-         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+        AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, 
NamespaceOperation.CLEAR_BACKLOG);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> 
subAdmin.namespaces().clearNamespaceBundleBacklog(namespace, defaultBundle));
-
-         setAuthorizationOperationChecker(subject, 
NamespaceOperation.CLEAR_BACKLOG);
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
@@ -737,12 +789,11 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         final String random = UUID.randomUUID().toString();
         final String namespace = "public/default";
         final String topic = "persistent://" + namespace + "/" + random;
-        final String subject =  UUID.randomUUID().toString();
+        final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
         superUserAdmin.topics().createNonPartitionedTopic(topic);
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
@@ -756,17 +807,17 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
                 .subscriptionName("sub")
                 .subscribe().close();
 
-         // test super admin
+        // test super admin
         superUserAdmin.namespaces().unsubscribeNamespace(namespace, "sub");
 
         // test tenant manager
         tenantManagerAdmin.namespaces().unsubscribeNamespace(namespace, "sub");
 
         // test nobody
-         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+        AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, 
NamespaceOperation.UNSUBSCRIBE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.namespaces().unsubscribeNamespace(namespace, 
"sub"));
-
-         setAuthorizationOperationChecker(subject, 
NamespaceOperation.UNSUBSCRIBE);
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
@@ -786,13 +837,12 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
     public void testUnsubscribeNamespaceBundle() throws Exception {
         final String random = UUID.randomUUID().toString();
         final String namespace = "public/default";
-        final String topic = "persistent://" + namespace + "/" + random;
-        final String subject =  UUID.randomUUID().toString();
+        final String topic = "persistent://" + namespace + "/" + random ;
+        final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
                 .claim("sub", subject).signWith(SECRET_KEY).compact();
         superUserAdmin.topics().createNonPartitionedTopic(topic);
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
@@ -808,17 +858,17 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
 
         final String defaultBundle = "0x00000000_0xffffffff";
 
-         // test super admin
+        // test super admin
         superUserAdmin.namespaces().unsubscribeNamespaceBundle(namespace, 
defaultBundle, "sub");
 
         // test tenant manager
         tenantManagerAdmin.namespaces().unsubscribeNamespaceBundle(namespace, 
defaultBundle, "sub");
 
         // test nobody
-         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+        AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, 
NamespaceOperation.UNSUBSCRIBE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> 
subAdmin.namespaces().unsubscribeNamespaceBundle(namespace, defaultBundle, 
"sub"));
-
-         setAuthorizationOperationChecker(subject, 
NamespaceOperation.UNSUBSCRIBE);
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
@@ -921,6 +971,7 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         tenantManagerAdmin.packages().updateMetadata(packageName, 
updatedMetadata);
 
         // ---- test nobody ---
+        AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, 
NamespaceOperation.PACKAGES);
 
         File file3 = File.createTempFile("package-api-test", ".package");
 
@@ -954,9 +1005,7 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
         updatedMetadata3.setProperties(Collections.singletonMap("key", 
"value"));
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.packages().updateMetadata(packageName3, 
updatedMetadata3));
-
-
-        setAuthorizationOperationChecker(subject, NamespaceOperation.PACKAGES);
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subject, Set.of(action));
@@ -1022,7 +1071,7 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
 
     @Test
     @SneakyThrows
-    public void testOffloadThresholdInSeconds() {
+    public void testDispatchRate() {
         final String namespace = "public/default";
         final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
@@ -1031,16 +1080,27 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, 
PolicyOperation.READ);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                () -> 
subAdmin.namespaces().getOffloadThresholdInSeconds(namespace));
+                () -> subAdmin.namespaces().getDispatchRate(namespace));
+        Assert.assertTrue(execFlag.get());
 
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.RATE, PolicyOperation.WRITE);
+        DispatchRate dispatchRate =
+                
DispatchRate.builder().dispatchThrottlingRateInByte(10).dispatchThrottlingRateInMsg(10).build();
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                () -> 
subAdmin.namespaces().setOffloadThresholdInSeconds(namespace, 10000));
+                () -> subAdmin.namespaces().setDispatchRate(namespace, 
dispatchRate));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.RATE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().removeDispatchRate(namespace));
+        Assert.assertTrue(execFlag.get());
     }
 
     @Test
     @SneakyThrows
-    public void testMaxSubscriptionsPerTopic() {
+    public void testSubscribeRate() {
         final String namespace = "public/default";
         final String subject = UUID.randomUUID().toString();
         final String token = Jwts.builder()
@@ -1049,13 +1109,985 @@ public class NamespaceAuthZTest extends 
MockedPulsarStandalone {
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(token))
                 .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, 
PolicyOperation.READ);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                () -> 
subAdmin.namespaces().getMaxSubscriptionsPerTopic(namespace));
+                () -> subAdmin.namespaces().getSubscribeRate(namespace));
+        Assert.assertTrue(execFlag.get());
 
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.RATE, PolicyOperation.WRITE);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                () -> 
subAdmin.namespaces().setMaxSubscriptionsPerTopic(namespace, 100));
+                () -> subAdmin.namespaces().setSubscribeRate(namespace, new 
SubscribeRate()));
+        Assert.assertTrue(execFlag.get());
 
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.RATE, PolicyOperation.WRITE);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                () -> 
subAdmin.namespaces().removeMaxSubscriptionsPerTopic(namespace));
+                () -> subAdmin.namespaces().removeSubscribeRate(namespace));
+        Assert.assertTrue(execFlag.get());
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads, sets or removes the namespace publish rate.
+     * After each rejected call the flag handed back by {@code setAuthorizationPolicyOperationChecker}
+     * must be set; that helper lives outside this method and presumably flips the flag once the
+     * {@code PolicyName.RATE} check for the given operation has been evaluated.
+     */
+    @Test
+    @SneakyThrows
+    public void testPublishRate() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder()
+                .claim("sub", subject)
+                .signWith(SECRET_KEY)
+                .compact();
+        @Cleanup
+        final PulsarAdmin nobodyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().getPublishRate(namespace));
+        assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean writeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().setPublishRate(namespace, new PublishRate(10, 10)));
+        assertTrue(writeChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().removePublishRate(namespace));
+        assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads, sets or removes the namespace-level
+     * subscription dispatch rate. After each rejected call the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} must be set; that helper is defined outside
+     * this method and presumably flips the flag once the {@code PolicyName.RATE} check for the
+     * given operation has been evaluated.
+     *
+     * <p>The topic created for this test is force-deleted at the end so it does not linger in the
+     * shared {@code public/default} namespace.
+     */
+    @Test
+    @SneakyThrows
+    public void testSubscriptionDispatchRate() {
+        final String random = UUID.randomUUID().toString();
+        final String namespace = "public/default";
+        final String topic = "persistent://" + namespace + "/" + random;
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createNonPartitionedTopic(topic);
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+
+        // get -> READ
+        AtomicBoolean execFlag =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().getSubscriptionDispatchRate(namespace));
+        Assert.assertTrue(execFlag.get());
+
+        // set -> WRITE
+        execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE);
+        DispatchRate dispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(10)
+                .dispatchThrottlingRateInByte(10)
+                .build();
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate));
+        Assert.assertTrue(execFlag.get());
+
+        // remove -> WRITE
+        execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().removeSubscriptionDispatchRate(namespace));
+        Assert.assertTrue(execFlag.get());
+
+        // Clean up the topic created above, as the other topic-creating tests in this class do.
+        superUserAdmin.topics().delete(topic, true);
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads, sets or removes the namespace compaction
+     * threshold. After each rejected call the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} (defined outside this method; presumably set
+     * once the {@code PolicyName.COMPACTION} check has been evaluated) must be true.
+     */
+    @Test
+    @SneakyThrows
+    public void testCompactionThreshold() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder()
+                .claim("sub", subject)
+                .signWith(SECRET_KEY)
+                .compact();
+        @Cleanup
+        final PulsarAdmin nobodyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.COMPACTION, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().getCompactionThreshold(namespace));
+        assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean writeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.COMPACTION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().setCompactionThreshold(namespace, 100L * 1024L * 1024L));
+        assertTrue(writeChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.COMPACTION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().removeCompactionThreshold(namespace));
+        assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads, sets or removes the namespace auto-topic-creation
+     * override. After each rejected call the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} (defined outside this method; presumably set
+     * once the {@code PolicyName.AUTO_TOPIC_CREATION} check has been evaluated) must be true.
+     */
+    @Test
+    @SneakyThrows
+    public void testAutoTopicCreation() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder()
+                .claim("sub", subject)
+                .signWith(SECRET_KEY)
+                .compact();
+        @Cleanup
+        final PulsarAdmin nobodyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().getAutoTopicCreation(namespace));
+        assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE);
+        final AutoTopicCreationOverride override = AutoTopicCreationOverride.builder()
+                .allowAutoTopicCreation(true)
+                .build();
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().setAutoTopicCreation(namespace, override));
+        assertTrue(writeChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().removeAutoTopicCreation(namespace));
+        assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads, sets or removes the namespace
+     * auto-subscription-creation override. After each rejected call the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} (defined outside this method; presumably set
+     * once the {@code PolicyName.AUTO_SUBSCRIPTION_CREATION} check has been evaluated) must be true.
+     */
+    @Test
+    @SneakyThrows
+    public void testAutoSubscriptionCreation() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder()
+                .claim("sub", subject)
+                .signWith(SECRET_KEY)
+                .compact();
+        @Cleanup
+        final PulsarAdmin nobodyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().getAutoSubscriptionCreation(namespace));
+        assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE);
+        final AutoSubscriptionCreationOverride override = AutoSubscriptionCreationOverride.builder()
+                .allowAutoSubscriptionCreation(true)
+                .build();
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().setAutoSubscriptionCreation(namespace, override));
+        assertTrue(writeChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().removeAutoSubscriptionCreation(namespace));
+        assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads, sets or removes the namespace
+     * max-unacked-messages-per-consumer policy. After each rejected call the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} (defined outside this method; presumably set
+     * once the {@code PolicyName.MAX_UNACKED} check has been evaluated) must be true.
+     */
+    @Test
+    @SneakyThrows
+    public void testMaxUnackedMessagesPerConsumer() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+
+        // get -> READ
+        AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED,
+                PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().getMaxUnackedMessagesPerConsumer(namespace));
+        Assert.assertTrue(execFlag.get());
+
+        // set -> WRITE
+        execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED,
+                PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, 100));
+        Assert.assertTrue(execFlag.get());
+
+        // remove -> WRITE
+        execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED,
+                PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().removeMaxUnackedMessagesPerConsumer(namespace));
+        Assert.assertTrue(execFlag.get());
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads, sets or removes the namespace
+     * max-unacked-messages-per-subscription policy. After each rejected call the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} (defined outside this method; presumably set
+     * once the {@code PolicyName.MAX_UNACKED} check has been evaluated) must be true.
+     */
+    @Test
+    @SneakyThrows
+    public void testMaxUnackedMessagesPerSubscription() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder()
+                .claim("sub", subject)
+                .signWith(SECRET_KEY)
+                .compact();
+        @Cleanup
+        final PulsarAdmin nobodyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().getMaxUnackedMessagesPerSubscription(namespace));
+        assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean writeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().setMaxUnackedMessagesPerSubscription(namespace, 100));
+        assertTrue(writeChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().removeMaxUnackedMessagesPerSubscription(namespace));
+        assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads, sets or removes the namespace resource group.
+     * After each rejected call the flag returned by {@code setAuthorizationPolicyOperationChecker}
+     * (defined outside this method; presumably set once the {@code PolicyName.RESOURCEGROUP} check
+     * has been evaluated) must be true.
+     */
+    @Test
+    @SneakyThrows
+    public void testNamespaceResourceGroup() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder()
+                .claim("sub", subject)
+                .signWith(SECRET_KEY)
+                .compact();
+        @Cleanup
+        final PulsarAdmin nobodyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.RESOURCEGROUP, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().getNamespaceResourceGroup(namespace));
+        assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean writeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.RESOURCEGROUP, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().setNamespaceResourceGroup(namespace, "test-group"));
+        assertTrue(writeChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.RESOURCEGROUP, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().removeNamespaceResourceGroup(namespace));
+        assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads, sets or removes the namespace
+     * dispatcher-pause-on-ack-state-persistent policy. After each rejected call the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} (defined outside this method; presumably set
+     * once the {@code PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT} check has been
+     * evaluated) must be true.
+     */
+    @Test
+    @SneakyThrows
+    public void testDispatcherPauseOnAckStatePersistent() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder()
+                .claim("sub", subject)
+                .signWith(SECRET_KEY)
+                .compact();
+        @Cleanup
+        final PulsarAdmin nobodyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().getDispatcherPauseOnAckStatePersistent(namespace));
+        assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().setDispatcherPauseOnAckStatePersistent(namespace));
+        assertTrue(writeChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().removeDispatcherPauseOnAckStatePersistent(namespace));
+        assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads the backlog quota map, sets a backlog quota or
+     * removes it on the namespace. After each rejected call the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} (defined outside this method; presumably set
+     * once the {@code PolicyName.BACKLOG} check has been evaluated) must be true.
+     */
+    @Test
+    @SneakyThrows
+    public void testBacklogQuota() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder()
+                .claim("sub", subject)
+                .signWith(SECRET_KEY)
+                .compact();
+        @Cleanup
+        final PulsarAdmin nobodyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.BACKLOG, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().getBacklogQuotaMap(namespace));
+        assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean writeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.BACKLOG, PolicyOperation.WRITE);
+        final BacklogQuota quota = BacklogQuota.builder()
+                .limitTime(10)
+                .limitSize(10)
+                .build();
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().setBacklogQuota(namespace, quota));
+        assertTrue(writeChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.BACKLOG, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().removeBacklogQuota(namespace));
+        assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads, sets or removes the namespace deduplication
+     * snapshot interval. After each rejected call the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} (defined outside this method; presumably set
+     * once the {@code PolicyName.DEDUPLICATION_SNAPSHOT} check has been evaluated) must be true.
+     */
+    @Test
+    @SneakyThrows
+    public void testDeduplicationSnapshotInterval() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder()
+                .claim("sub", subject)
+                .signWith(SECRET_KEY)
+                .compact();
+        @Cleanup
+        final PulsarAdmin nobodyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().getDeduplicationSnapshotInterval(namespace));
+        assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().setDeduplicationSnapshotInterval(namespace, 100));
+        assertTrue(writeChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().removeDeduplicationSnapshotInterval(namespace));
+        assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads, sets or removes the namespace
+     * max-subscriptions-per-topic policy. After each rejected call the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} (defined outside this method; presumably set
+     * once the {@code PolicyName.MAX_SUBSCRIPTIONS} check has been evaluated) must be true.
+     */
+    @Test
+    @SneakyThrows
+    public void testMaxSubscriptionsPerTopic() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder()
+                .claim("sub", subject)
+                .signWith(SECRET_KEY)
+                .compact();
+        @Cleanup
+        final PulsarAdmin nobodyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().getMaxSubscriptionsPerTopic(namespace));
+        assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().setMaxSubscriptionsPerTopic(namespace, 10));
+        assertTrue(writeChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().removeMaxSubscriptionsPerTopic(namespace));
+        assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads, sets or removes the namespace
+     * max-producers-per-topic policy. After each rejected call the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} (defined outside this method; presumably set
+     * once the {@code PolicyName.MAX_PRODUCERS} check has been evaluated) must be true.
+     */
+    @Test
+    @SneakyThrows
+    public void testMaxProducersPerTopic() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder()
+                .claim("sub", subject)
+                .signWith(SECRET_KEY)
+                .compact();
+        @Cleanup
+        final PulsarAdmin nobodyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_PRODUCERS, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().getMaxProducersPerTopic(namespace));
+        assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean writeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().setMaxProducersPerTopic(namespace, 10));
+        assertTrue(writeChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().removeMaxProducersPerTopic(namespace));
+        assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Checks that a freshly generated subject, which this test grants nothing, gets
+     * {@code NotAuthorizedException} when it reads, sets or removes the namespace
+     * max-consumers-per-topic policy. After each rejected call the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} (defined outside this method; presumably set
+     * once the {@code PolicyName.MAX_CONSUMERS} check has been evaluated) must be true.
+     */
+    @Test
+    @SneakyThrows
+    public void testMaxConsumersPerTopic() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder()
+                .claim("sub", subject)
+                .signWith(SECRET_KEY)
+                .compact();
+        @Cleanup
+        final PulsarAdmin nobodyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().getMaxConsumersPerTopic(namespace));
+        assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean writeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().setMaxConsumersPerTopic(namespace, 10));
+        assertTrue(writeChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.namespaces().removeMaxConsumersPerTopic(namespace));
+        assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Calls get/set namespace replication clusters on {@code public/default} with a token whose
+     * subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the
+     * flag returned by {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.REPLICATION} (READ for get, WRITE for set) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testNamespaceReplicationClusters() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.REPLICATION, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getNamespaceReplicationClusters(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.REPLICATION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setNamespaceReplicationClusters(ns, Sets.newHashSet("test")));
+        Assert.assertTrue(setChecked.get());
+    }
+
+    /**
+     * Calls get/set/remove replicator dispatch rate on {@code public/default} with a token whose
+     * subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the
+     * flag returned by {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.REPLICATION_RATE} (READ for get, WRITE for set/remove) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testReplicatorDispatchRate() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION_RATE, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().getReplicatorDispatchRate(namespace));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE);
+        // Arbitrary payload for the set call, which is expected to be rejected.
+        DispatchRate dispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInByte(10)
+                .dispatchThrottlingRateInMsg(10)
+                .build();
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().setReplicatorDispatchRate(namespace, dispatchRate));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().removeReplicatorDispatchRate(namespace));
+        Assert.assertTrue(execFlag.get());
+    }
+
+    /**
+     * Calls get/set/remove max-consumers-per-subscription on {@code public/default} with a token
+     * whose subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and
+     * the flag returned by {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.MAX_CONSUMERS} (READ for get, WRITE for set/remove) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testMaxConsumersPerSubscription() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getMaxConsumersPerSubscription(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setMaxConsumersPerSubscription(ns, 10));
+        Assert.assertTrue(setChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().removeMaxConsumersPerSubscription(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Calls get/set offload threshold on {@code public/default} with a token whose subject is a
+     * random UUID. Each call must fail with {@code NotAuthorizedException}, and the flag returned
+     * by {@code setAuthorizationPolicyOperationChecker} for {@code PolicyName.OFFLOAD}
+     * (READ for get, WRITE for set) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testOffloadThreshold() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.OFFLOAD, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getOffloadThreshold(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.OFFLOAD, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setOffloadThreshold(ns, 10));
+        Assert.assertTrue(setChecked.get());
+    }
+
+    /**
+     * Calls get/set/remove offload policies on {@code public/default} with a token whose subject
+     * is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the flag
+     * returned by {@code setAuthorizationPolicyOperationChecker} for {@code PolicyName.OFFLOAD}
+     * (READ for get, WRITE for set/remove) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testOffloadPolicies() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.OFFLOAD, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getOffloadPolicies(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.OFFLOAD, PolicyOperation.WRITE);
+        final OffloadPolicies policies = OffloadPolicies.builder()
+                .managedLedgerOffloadThresholdInBytes(10L)
+                .build();
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setOffloadPolicies(ns, policies));
+        Assert.assertTrue(setChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.OFFLOAD, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().removeOffloadPolicies(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Calls get/set/remove max-topics-per-namespace on {@code public/default} with a token whose
+     * subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the
+     * flag returned by {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.MAX_TOPICS} (READ for get, WRITE for set/remove) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testMaxTopicsPerNamespace() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.MAX_TOPICS, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getMaxTopicsPerNamespace(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.MAX_TOPICS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setMaxTopicsPerNamespace(ns, 10));
+        Assert.assertTrue(setChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.MAX_TOPICS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().removeMaxTopicsPerNamespace(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Calls get/set/remove deduplication status on {@code public/default} with a token whose
+     * subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the
+     * flag returned by {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.DEDUPLICATION} (READ for get, WRITE for set/remove) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testDeduplicationStatus() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.DEDUPLICATION, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getDeduplicationStatus(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setDeduplicationStatus(ns, true));
+        Assert.assertTrue(setChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().removeDeduplicationStatus(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Calls get/set/remove persistence policies on {@code public/default} with a token whose
+     * subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the
+     * flag returned by {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.PERSISTENCE} (READ for get, WRITE for set/remove) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testPersistence() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.PERSISTENCE, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getPersistence(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setPersistence(ns, new PersistencePolicies(10, 10, 10, 10)));
+        Assert.assertTrue(setChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().removePersistence(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Calls get/set/remove namespace message TTL on {@code public/default} with a token whose
+     * subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the
+     * flag returned by {@code setAuthorizationPolicyOperationChecker} for {@code PolicyName.TTL}
+     * (READ for get, WRITE for set/remove) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testNamespaceMessageTTL() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.TTL, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getNamespaceMessageTTL(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.TTL, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setNamespaceMessageTTL(ns, 10));
+        Assert.assertTrue(setChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.TTL, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().removeNamespaceMessageTTL(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Calls get/set/remove subscription expiration time on {@code public/default} with a token
+     * whose subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and
+     * the flag returned by {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.SUBSCRIPTION_EXPIRATION_TIME} (READ for get, WRITE for set/remove) must be
+     * true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testSubscriptionExpirationTime() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getSubscriptionExpirationTime(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setSubscriptionExpirationTime(ns, 10));
+        Assert.assertTrue(setChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().removeSubscriptionExpirationTime(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Calls {@code getDelayedDelivery} on {@code public/default} with a token whose subject is a
+     * random UUID. The call must fail with {@code NotAuthorizedException}, and the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} for {@code PolicyName.DELAYED_DELIVERY} with
+     * {@code PolicyOperation.READ} must be true afterwards. Only the read path is exercised here.
+     */
+    @Test
+    @SneakyThrows
+    public void testDelayedDeliveryMessages() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().getDelayedDelivery(namespace));
+        Assert.assertTrue(execFlag.get());
+    }
+
+    /**
+     * Calls get/set/remove retention on {@code public/default} with a token whose subject is a
+     * random UUID. Each call must fail with {@code NotAuthorizedException}, and the flag returned
+     * by {@code setAuthorizationPolicyOperationChecker} for {@code PolicyName.RETENTION}
+     * (READ for get, WRITE for set/remove) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testRetention() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.RETENTION, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getRetention(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.RETENTION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setRetention(ns, new RetentionPolicies(10, 10)));
+        Assert.assertTrue(setChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.RETENTION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().removeRetention(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Calls get/set/remove inactive-topic policies on {@code public/default} with a token whose
+     * subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the
+     * flag returned by {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.INACTIVE_TOPIC} (READ for get, WRITE for set/remove) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testInactiveTopicPolicies() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getInactiveTopicPolicies(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE);
+        final InactiveTopicPolicies policies =
+                new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 10, false);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setInactiveTopicPolicies(ns, policies));
+        Assert.assertTrue(setChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().removeInactiveTopicPolicies(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Calls get/set namespace anti-affinity group on {@code public/default} with a token whose
+     * subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the
+     * flag returned by {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.ANTI_AFFINITY} (READ for get, WRITE for set) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testNamespaceAntiAffinityGroup() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getNamespaceAntiAffinityGroup(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setNamespaceAntiAffinityGroup(ns, "invalid-group"));
+        Assert.assertTrue(setChecked.get());
+    }
+
+    /**
+     * Calls {@code getOffloadDeleteLagMs} and {@code setOffloadDeleteLag} on {@code public/default}
+     * with a token whose subject is a random UUID. Each call must fail with
+     * {@code NotAuthorizedException}, and the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} for {@code PolicyName.OFFLOAD}
+     * (READ for get, WRITE for set) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testOffloadDeleteLagMs() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.OFFLOAD, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getOffloadDeleteLagMs(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.OFFLOAD, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setOffloadDeleteLag(ns, 100, TimeUnit.HOURS));
+        Assert.assertTrue(setChecked.get());
+    }
+
+    /**
+     * Calls get/set offload threshold in seconds on {@code public/default} with a token whose
+     * subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the
+     * flag returned by {@code setAuthorizationPolicyOperationChecker} for {@code PolicyName.OFFLOAD}
+     * (READ for get, WRITE for set) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testOffloadThresholdInSeconds() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.OFFLOAD, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getOffloadThresholdInSeconds(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.OFFLOAD, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setOffloadThresholdInSeconds(ns, 10000));
+        Assert.assertTrue(setChecked.get());
+    }
+
+    /**
+     * Calls get/set/remove namespace entry filters on {@code public/default} with a token whose
+     * subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the
+     * flag returned by {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.ENTRY_FILTERS} (READ for get, WRITE for set/remove) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testNamespaceEntryFilters() {
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag =
+                setAuthorizationPolicyOperationChecker(subject, PolicyName.ENTRY_FILTERS, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().getNamespaceEntryFilters(namespace));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().setNamespaceEntryFilters(namespace, new EntryFilters("filter1")));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().removeNamespaceEntryFilters(namespace));
+        Assert.assertTrue(execFlag.get());
+    }
+
+    /**
+     * Calls get/set encryption-required status on {@code public/default} with a token whose
+     * subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the
+     * flag returned by {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.ENCRYPTION} (READ for get, WRITE for set) must be true afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testEncryptionRequiredStatus() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.ENCRYPTION, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getEncryptionRequiredStatus(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.ENCRYPTION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setEncryptionRequiredStatus(ns, false));
+        Assert.assertTrue(setChecked.get());
+    }
+
+    /**
+     * Calls get/set/remove subscription-types-enabled on {@code public/default} with a token
+     * whose subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and
+     * the flag returned by {@code setAuthorizationPolicyOperationChecker} must be true afterwards.
+     * The checker is registered for {@code PolicyName.SUBSCRIPTION_AUTH_MODE}
+     * (READ for get, WRITE for set/remove).
+     */
+    @Test
+    @SneakyThrows
+    public void testSubscriptionTypesEnabled() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getSubscriptionTypesEnabled(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setSubscriptionTypesEnabled(ns,
+                        Sets.newHashSet(SubscriptionType.Failover)));
+        Assert.assertTrue(setChecked.get());
+
+        // remove -> WRITE
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().removeSubscriptionTypesEnabled(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Calls get/set is-allow-auto-update-schema on {@code public/default} with a token whose
+     * subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the
+     * flag returned by {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.SCHEMA_COMPATIBILITY_STRATEGY} (READ for get, WRITE for set) must be true
+     * afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testIsAllowAutoUpdateSchema() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getIsAllowAutoUpdateSchema(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setIsAllowAutoUpdateSchema(ns, true));
+        Assert.assertTrue(setChecked.get());
+    }
+
+    /**
+     * Calls get/set schema auto-update compatibility strategy on {@code public/default} with a
+     * token whose subject is a random UUID. Each call must fail with
+     * {@code NotAuthorizedException}, and the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.SCHEMA_COMPATIBILITY_STRATEGY} (READ for get, WRITE for set) must be true
+     * afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testSchemaAutoUpdateCompatibilityStrategy() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(ns, AutoUpdateDisabled));
+        Assert.assertTrue(setChecked.get());
+    }
+
+    /**
+     * Calls get/set schema-validation-enforced on {@code public/default} with a token whose
+     * subject is a random UUID. Each call must fail with {@code NotAuthorizedException}, and the
+     * flag returned by {@code setAuthorizationPolicyOperationChecker} for
+     * {@code PolicyName.SCHEMA_COMPATIBILITY_STRATEGY} (READ for get, WRITE for set) must be true
+     * afterwards.
+     */
+    @Test
+    @SneakyThrows
+    public void testSchemaValidationEnforced() {
+        final String ns = "public/default";
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // get -> READ
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getSchemaValidationEnforced(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // set -> WRITE
+        final AtomicBoolean setChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setSchemaValidationEnforced(ns, true));
+        Assert.assertTrue(setChecked.get());
     }
 }


Reply via email to