This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1bb9378b50a [improve][test] Add policy authentication test for namespace API (#22593)
1bb9378b50a is described below
commit 1bb9378b50aa891834b64cd39f55ae0e32a055bb
Author: Cong Zhao <[email protected]>
AuthorDate: Sun Apr 28 10:37:37 2024 +0800
[improve][test] Add policy authentication test for namespace API (#22593)
---
.../pulsar/broker/admin/NamespaceAuthZTest.java | 1248 ++++++++++++++++++--
1 file changed, 1140 insertions(+), 108 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
index 5358295b785..ec6a122f7df 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
@@ -20,9 +20,11 @@
package org.apache.pulsar.broker.admin;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.deleteNamespaceWithRetry;
+import static
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
import io.jsonwebtoken.Jwts;
import java.io.File;
import java.util.ArrayList;
@@ -32,6 +34,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
@@ -44,17 +48,33 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.EntryFilters;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.apache.pulsar.packages.management.core.common.PackageMetadata;
import org.apache.pulsar.security.MockedPulsarStandalone;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -72,7 +92,7 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
private AuthorizationService authorizationService;
- private static final String TENANT_ADMIN_SUBJECT =
UUID.randomUUID().toString();
+ private static final String TENANT_ADMIN_SUBJECT =
UUID.randomUUID().toString();
private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
.claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
@@ -122,16 +142,46 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
superUserAdmin.namespaces().createNamespace("public/default");
}
- private void setAuthorizationOperationChecker(String role,
NamespaceOperation operation) {
+ private AtomicBoolean setAuthorizationOperationChecker(String role,
NamespaceOperation operation) {
+ AtomicBoolean execFlag = new AtomicBoolean(false);
Mockito.doAnswer(invocationOnMock -> {
String role_ = invocationOnMock.getArgument(2);
if (role.equals(role_)) {
NamespaceOperation operation_ =
invocationOnMock.getArgument(1);
Assert.assertEquals(operation_, operation);
}
+ execFlag.set(true);
return invocationOnMock.callRealMethod();
}).when(authorizationService).allowNamespaceOperationAsync(Mockito.any(),
Mockito.any(), Mockito.any(),
Mockito.any());
+ return execFlag;
+ }
+
+ private void clearAuthorizationOperationChecker() {
+
Mockito.doAnswer(InvocationOnMock::callRealMethod).when(authorizationService)
+ .allowNamespaceOperationAsync(Mockito.any(), Mockito.any(),
Mockito.any(),
+ Mockito.any());
+ }
+
+ private AtomicBoolean setAuthorizationPolicyOperationChecker(String role,
Object policyName, Object operation) {
+ AtomicBoolean execFlag = new AtomicBoolean(false);
+ if (operation instanceof PolicyOperation) {
+ Mockito.doAnswer(invocationOnMock -> {
+ String role_ = invocationOnMock.getArgument(3);
+ if (role.equals(role_)) {
+ PolicyName policyName_ = invocationOnMock.getArgument(1);
+ PolicyOperation operation_ =
invocationOnMock.getArgument(2);
+ assertEquals(operation_, operation);
+ assertEquals(policyName_, policyName);
+ }
+ execFlag.set(true);
+ return invocationOnMock.callRealMethod();
+
}).when(authorizationService).allowNamespacePolicyOperationAsync(Mockito.any(),
Mockito.any(), Mockito.any(),
+ Mockito.any(), Mockito.any());
+ } else {
+ throw new IllegalArgumentException("");
+ }
+ return execFlag;
}
@SneakyThrows
@@ -140,13 +190,12 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://public/default/" + random;
- final String subject = UUID.randomUUID().toString();
+ final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
- @Cleanup
- final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
@@ -214,18 +263,17 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
superUserAdmin.topics().delete(topic, true);
}
- @Test
+ @Test
public void testTopics() throws Exception {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://" + namespace + "/" + random;
- final String subject = UUID.randomUUID().toString();
+ final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
- @Cleanup
- final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
@@ -236,10 +284,10 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
// test tenant manager
tenantManagerAdmin.namespaces().getTopics(namespace);
+ AtomicBoolean execFlag = setAuthorizationOperationChecker(subject,
NamespaceOperation.GET_TOPICS);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().getTopics(namespace));
-
- setAuthorizationOperationChecker(subject,
NamespaceOperation.GET_TOPICS);
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
@@ -260,13 +308,12 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://" + namespace + "/" + random;
- final String subject = UUID.randomUUID().toString();
+ final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
- @Cleanup
- final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
@@ -302,11 +349,11 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().setBookieAffinityGroup(namespace,
bookieAffinityGroupData));
+ () ->
subAdmin.namespaces().setBookieAffinityGroup(namespace,
bookieAffinityGroupData));
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().getBookieAffinityGroup(namespace));
+ () ->
subAdmin.namespaces().getBookieAffinityGroup(namespace));
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().deleteBookieAffinityGroup(namespace));
+ () ->
subAdmin.namespaces().deleteBookieAffinityGroup(namespace));
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
}
@@ -319,20 +366,19 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://" + namespace + "/" + random;
- final String subject = UUID.randomUUID().toString();
+ final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
- .topic(topic)
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
+ .topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
producer.send("message".getBytes());
- @Cleanup
- final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
@@ -343,10 +389,10 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
tenantManagerAdmin.namespaces().getBundles(namespace);
// test nobody
+ AtomicBoolean execFlag = setAuthorizationOperationChecker(subject,
NamespaceOperation.GET_BUNDLE);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().getBundles(namespace));
-
- setAuthorizationOperationChecker(subject,
NamespaceOperation.GET_BUNDLE);
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
@@ -367,20 +413,19 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://" + namespace + "/" + random;
- final String subject = UUID.randomUUID().toString();
+ final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
- .topic(topic)
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
+ .topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
producer.send("message".getBytes());
- @Cleanup
- final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
@@ -401,7 +446,7 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().unloadNamespaceBundle(namespace, defaultBundle));
+ () ->
subAdmin.namespaces().unloadNamespaceBundle(namespace, defaultBundle));
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
}
@@ -413,20 +458,19 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://" + namespace + "/" + random;
- final String subject = UUID.randomUUID().toString();
+ final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
- .topic(topic)
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
+ .topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
producer.send("message".getBytes());
- @Cleanup
- final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
@@ -447,7 +491,7 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().splitNamespaceBundle(namespace, defaultBundle, false,
null));
+ () ->
subAdmin.namespaces().splitNamespaceBundle(namespace, defaultBundle, false,
null));
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
}
@@ -459,13 +503,12 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://" + namespace + "/" + random;
- final String subject = UUID.randomUUID().toString();
+ final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
- @Cleanup
- final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
@@ -478,7 +521,8 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
producer.send("message".getBytes());
for (int i = 0; i < 3; i++) {
- superUserAdmin.namespaces().splitNamespaceBundle(namespace,
Policies.BundleType.LARGEST.toString(), false, null);
+ superUserAdmin.namespaces()
+ .splitNamespaceBundle(namespace,
Policies.BundleType.LARGEST.toString(), false, null);
}
BundlesData bundles =
superUserAdmin.namespaces().getBundles(namespace);
@@ -490,7 +534,7 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
for (int i = 0; i < boundaries.size() - 1; i++) {
String bundleRange = boundaries.get(i) + "_" + boundaries.get(i +
1);
List<Topic> allTopicsFromNamespaceBundle =
getPulsarService().getBrokerService()
- .getAllTopicsFromNamespaceBundle(namespace,
namespace + "/" + bundleRange);
+ .getAllTopicsFromNamespaceBundle(namespace, namespace +
"/" + bundleRange);
System.out.println(StringUtils.join(allTopicsFromNamespaceBundle));
if (allTopicsFromNamespaceBundle.isEmpty()) {
bundleRanges.add(bundleRange);
@@ -504,15 +548,15 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
tenantManagerAdmin.namespaces().deleteNamespaceBundle(namespace,
bundleRanges.get(1));
// test nobody
+ AtomicBoolean execFlag = setAuthorizationOperationChecker(subject,
NamespaceOperation.DELETE_BUNDLE);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().deleteNamespaceBundle(namespace,
bundleRanges.get(1)));
-
- setAuthorizationOperationChecker(subject,
NamespaceOperation.DELETE_BUNDLE);
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () -> subAdmin.namespaces().deleteNamespaceBundle(namespace,
bundleRanges.get(1)));
+ () ->
subAdmin.namespaces().deleteNamespaceBundle(namespace, bundleRanges.get(1)));
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
}
}
@@ -522,7 +566,7 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://" + namespace + "/" + random;
- final String subject = UUID.randomUUID().toString();
+ final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
@@ -530,13 +574,11 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
final String role = "sub";
final AuthAction testAction = AuthAction.consume;
- @Cleanup
- final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
-
// test super admin
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
role, Set.of(testAction));
Map<String, Set<AuthAction>> permissions =
superUserAdmin.namespaces().getPermissions(namespace);
@@ -554,25 +596,33 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
Assert.assertTrue(permissions.isEmpty());
// test nobody
+ AtomicBoolean execFlag =
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.GRANT_PERMISSION);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() ->
subAdmin.namespaces().grantPermissionOnNamespace(namespace, role,
Set.of(testAction)));
+ Assert.assertTrue(execFlag.get());
+
+ execFlag = setAuthorizationOperationChecker(subject,
NamespaceOperation.GET_PERMISSION);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().getPermissions(namespace));
+ Assert.assertTrue(execFlag.get());
+
+ execFlag =
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.REVOKE_PERMISSION);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() ->
subAdmin.namespaces().revokePermissionsOnNamespace(namespace, role));
+ Assert.assertTrue(execFlag.get());
+ clearAuthorizationOperationChecker();
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
- setAuthorizationOperationChecker(subject,
NamespaceOperation.GRANT_PERMISSION);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().grantPermissionOnNamespace(namespace, role,
Set.of(testAction)));
- setAuthorizationOperationChecker(subject,
NamespaceOperation.GET_PERMISSION);
+ () ->
subAdmin.namespaces().grantPermissionOnNamespace(namespace, role,
Set.of(testAction)));
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () -> subAdmin.namespaces().getPermissions(namespace));
- setAuthorizationOperationChecker(subject,
NamespaceOperation.REVOKE_PERMISSION);
+ () -> subAdmin.namespaces().getPermissions(namespace));
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().revokePermissionsOnNamespace(namespace, role));
+ () ->
subAdmin.namespaces().revokePermissionsOnNamespace(namespace, role));
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
}
@@ -584,13 +634,12 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://" + namespace + "/" + random;
- final String subject = UUID.randomUUID().toString();
+ final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
- @Cleanup
- final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
@@ -604,7 +653,8 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
// test super admin
superUserAdmin.namespaces().grantPermissionOnSubscription(namespace,
subscription, Set.of(role));
- Map<String, Set<String>> permissionOnSubscription =
superUserAdmin.namespaces().getPermissionOnSubscription(namespace);
+ Map<String, Set<String>> permissionOnSubscription =
+
superUserAdmin.namespaces().getPermissionOnSubscription(namespace);
Assert.assertEquals(permissionOnSubscription.get(subscription),
Set.of(role));
superUserAdmin.namespaces().revokePermissionOnSubscription(namespace,
subscription, role);
permissionOnSubscription =
superUserAdmin.namespaces().getPermissionOnSubscription(namespace);
@@ -619,25 +669,29 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
Assert.assertTrue(permissionOnSubscription.isEmpty());
// test nobody
+ AtomicBoolean execFlag = setAuthorizationOperationChecker(subject,
NamespaceOperation.GRANT_PERMISSION);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() ->
subAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription,
Set.of(role)));
+ Assert.assertTrue(execFlag.get());
+ execFlag = setAuthorizationOperationChecker(subject,
NamespaceOperation.GET_PERMISSION);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() ->
subAdmin.namespaces().getPermissionOnSubscription(namespace));
+ Assert.assertTrue(execFlag.get());
+ execFlag = setAuthorizationOperationChecker(subject,
NamespaceOperation.REVOKE_PERMISSION);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() ->
subAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription,
role));
+ Assert.assertTrue(execFlag.get());
+ clearAuthorizationOperationChecker();
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
- setAuthorizationOperationChecker(subject,
NamespaceOperation.GRANT_PERMISSION);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription,
Set.of(role)));
- setAuthorizationOperationChecker(subject,
NamespaceOperation.GET_PERMISSION);
+ () ->
subAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription,
Set.of(role)));
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().getPermissionOnSubscription(namespace));
- setAuthorizationOperationChecker(subject,
NamespaceOperation.REVOKE_PERMISSION);
+ () ->
subAdmin.namespaces().getPermissionOnSubscription(namespace));
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription,
role));
+ () ->
subAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription,
role));
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
}
@@ -649,12 +703,11 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://" + namespace + "/" + random;
- final String subject = UUID.randomUUID().toString();
+ final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
- @Cleanup
- final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
@@ -665,10 +718,10 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
tenantManagerAdmin.namespaces().clearNamespaceBacklog(namespace);
// test nobody
+ AtomicBoolean execFlag = setAuthorizationOperationChecker(subject,
NamespaceOperation.CLEAR_BACKLOG);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().clearNamespaceBacklog(namespace));
-
- setAuthorizationOperationChecker(subject,
NamespaceOperation.CLEAR_BACKLOG);
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
@@ -684,17 +737,16 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
superUserAdmin.topics().delete(topic, true);
}
- @Test
+ @Test
public void testClearNamespaceBundleBacklog() throws Exception {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://" + namespace + "/" + random;
- final String subject = UUID.randomUUID().toString();
+ final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
- @Cleanup
- final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
@@ -706,17 +758,17 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
final String defaultBundle = "0x00000000_0xffffffff";
- // test super admin
+ // test super admin
superUserAdmin.namespaces().clearNamespaceBundleBacklog(namespace,
defaultBundle);
// test tenant manager
tenantManagerAdmin.namespaces().clearNamespaceBundleBacklog(namespace,
defaultBundle);
// test nobody
- Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ AtomicBoolean execFlag = setAuthorizationOperationChecker(subject,
NamespaceOperation.CLEAR_BACKLOG);
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() ->
subAdmin.namespaces().clearNamespaceBundleBacklog(namespace, defaultBundle));
-
- setAuthorizationOperationChecker(subject,
NamespaceOperation.CLEAR_BACKLOG);
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
@@ -737,12 +789,11 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://" + namespace + "/" + random;
- final String subject = UUID.randomUUID().toString();
+ final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
- @Cleanup
- final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
@@ -756,17 +807,17 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
.subscriptionName("sub")
.subscribe().close();
- // test super admin
+ // test super admin
superUserAdmin.namespaces().unsubscribeNamespace(namespace, "sub");
// test tenant manager
tenantManagerAdmin.namespaces().unsubscribeNamespace(namespace, "sub");
// test nobody
- Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ AtomicBoolean execFlag = setAuthorizationOperationChecker(subject,
NamespaceOperation.UNSUBSCRIBE);
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().unsubscribeNamespace(namespace,
"sub"));
-
- setAuthorizationOperationChecker(subject,
NamespaceOperation.UNSUBSCRIBE);
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
@@ -786,13 +837,12 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
public void testUnsubscribeNamespaceBundle() throws Exception {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
- final String topic = "persistent://" + namespace + "/" + random;
- final String subject = UUID.randomUUID().toString();
+ final String topic = "persistent://" + namespace + "/" + random ;
+ final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);
- @Cleanup
- final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
@@ -808,17 +858,17 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
final String defaultBundle = "0x00000000_0xffffffff";
- // test super admin
+ // test super admin
superUserAdmin.namespaces().unsubscribeNamespaceBundle(namespace,
defaultBundle, "sub");
// test tenant manager
tenantManagerAdmin.namespaces().unsubscribeNamespaceBundle(namespace,
defaultBundle, "sub");
// test nobody
- Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ AtomicBoolean execFlag = setAuthorizationOperationChecker(subject,
NamespaceOperation.UNSUBSCRIBE);
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() ->
subAdmin.namespaces().unsubscribeNamespaceBundle(namespace, defaultBundle,
"sub"));
-
- setAuthorizationOperationChecker(subject,
NamespaceOperation.UNSUBSCRIBE);
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
@@ -921,6 +971,7 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
tenantManagerAdmin.packages().updateMetadata(packageName,
updatedMetadata);
// ---- test nobody ---
+ AtomicBoolean execFlag = setAuthorizationOperationChecker(subject,
NamespaceOperation.PACKAGES);
File file3 = File.createTempFile("package-api-test", ".package");
@@ -954,9 +1005,7 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
updatedMetadata3.setProperties(Collections.singletonMap("key",
"value"));
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.packages().updateMetadata(packageName3,
updatedMetadata3));
-
-
- setAuthorizationOperationChecker(subject, NamespaceOperation.PACKAGES);
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
@@ -1022,7 +1071,7 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
@Test
@SneakyThrows
- public void testOffloadThresholdInSeconds() {
+ public void testDispatchRate() {
final String namespace = "public/default";
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
@@ -1031,16 +1080,27 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
+ AtomicBoolean execFlag =
setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE,
PolicyOperation.READ);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().getOffloadThresholdInSeconds(namespace));
+ () -> subAdmin.namespaces().getDispatchRate(namespace));
+ Assert.assertTrue(execFlag.get());
+ execFlag = setAuthorizationPolicyOperationChecker(subject,
PolicyName.RATE, PolicyOperation.WRITE);
+ DispatchRate dispatchRate =
+
DispatchRate.builder().dispatchThrottlingRateInByte(10).dispatchThrottlingRateInMsg(10).build();
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().setOffloadThresholdInSeconds(namespace, 10000));
+ () -> subAdmin.namespaces().setDispatchRate(namespace,
dispatchRate));
+ Assert.assertTrue(execFlag.get());
+
+ execFlag = setAuthorizationPolicyOperationChecker(subject,
PolicyName.RATE, PolicyOperation.WRITE);
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().removeDispatchRate(namespace));
+ Assert.assertTrue(execFlag.get());
}
@Test
@SneakyThrows
- public void testMaxSubscriptionsPerTopic() {
+ public void testSubscribeRate() {
final String namespace = "public/default";
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
@@ -1049,13 +1109,985 @@ public class NamespaceAuthZTest extends
MockedPulsarStandalone {
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
+ AtomicBoolean execFlag =
setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE,
PolicyOperation.READ);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().getMaxSubscriptionsPerTopic(namespace));
+ () -> subAdmin.namespaces().getSubscribeRate(namespace));
+ Assert.assertTrue(execFlag.get());
+ execFlag = setAuthorizationPolicyOperationChecker(subject,
PolicyName.RATE, PolicyOperation.WRITE);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().setMaxSubscriptionsPerTopic(namespace, 100));
+ () -> subAdmin.namespaces().setSubscribeRate(namespace, new
SubscribeRate()));
+ Assert.assertTrue(execFlag.get());
+ execFlag = setAuthorizationPolicyOperationChecker(subject,
PolicyName.RATE, PolicyOperation.WRITE);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.namespaces().removeMaxSubscriptionsPerTopic(namespace));
+ () -> subAdmin.namespaces().removeSubscribeRate(namespace));
+ Assert.assertTrue(execFlag.get());
+ }
+
+ /**
+  * Namespace publish-rate policy must be off limits to an unprivileged subject.
+  * The subject is freshly generated and this test grants it nothing, so get, set and remove
+  * are each expected to fail with {@code NotAuthorizedException}. After every call the flag
+  * returned by {@code setAuthorizationPolicyOperationChecker} is asserted to be true.
+  * NOTE(review): that helper is defined outside this hunk; presumably the flag is set once the
+  * matching RATE check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testPublishRate() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> RATE / READ
+     final AtomicBoolean readChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getPublishRate(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> RATE / WRITE
+     final AtomicBoolean writeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE);
+     final PublishRate publishRate = new PublishRate(10, 10);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setPublishRate(namespace, publishRate));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> RATE / WRITE
+     final AtomicBoolean removeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removePublishRate(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace subscription-dispatch-rate policy: get, set and remove issued with the token of a
+  * freshly generated subject (nothing is granted to it in this test) must each fail with
+  * {@code NotAuthorizedException}, and the flag returned by
+  * {@code setAuthorizationPolicyOperationChecker} must be true after each call.
+  * Only namespace-level calls are exercised, so no topic is created here.
+  * NOTE(review): the checker helper is defined outside this hunk; presumably its flag is set once
+  * the matching RATE check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testSubscriptionDispatchRate() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject).signWith(SECRET_KEY).compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> RATE / READ
+     AtomicBoolean execFlag =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getSubscriptionDispatchRate(namespace));
+     Assert.assertTrue(execFlag.get());
+
+     // set -> RATE / WRITE
+     execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE);
+     DispatchRate dispatchRate = DispatchRate.builder()
+             .dispatchThrottlingRateInMsg(10)
+             .dispatchThrottlingRateInByte(10)
+             .build();
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate));
+     Assert.assertTrue(execFlag.get());
+
+     // remove -> RATE / WRITE
+     execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeSubscriptionDispatchRate(namespace));
+     Assert.assertTrue(execFlag.get());
+ }
+
+ /**
+  * Namespace compaction-threshold policy: a client using the token of a freshly generated subject
+  * (no permission is granted in this test) must receive {@code NotAuthorizedException} on get,
+  * set and remove. The flag returned by {@code setAuthorizationPolicyOperationChecker} is asserted
+  * true after each call.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * COMPACTION check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testCompactionThreshold() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> COMPACTION / READ
+     final AtomicBoolean readChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.COMPACTION, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getCompactionThreshold(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> COMPACTION / WRITE
+     final AtomicBoolean writeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.COMPACTION, PolicyOperation.WRITE);
+     final long thresholdBytes = 100L * 1024L * 1024L;
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setCompactionThreshold(namespace, thresholdBytes));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> COMPACTION / WRITE
+     final AtomicBoolean removeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.COMPACTION, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeCompactionThreshold(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace auto-topic-creation override: get, set and remove performed with the token of a
+  * freshly generated subject (this test grants it nothing) must each throw
+  * {@code NotAuthorizedException}; after each call the flag obtained from
+  * {@code setAuthorizationPolicyOperationChecker} is asserted true.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * AUTO_TOPIC_CREATION check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testAutoTopicCreation() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> AUTO_TOPIC_CREATION / READ
+     final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getAutoTopicCreation(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> AUTO_TOPIC_CREATION / WRITE
+     final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE);
+     final AutoTopicCreationOverride creationOverride = AutoTopicCreationOverride.builder()
+             .allowAutoTopicCreation(true)
+             .build();
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setAutoTopicCreation(namespace, creationOverride));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> AUTO_TOPIC_CREATION / WRITE
+     final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeAutoTopicCreation(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace auto-subscription-creation override: with the token of a freshly generated subject
+  * (nothing is granted to it here) get, set and remove must each throw
+  * {@code NotAuthorizedException}. The flag returned by
+  * {@code setAuthorizationPolicyOperationChecker} is asserted true after every call.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * AUTO_SUBSCRIPTION_CREATION check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testAutoSubscriptionCreation() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> AUTO_SUBSCRIPTION_CREATION / READ
+     final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getAutoSubscriptionCreation(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> AUTO_SUBSCRIPTION_CREATION / WRITE
+     final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE);
+     final AutoSubscriptionCreationOverride creationOverride = AutoSubscriptionCreationOverride.builder()
+             .allowAutoSubscriptionCreation(true)
+             .build();
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setAutoSubscriptionCreation(namespace, creationOverride));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> AUTO_SUBSCRIPTION_CREATION / WRITE
+     final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeAutoSubscriptionCreation(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace max-unacked-messages-per-consumer policy: get, set and remove issued with the token
+  * of a freshly generated subject (this test grants it nothing) must each fail with
+  * {@code NotAuthorizedException}, and the flag returned by
+  * {@code setAuthorizationPolicyOperationChecker} must be true after each call.
+  * NOTE(review): the checker helper is defined outside this hunk; presumably its flag is set once
+  * the matching MAX_UNACKED check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testMaxUnackedMessagesPerConsumer() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject).signWith(SECRET_KEY).compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> MAX_UNACKED / READ
+     AtomicBoolean execFlag =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getMaxUnackedMessagesPerConsumer(namespace));
+     Assert.assertTrue(execFlag.get());
+
+     // set -> MAX_UNACKED / WRITE
+     execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, 100));
+     Assert.assertTrue(execFlag.get());
+
+     // remove -> MAX_UNACKED / WRITE
+     execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeMaxUnackedMessagesPerConsumer(namespace));
+     Assert.assertTrue(execFlag.get());
+ }
+
+ /**
+  * Namespace max-unacked-messages-per-subscription policy: with the token of a freshly generated
+  * subject (nothing is granted to it here) get, set and remove must each throw
+  * {@code NotAuthorizedException}. The flag returned by
+  * {@code setAuthorizationPolicyOperationChecker} is asserted true after every call.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * MAX_UNACKED check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testMaxUnackedMessagesPerSubscription() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> MAX_UNACKED / READ
+     final AtomicBoolean readChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getMaxUnackedMessagesPerSubscription(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> MAX_UNACKED / WRITE
+     final AtomicBoolean writeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setMaxUnackedMessagesPerSubscription(namespace, 100));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> MAX_UNACKED / WRITE
+     final AtomicBoolean removeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeMaxUnackedMessagesPerSubscription(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace resource-group policy: a client using the token of a freshly generated subject
+  * (no permission is granted in this test) must receive {@code NotAuthorizedException} on get,
+  * set and remove. The flag returned by {@code setAuthorizationPolicyOperationChecker} is asserted
+  * true after each call.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * RESOURCEGROUP check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testNamespaceResourceGroup() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> RESOURCEGROUP / READ
+     final AtomicBoolean readChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.RESOURCEGROUP, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getNamespaceResourceGroup(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> RESOURCEGROUP / WRITE
+     final AtomicBoolean writeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.RESOURCEGROUP, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setNamespaceResourceGroup(namespace, "test-group"));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> RESOURCEGROUP / WRITE
+     final AtomicBoolean removeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.RESOURCEGROUP, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeNamespaceResourceGroup(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace dispatcher-pause-on-ack-state-persistent policy: get, set and remove performed with
+  * the token of a freshly generated subject (this test grants it nothing) must each throw
+  * {@code NotAuthorizedException}; after each call the flag obtained from
+  * {@code setAuthorizationPolicyOperationChecker} is asserted true.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testDispatcherPauseOnAckStatePersistent() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT / READ
+     final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getDispatcherPauseOnAckStatePersistent(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT / WRITE
+     final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setDispatcherPauseOnAckStatePersistent(namespace));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT / WRITE
+     final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeDispatcherPauseOnAckStatePersistent(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace backlog-quota policy: with the token of a freshly generated subject (nothing is
+  * granted to it here) reading the quota map, setting a quota and removing it must each throw
+  * {@code NotAuthorizedException}. The flag returned by
+  * {@code setAuthorizationPolicyOperationChecker} is asserted true after every call.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * BACKLOG check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testBacklogQuota() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> BACKLOG / READ
+     final AtomicBoolean readChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.BACKLOG, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getBacklogQuotaMap(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> BACKLOG / WRITE
+     final AtomicBoolean writeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.BACKLOG, PolicyOperation.WRITE);
+     final BacklogQuota quota = BacklogQuota.builder()
+             .limitTime(10)
+             .limitSize(10)
+             .build();
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setBacklogQuota(namespace, quota));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> BACKLOG / WRITE
+     final AtomicBoolean removeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.BACKLOG, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeBacklogQuota(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace deduplication-snapshot-interval policy: a client using the token of a freshly
+  * generated subject (no permission is granted in this test) must receive
+  * {@code NotAuthorizedException} on get, set and remove. The flag returned by
+  * {@code setAuthorizationPolicyOperationChecker} is asserted true after each call.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * DEDUPLICATION_SNAPSHOT check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testDeduplicationSnapshotInterval() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> DEDUPLICATION_SNAPSHOT / READ
+     final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getDeduplicationSnapshotInterval(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> DEDUPLICATION_SNAPSHOT / WRITE
+     final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setDeduplicationSnapshotInterval(namespace, 100));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> DEDUPLICATION_SNAPSHOT / WRITE
+     final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeDeduplicationSnapshotInterval(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace max-subscriptions-per-topic policy: get, set and remove performed with the token of
+  * a freshly generated subject (this test grants it nothing) must each throw
+  * {@code NotAuthorizedException}; after each call the flag obtained from
+  * {@code setAuthorizationPolicyOperationChecker} is asserted true.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * MAX_SUBSCRIPTIONS check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testMaxSubscriptionsPerTopic() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> MAX_SUBSCRIPTIONS / READ
+     final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getMaxSubscriptionsPerTopic(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> MAX_SUBSCRIPTIONS / WRITE
+     final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setMaxSubscriptionsPerTopic(namespace, 10));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> MAX_SUBSCRIPTIONS / WRITE
+     final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeMaxSubscriptionsPerTopic(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace max-producers-per-topic policy: with the token of a freshly generated subject
+  * (nothing is granted to it here) get, set and remove must each throw
+  * {@code NotAuthorizedException}. The flag returned by
+  * {@code setAuthorizationPolicyOperationChecker} is asserted true after every call.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * MAX_PRODUCERS check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testMaxProducersPerTopic() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> MAX_PRODUCERS / READ
+     final AtomicBoolean readChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_PRODUCERS, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getMaxProducersPerTopic(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> MAX_PRODUCERS / WRITE
+     final AtomicBoolean writeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setMaxProducersPerTopic(namespace, 10));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> MAX_PRODUCERS / WRITE
+     final AtomicBoolean removeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeMaxProducersPerTopic(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace max-consumers-per-topic policy: a client using the token of a freshly generated
+  * subject (no permission is granted in this test) must receive {@code NotAuthorizedException}
+  * on get, set and remove. The flag returned by {@code setAuthorizationPolicyOperationChecker}
+  * is asserted true after each call.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * MAX_CONSUMERS check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testMaxConsumersPerTopic() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> MAX_CONSUMERS / READ
+     final AtomicBoolean readChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getMaxConsumersPerTopic(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> MAX_CONSUMERS / WRITE
+     final AtomicBoolean writeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setMaxConsumersPerTopic(namespace, 10));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> MAX_CONSUMERS / WRITE
+     final AtomicBoolean removeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeMaxConsumersPerTopic(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace replication-clusters policy: with the token of a freshly generated subject (nothing
+  * is granted to it here) both get and set must throw {@code NotAuthorizedException}. The flag
+  * returned by {@code setAuthorizationPolicyOperationChecker} is asserted true after each call.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * REPLICATION check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testNamespaceReplicationClusters() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> REPLICATION / READ
+     final AtomicBoolean readChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getNamespaceReplicationClusters(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> REPLICATION / WRITE
+     final AtomicBoolean writeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION, PolicyOperation.WRITE);
+     final Set<String> clusters = Sets.newHashSet("test");
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setNamespaceReplicationClusters(namespace, clusters));
+     Assert.assertTrue(writeChecked.get());
+ }
+
+ /**
+  * Namespace replicator-dispatch-rate policy: get, set and remove performed with the token of a
+  * freshly generated subject (this test grants it nothing) must each throw
+  * {@code NotAuthorizedException}; after each call the flag obtained from
+  * {@code setAuthorizationPolicyOperationChecker} is asserted true.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * REPLICATION_RATE check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testReplicatorDispatchRate() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> REPLICATION_RATE / READ
+     final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.REPLICATION_RATE, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getReplicatorDispatchRate(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> REPLICATION_RATE / WRITE
+     final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE);
+     final DispatchRate replicatorRate = DispatchRate.builder()
+             .dispatchThrottlingRateInByte(10)
+             .dispatchThrottlingRateInMsg(10)
+             .build();
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setReplicatorDispatchRate(namespace, replicatorRate));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> REPLICATION_RATE / WRITE
+     final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+             subject, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeReplicatorDispatchRate(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace max-consumers-per-subscription policy: with the token of a freshly generated
+  * subject (nothing is granted to it here) get, set and remove must each throw
+  * {@code NotAuthorizedException}. The flag returned by
+  * {@code setAuthorizationPolicyOperationChecker} is asserted true after every call.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * MAX_CONSUMERS check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testMaxConsumersPerSubscription() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> MAX_CONSUMERS / READ
+     final AtomicBoolean readChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getMaxConsumersPerSubscription(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> MAX_CONSUMERS / WRITE
+     final AtomicBoolean writeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setMaxConsumersPerSubscription(namespace, 10));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> MAX_CONSUMERS / WRITE
+     final AtomicBoolean removeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeMaxConsumersPerSubscription(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace offload-threshold policy: with the token of a freshly generated subject (nothing is
+  * granted to it here) both get and set must throw {@code NotAuthorizedException}. The flag
+  * returned by {@code setAuthorizationPolicyOperationChecker} is asserted true after each call.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * OFFLOAD check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testOffloadThreshold() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> OFFLOAD / READ
+     final AtomicBoolean readChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getOffloadThreshold(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> OFFLOAD / WRITE
+     final AtomicBoolean writeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setOffloadThreshold(namespace, 10));
+     Assert.assertTrue(writeChecked.get());
+ }
+
+ /**
+  * Namespace offload-policies: get, set and remove performed with the token of a freshly
+  * generated subject (this test grants it nothing) must each throw
+  * {@code NotAuthorizedException}; after each call the flag obtained from
+  * {@code setAuthorizationPolicyOperationChecker} is asserted true.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * OFFLOAD check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testOffloadPolicies() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> OFFLOAD / READ
+     final AtomicBoolean readChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getOffloadPolicies(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> OFFLOAD / WRITE
+     final AtomicBoolean writeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.WRITE);
+     final OffloadPolicies policies = OffloadPolicies.builder()
+             .managedLedgerOffloadThresholdInBytes(10L)
+             .build();
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setOffloadPolicies(namespace, policies));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> OFFLOAD / WRITE
+     final AtomicBoolean removeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeOffloadPolicies(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace max-topics-per-namespace policy: a client using the token of a freshly generated
+  * subject (no permission is granted in this test) must receive {@code NotAuthorizedException}
+  * on get, set and remove. The flag returned by {@code setAuthorizationPolicyOperationChecker}
+  * is asserted true after each call.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * MAX_TOPICS check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testMaxTopicsPerNamespace() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> MAX_TOPICS / READ
+     final AtomicBoolean readChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_TOPICS, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getMaxTopicsPerNamespace(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> MAX_TOPICS / WRITE
+     final AtomicBoolean writeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_TOPICS, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setMaxTopicsPerNamespace(namespace, 10));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> MAX_TOPICS / WRITE
+     final AtomicBoolean removeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_TOPICS, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeMaxTopicsPerNamespace(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+ /**
+  * Namespace deduplication-status policy: with the token of a freshly generated subject (nothing
+  * is granted to it here) get, set and remove must each throw {@code NotAuthorizedException}.
+  * The flag returned by {@code setAuthorizationPolicyOperationChecker} is asserted true after
+  * every call.
+  * NOTE(review): helper defined outside this hunk; presumably the flag is set once the matching
+  * DEDUPLICATION check is evaluated - confirm.
+  */
+ @Test
+ @SneakyThrows
+ public void testDeduplicationStatus() {
+     final String namespace = "public/default";
+     final String subject = UUID.randomUUID().toString();
+     final String token = Jwts.builder()
+             .claim("sub", subject)
+             .signWith(SECRET_KEY)
+             .compact();
+     @Cleanup
+     final PulsarAdmin subAdmin = PulsarAdmin.builder()
+             .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+             .authentication(new AuthenticationToken(token))
+             .build();
+
+     // get -> DEDUPLICATION / READ
+     final AtomicBoolean readChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION, PolicyOperation.READ);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().getDeduplicationStatus(namespace));
+     Assert.assertTrue(readChecked.get());
+
+     // set -> DEDUPLICATION / WRITE
+     final AtomicBoolean writeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().setDeduplicationStatus(namespace, true));
+     Assert.assertTrue(writeChecked.get());
+
+     // remove -> DEDUPLICATION / WRITE
+     final AtomicBoolean removeChecked =
+             setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
+     Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+             () -> subAdmin.namespaces().removeDeduplicationStatus(namespace));
+     Assert.assertTrue(removeChecked.get());
+ }
+
+    /**
+     * Exercises the persistence-policy admin calls (get / set / remove) on
+     * {@code public/default} with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the PERSISTENCE READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testPersistence() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.PERSISTENCE, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getPersistence(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: the policy object is built inside the lambda, as part of the call itself.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setPersistence(ns, new PersistencePolicies(10, 10, 10, 10)));
+        Assert.assertTrue(writeChecked.get());
+
+        // WRITE path: removing the policy (checker re-installed to obtain a fresh flag).
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().removePersistence(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Exercises the namespace message-TTL admin calls (get / set / remove) on
+     * {@code public/default} with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the TTL READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testNamespaceMessageTTL() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.TTL, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getNamespaceMessageTTL(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: setting a TTL.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.TTL, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setNamespaceMessageTTL(ns, 10));
+        Assert.assertTrue(writeChecked.get());
+
+        // WRITE path: removing the TTL (checker re-installed to obtain a fresh flag).
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.TTL, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().removeNamespaceMessageTTL(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Exercises the subscription-expiration-time admin calls (get / set / remove) on
+     * {@code public/default} with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the SUBSCRIPTION_EXPIRATION_TIME READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testSubscriptionExpirationTime() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getSubscriptionExpirationTime(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: setting an expiration time.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setSubscriptionExpirationTime(ns, 10));
+        Assert.assertTrue(writeChecked.get());
+
+        // WRITE path: removing the setting (checker re-installed to obtain a fresh flag).
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().removeSubscriptionExpirationTime(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Verifies that reading the delayed-delivery policy of {@code public/default} with a token
+     * whose {@code sub} claim is a random UUID fails with
+     * {@link PulsarAdminException.NotAuthorizedException}, and that the flag returned by
+     * {@code setAuthorizationPolicyOperationChecker} is {@code true} afterwards.
+     * Only the READ operation is covered by this test.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the DELAYED_DELIVERY READ policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testDelayedDeliveryMessages() {
+        // The previously declared, never-read local "random" has been dropped.
+        final String namespace = "public/default";
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        final AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(
+                subject, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.namespaces().getDelayedDelivery(namespace));
+        Assert.assertTrue(execFlag.get());
+    }
+
+    /**
+     * Exercises the retention-policy admin calls (get / set / remove) on
+     * {@code public/default} with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the RETENTION READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testRetention() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.RETENTION, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getRetention(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: the policy object is built inside the lambda, as part of the call itself.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.RETENTION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setRetention(ns, new RetentionPolicies(10, 10)));
+        Assert.assertTrue(writeChecked.get());
+
+        // WRITE path: removing the policy (checker re-installed to obtain a fresh flag).
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.RETENTION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().removeRetention(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Exercises the inactive-topic-policy admin calls (get / set / remove) on
+     * {@code public/default} with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the INACTIVE_TOPIC READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testInactiveTopicPolicies() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getInactiveTopicPolicies(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: the policy payload is created after the checker is installed.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE);
+        final InactiveTopicPolicies payload = new InactiveTopicPolicies(
+                InactiveTopicDeleteMode.delete_when_no_subscriptions, 10, false);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setInactiveTopicPolicies(ns, payload));
+        Assert.assertTrue(writeChecked.get());
+
+        // WRITE path: removing the policy (checker re-installed to obtain a fresh flag).
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().removeInactiveTopicPolicies(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Exercises the anti-affinity-group admin calls (get / set) on {@code public/default}
+     * with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the ANTI_AFFINITY READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testNamespaceAntiAffinityGroup() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getNamespaceAntiAffinityGroup(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: assigning a group name.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setNamespaceAntiAffinityGroup(ns, "invalid-group"));
+        Assert.assertTrue(writeChecked.get());
+    }
+
+    /**
+     * Exercises the offload-delete-lag admin calls (get / set) on {@code public/default}
+     * with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the OFFLOAD READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testOffloadDeleteLagMs() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.OFFLOAD, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getOffloadDeleteLagMs(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: setting the delete lag.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.OFFLOAD, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setOffloadDeleteLag(ns, 100, TimeUnit.HOURS));
+        Assert.assertTrue(writeChecked.get());
+    }
+
+    /**
+     * Exercises the offload-threshold-in-seconds admin calls (get / set) on
+     * {@code public/default} with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the OFFLOAD READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testOffloadThresholdInSeconds() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.OFFLOAD, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getOffloadThresholdInSeconds(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: setting the threshold.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.OFFLOAD, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setOffloadThresholdInSeconds(ns, 10000));
+        Assert.assertTrue(writeChecked.get());
+    }
+
+    /**
+     * Exercises the namespace entry-filter admin calls (get / set / remove) on
+     * {@code public/default} with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the ENTRY_FILTERS READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testNamespaceEntryFilters() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.ENTRY_FILTERS, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getNamespaceEntryFilters(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: the filter object is built inside the lambda, as part of the call itself.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setNamespaceEntryFilters(ns, new EntryFilters("filter1")));
+        Assert.assertTrue(writeChecked.get());
+
+        // WRITE path: removing the filters (checker re-installed to obtain a fresh flag).
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().removeNamespaceEntryFilters(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Exercises the encryption-required admin calls (get / set) on {@code public/default}
+     * with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the ENCRYPTION READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testEncryptionRequiredStatus() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.ENCRYPTION, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getEncryptionRequiredStatus(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: changing the flag.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.ENCRYPTION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setEncryptionRequiredStatus(ns, false));
+        Assert.assertTrue(writeChecked.get());
+    }
+
+    /**
+     * Exercises the subscription-types-enabled admin calls (get / set / remove) on
+     * {@code public/default} with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards. The checker is registered under
+     * {@code PolicyName.SUBSCRIPTION_AUTH_MODE} for all three calls.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the SUBSCRIPTION_AUTH_MODE READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testSubscriptionTypesEnabled() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getSubscriptionTypesEnabled(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: the type set is built inside the lambda, as part of the call itself.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setSubscriptionTypesEnabled(
+                        ns, Sets.newHashSet(SubscriptionType.Failover)));
+        Assert.assertTrue(writeChecked.get());
+
+        // WRITE path: removing the setting (checker re-installed to obtain a fresh flag).
+        final AtomicBoolean removeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().removeSubscriptionTypesEnabled(ns));
+        Assert.assertTrue(removeChecked.get());
+    }
+
+    /**
+     * Exercises the allow-auto-update-schema admin calls (get / set) on {@code public/default}
+     * with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the SCHEMA_COMPATIBILITY_STRATEGY READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testIsAllowAutoUpdateSchema() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getIsAllowAutoUpdateSchema(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: changing the flag.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setIsAllowAutoUpdateSchema(ns, true));
+        Assert.assertTrue(writeChecked.get());
+    }
+
+    /**
+     * Exercises the schema auto-update compatibility-strategy admin calls (get / set) on
+     * {@code public/default} with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the SCHEMA_COMPATIBILITY_STRATEGY READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testSchemaAutoUpdateCompatibilityStrategy() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: AutoUpdateDisabled is the statically imported enum constant.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(ns, AutoUpdateDisabled));
+        Assert.assertTrue(writeChecked.get());
+    }
+
+    /**
+     * Exercises the schema-validation-enforced admin calls (get / set) on {@code public/default}
+     * with a token whose {@code sub} claim is a random UUID.
+     * Each call is expected to fail with {@link PulsarAdminException.NotAuthorizedException},
+     * and the flag returned by {@code setAuthorizationPolicyOperationChecker} must be
+     * {@code true} afterwards.
+     * NOTE(review): that helper is defined outside this hunk; presumably its flag records that
+     * the SCHEMA_COMPATIBILITY_STRATEGY READ/WRITE policy check was reached - confirm.
+     */
+    @Test
+    @SneakyThrows
+    public void testSchemaValidationEnforced() {
+        final String role = UUID.randomUUID().toString();
+        final String jwt = Jwts.builder().claim("sub", role).signWith(SECRET_KEY).compact();
+        final String ns = "public/default";
+        @Cleanup final PulsarAdmin roleAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(jwt))
+                .build();
+
+        // READ path.
+        final AtomicBoolean readChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().getSchemaValidationEnforced(ns));
+        Assert.assertTrue(readChecked.get());
+
+        // WRITE path: changing the flag.
+        final AtomicBoolean writeChecked = setAuthorizationPolicyOperationChecker(
+                role, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> roleAdmin.namespaces().setSchemaValidationEnforced(ns, true));
+        Assert.assertTrue(writeChecked.get());
     }
}