This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6de711d4008 [improve][test] Add operation authentication test for namespace API (#22398)
6de711d4008 is described below
commit 6de711d4008338a875c5c145e856c90dcb041f8f
Author: Cong Zhao <[email protected]>
AuthorDate: Tue Apr 9 16:38:18 2024 +0800
[improve][test] Add operation authentication test for namespace API (#22398)
---
.../pulsar/broker/admin/NamespaceAuthZTest.java | 882 ++++++++++++++++++++-
1 file changed, 875 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
index ce0b925614c..d5a0468f340 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
@@ -19,23 +19,47 @@
package org.apache.pulsar.broker.admin;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import io.jsonwebtoken.Jwts;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import lombok.Cleanup;
import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
+import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
+import org.apache.pulsar.packages.management.core.common.PackageMetadata;
import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
@Test(groups = "broker-admin")
public class NamespaceAuthZTest extends MockedPulsarStandalone {
@@ -44,17 +68,27 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone {
private PulsarAdmin tenantManagerAdmin;
+ private PulsarClient pulsarClient;
+
+ private AuthorizationService authorizationService;
+
+ private AuthorizationService orignalAuthorizationService;
+
private static final String TENANT_ADMIN_SUBJECT =
UUID.randomUUID().toString();
private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
.claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
@SneakyThrows
@BeforeClass
- public void before() {
+ public void setup() {
+ getServiceConfiguration().setEnablePackagesManagement(true);
+
getServiceConfiguration().setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName());
+ getServiceConfiguration().setDefaultNumberOfNamespaceBundles(1);
+ getServiceConfiguration().setForceDeleteNamespaceAllowed(true);
configureTokenAuthentication();
configureDefaultAuthorization();
start();
- this.superUserAdmin =PulsarAdmin.builder()
+ this.superUserAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(SUPER_USER_TOKEN))
.build();
@@ -65,12 +99,13 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone {
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
.build();
+ this.pulsarClient = super.getPulsarService().getClient();
}
@SneakyThrows
@AfterClass
- public void after() {
+ public void cleanup() {
if (superUserAdmin != null) {
superUserAdmin.close();
}
@@ -80,6 +115,33 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone {
close();
}
+ @BeforeMethod
+ public void before() throws IllegalAccessException {
+ orignalAuthorizationService =
getPulsarService().getBrokerService().getAuthorizationService();
+ authorizationService = Mockito.spy(orignalAuthorizationService);
+ FieldUtils.writeField(getPulsarService().getBrokerService(),
"authorizationService",
+ authorizationService, true);
+ }
+
+ @AfterMethod
+ public void after() throws IllegalAccessException, PulsarAdminException {
+ FieldUtils.writeField(getPulsarService().getBrokerService(),
"authorizationService",
+ orignalAuthorizationService, true);
+ superUserAdmin.namespaces().deleteNamespace("public/default", true);
+ superUserAdmin.namespaces().createNamespace("public/default");
+ }
+
+ private void setAuthorizationOperationChecker(String role,
NamespaceOperation operation) {
+ Mockito.doAnswer(invocationOnMock -> {
+ String role_ = invocationOnMock.getArgument(2);
+ if (role.equals(role_)) {
+ NamespaceOperation operation_ =
invocationOnMock.getArgument(1);
+ Assert.assertEquals(operation_, operation);
+ }
+ return invocationOnMock.callRealMethod();
+
}).when(authorizationService).allowNamespaceOperationAsync(Mockito.any(),
Mockito.any(), Mockito.any(),
+ Mockito.any());
+ }
@SneakyThrows
@Test
@@ -160,4 +222,810 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone {
}
superUserAdmin.topics().delete(topic, true);
}
+
+ @Test
+ public void testTopics() throws Exception {
+ final String random = UUID.randomUUID().toString();
+ final String namespace = "public/default";
+ final String topic = "persistent://" + namespace + "/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ // test super admin
+ superUserAdmin.namespaces().getTopics(namespace);
+
+ // test tenant manager
+ tenantManagerAdmin.namespaces().getTopics(namespace);
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getTopics(namespace));
+
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.GET_TOPICS);
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
+ if (AuthAction.consume == action || AuthAction.produce == action) {
+ subAdmin.namespaces().getTopics(namespace);
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getTopics(namespace));
+ }
+
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+ }
+
+ superUserAdmin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testBookieAffinityGroup() throws Exception {
+ final String random = UUID.randomUUID().toString();
+ final String namespace = "public/default";
+ final String topic = "persistent://" + namespace + "/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ // test super admin
+ BookieAffinityGroupData bookieAffinityGroupData =
BookieAffinityGroupData.builder()
+ .bookkeeperAffinityGroupPrimary("aaa")
+ .bookkeeperAffinityGroupSecondary("bbb")
+ .build();
+ superUserAdmin.namespaces().setBookieAffinityGroup(namespace,
bookieAffinityGroupData);
+ BookieAffinityGroupData bookieAffinityGroup =
superUserAdmin.namespaces().getBookieAffinityGroup(namespace);
+ Assert.assertEquals(bookieAffinityGroupData, bookieAffinityGroup);
+ superUserAdmin.namespaces().deleteBookieAffinityGroup(namespace);
+ bookieAffinityGroup =
superUserAdmin.namespaces().getBookieAffinityGroup(namespace);
+ Assert.assertNull(bookieAffinityGroup);
+
+ // test tenant manager
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
tenantManagerAdmin.namespaces().setBookieAffinityGroup(namespace,
bookieAffinityGroupData));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
tenantManagerAdmin.namespaces().getBookieAffinityGroup(namespace));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
tenantManagerAdmin.namespaces().deleteBookieAffinityGroup(namespace));
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().setBookieAffinityGroup(namespace,
bookieAffinityGroupData));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getBookieAffinityGroup(namespace));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().deleteBookieAffinityGroup(namespace));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().setBookieAffinityGroup(namespace,
bookieAffinityGroupData));
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().getBookieAffinityGroup(namespace));
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().deleteBookieAffinityGroup(namespace));
+
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+ }
+
+ superUserAdmin.topics().delete(topic, true);
+ }
+
+
+ @Test
+ public void testGetBundles() throws Exception {
+ final String random = UUID.randomUUID().toString();
+ final String namespace = "public/default";
+ final String topic = "persistent://" + namespace + "/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ producer.send("message".getBytes());
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ // test super admin
+ superUserAdmin.namespaces().getBundles(namespace);
+
+ // test tenant manager
+ tenantManagerAdmin.namespaces().getBundles(namespace);
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getBundles(namespace));
+
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.GET_BUNDLE);
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
+ if (AuthAction.consume == action || AuthAction.produce == action) {
+ subAdmin.namespaces().getBundles(namespace);
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getBundles(namespace));
+ }
+
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+ }
+
+ superUserAdmin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testUnloadBundles() throws Exception {
+ final String random = UUID.randomUUID().toString();
+ final String namespace = "public/default";
+ final String topic = "persistent://" + namespace + "/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ producer.send("message".getBytes());
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ final String defaultBundle = "0x00000000_0xffffffff";
+
+ // test super admin
+ superUserAdmin.namespaces().unloadNamespaceBundle(namespace,
defaultBundle);
+
+ // test tenant manager
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
tenantManagerAdmin.namespaces().unloadNamespaceBundle(namespace,
defaultBundle));
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().unloadNamespaceBundle(namespace,
defaultBundle));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().unloadNamespaceBundle(namespace, defaultBundle));
+
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+ }
+
+ superUserAdmin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testSplitBundles() throws Exception {
+ final String random = UUID.randomUUID().toString();
+ final String namespace = "public/default";
+ final String topic = "persistent://" + namespace + "/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ producer.send("message".getBytes());
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ final String defaultBundle = "0x00000000_0xffffffff";
+
+ // test super admin
+ superUserAdmin.namespaces().splitNamespaceBundle(namespace,
defaultBundle, false, null);
+
+ // test tenant manager
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
tenantManagerAdmin.namespaces().splitNamespaceBundle(namespace, defaultBundle,
false, null));
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().splitNamespaceBundle(namespace,
defaultBundle, false, null));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().splitNamespaceBundle(namespace, defaultBundle, false,
null));
+
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+ }
+
+ superUserAdmin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testDeleteBundles() throws Exception {
+ final String random = UUID.randomUUID().toString();
+ final String namespace = "public/default";
+ final String topic = "persistent://" + namespace + "/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ producer.send("message".getBytes());
+
+ for (int i = 0; i < 3; i++) {
+ superUserAdmin.namespaces().splitNamespaceBundle(namespace,
Policies.BundleType.LARGEST.toString(), false, null);
+ }
+
+ BundlesData bundles =
superUserAdmin.namespaces().getBundles(namespace);
+ Assert.assertEquals(bundles.getNumBundles(), 4);
+ List<String> boundaries = bundles.getBoundaries();
+ Assert.assertEquals(boundaries.size(), 5);
+
+ List<String> bundleRanges = new ArrayList<>();
+ for (int i = 0; i < boundaries.size() - 1; i++) {
+ String bundleRange = boundaries.get(i) + "_" + boundaries.get(i +
1);
+ List<Topic> allTopicsFromNamespaceBundle =
getPulsarService().getBrokerService()
+ .getAllTopicsFromNamespaceBundle(namespace,
namespace + "/" + bundleRange);
+ System.out.println(StringUtils.join(allTopicsFromNamespaceBundle));
+ if (allTopicsFromNamespaceBundle.isEmpty()) {
+ bundleRanges.add(bundleRange);
+ }
+ }
+
+ // test super admin
+ superUserAdmin.namespaces().deleteNamespaceBundle(namespace,
bundleRanges.get(0));
+
+ // test tenant manager
+ tenantManagerAdmin.namespaces().deleteNamespaceBundle(namespace,
bundleRanges.get(1));
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().deleteNamespaceBundle(namespace,
bundleRanges.get(1)));
+
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.DELETE_BUNDLE);
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().deleteNamespaceBundle(namespace,
bundleRanges.get(1)));
+
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+ }
+ }
+
+ @Test
+ public void testPermission() throws Exception {
+ final String random = UUID.randomUUID().toString();
+ final String namespace = "public/default";
+ final String topic = "persistent://" + namespace + "/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ final String role = "sub";
+ final AuthAction testAction = AuthAction.consume;
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+
+ // test super admin
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
role, Set.of(testAction));
+ Map<String, Set<AuthAction>> permissions =
superUserAdmin.namespaces().getPermissions(namespace);
+ Assert.assertEquals(permissions.get(role), Set.of(testAction));
+ superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace,
role);
+ permissions = superUserAdmin.namespaces().getPermissions(namespace);
+ Assert.assertTrue(permissions.isEmpty());
+
+ // test tenant manager
+ tenantManagerAdmin.namespaces().grantPermissionOnNamespace(namespace,
role, Set.of(testAction));
+ permissions =
tenantManagerAdmin.namespaces().getPermissions(namespace);
+ Assert.assertEquals(permissions.get(role), Set.of(testAction));
+
tenantManagerAdmin.namespaces().revokePermissionsOnNamespace(namespace, role);
+ permissions =
tenantManagerAdmin.namespaces().getPermissions(namespace);
+ Assert.assertTrue(permissions.isEmpty());
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().grantPermissionOnNamespace(namespace, role,
Set.of(testAction)));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getPermissions(namespace));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().revokePermissionsOnNamespace(namespace, role));
+
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.GRANT_PERMISSION);
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().grantPermissionOnNamespace(namespace, role,
Set.of(testAction)));
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.GET_PERMISSION);
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getPermissions(namespace));
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.REVOKE_PERMISSION);
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().revokePermissionsOnNamespace(namespace, role));
+
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+ }
+
+ superUserAdmin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testPermissionOnSubscription() throws Exception {
+ final String random = UUID.randomUUID().toString();
+ final String namespace = "public/default";
+ final String topic = "persistent://" + namespace + "/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ final String subscription = "my-sub";
+ final String role = "sub";
+ pulsarClient.newConsumer().topic(topic)
+ .subscriptionName(subscription)
+ .subscribe().close();
+
+
+ // test super admin
+ superUserAdmin.namespaces().grantPermissionOnSubscription(namespace,
subscription, Set.of(role));
+ Map<String, Set<String>> permissionOnSubscription =
superUserAdmin.namespaces().getPermissionOnSubscription(namespace);
+ Assert.assertEquals(permissionOnSubscription.get(subscription),
Set.of(role));
+ superUserAdmin.namespaces().revokePermissionOnSubscription(namespace,
subscription, role);
+ permissionOnSubscription =
superUserAdmin.namespaces().getPermissionOnSubscription(namespace);
+ Assert.assertTrue(permissionOnSubscription.isEmpty());
+
+ // test tenant manager
+
tenantManagerAdmin.namespaces().grantPermissionOnSubscription(namespace,
subscription, Set.of(role));
+ permissionOnSubscription =
tenantManagerAdmin.namespaces().getPermissionOnSubscription(namespace);
+ Assert.assertEquals(permissionOnSubscription.get(subscription),
Set.of(role));
+
tenantManagerAdmin.namespaces().revokePermissionOnSubscription(namespace,
subscription, role);
+ permissionOnSubscription =
tenantManagerAdmin.namespaces().getPermissionOnSubscription(namespace);
+ Assert.assertTrue(permissionOnSubscription.isEmpty());
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription,
Set.of(role)));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().getPermissionOnSubscription(namespace));
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription,
role));
+
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.GRANT_PERMISSION);
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription,
Set.of(role)));
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.GET_PERMISSION);
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().getPermissionOnSubscription(namespace));
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.REVOKE_PERMISSION);
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription,
role));
+
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+ }
+
+ superUserAdmin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testClearBacklog() throws Exception {
+ final String random = UUID.randomUUID().toString();
+ final String namespace = "public/default";
+ final String topic = "persistent://" + namespace + "/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ // test super admin
+ superUserAdmin.namespaces().clearNamespaceBacklog(namespace);
+
+ // test tenant manager
+ tenantManagerAdmin.namespaces().clearNamespaceBacklog(namespace);
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().clearNamespaceBacklog(namespace));
+
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.CLEAR_BACKLOG);
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
+ if (AuthAction.consume == action) {
+ subAdmin.namespaces().clearNamespaceBacklog(namespace);
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().clearNamespaceBacklog(namespace));
+ }
+
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+ }
+
+ superUserAdmin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testClearNamespaceBundleBacklog() throws Exception {
+ final String random = UUID.randomUUID().toString();
+ final String namespace = "public/default";
+ final String topic = "persistent://" + namespace + "/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ @Cleanup
+ Producer<byte[]> batchProducer =
pulsarClient.newProducer().topic(topic)
+ .enableBatching(false)
+ .create();
+
+ final String defaultBundle = "0x00000000_0xffffffff";
+
+ // test super admin
+ superUserAdmin.namespaces().clearNamespaceBundleBacklog(namespace,
defaultBundle);
+
+ // test tenant manager
+ tenantManagerAdmin.namespaces().clearNamespaceBundleBacklog(namespace,
defaultBundle);
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().clearNamespaceBundleBacklog(namespace, defaultBundle));
+
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.CLEAR_BACKLOG);
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
+ if (AuthAction.consume == action) {
+ subAdmin.namespaces().clearNamespaceBundleBacklog(namespace,
defaultBundle);
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().clearNamespaceBundleBacklog(namespace, defaultBundle));
+ }
+
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+ }
+
+ superUserAdmin.topics().delete(topic, true);
+ }
+
+ @Test
+ public void testUnsubscribeNamespace() throws Exception {
+ final String random = UUID.randomUUID().toString();
+ final String namespace = "public/default";
+ final String topic = "persistent://" + namespace + "/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ @Cleanup
+ Producer<byte[]> batchProducer =
pulsarClient.newProducer().topic(topic)
+ .enableBatching(false)
+ .create();
+
+ pulsarClient.newConsumer().topic(topic)
+ .subscriptionName("sub")
+ .subscribe().close();
+
+ // test super admin
+ superUserAdmin.namespaces().unsubscribeNamespace(namespace, "sub");
+
+ // test tenant manager
+ tenantManagerAdmin.namespaces().unsubscribeNamespace(namespace, "sub");
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().unsubscribeNamespace(namespace,
"sub"));
+
+ setAuthorizationOperationChecker(subject,
NamespaceOperation.UNSUBSCRIBE);
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
+ if (AuthAction.consume == action) {
+ subAdmin.namespaces().unsubscribeNamespace(namespace, "sub");
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.namespaces().unsubscribeNamespace(namespace, "sub"));
+ }
+
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+ }
+
+ superUserAdmin.topics().delete(topic, true);
+ }
+
+    /**
+     * Checks who may call {@code unsubscribeNamespaceBundle} on {@code public/default}.
+     *
+     * <p>The super user and the tenant manager are expected to succeed. A freshly minted subject is
+     * expected to be rejected until it is granted {@link AuthAction#consume}; every other
+     * {@link AuthAction} granted on its own must still yield a
+     * {@link PulsarAdminException.NotAuthorizedException}.
+     */
+    @Test
+    public void testUnsubscribeNamespaceBundle() throws Exception {
+        final String namespace = "public/default";
+        final String topic = "persistent://" + namespace + "/" + UUID.randomUUID();
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject)
+                .signWith(SECRET_KEY)
+                .compact();
+        superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        // Open and immediately close a consumer so that the "sub" subscription exists on the topic.
+        pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe()
+                .close();
+
+        final String fullRangeBundle = "0x00000000_0xffffffff";
+
+        // The call issued with the subject's token; reused by every "must be rejected" check below.
+        final Assert.ThrowingRunnable unsubscribeAsSubject =
+                () -> subAdmin.namespaces().unsubscribeNamespaceBundle(namespace, fullRangeBundle, "sub");
+
+        // Super user is allowed.
+        superUserAdmin.namespaces().unsubscribeNamespaceBundle(namespace, fullRangeBundle, "sub");
+
+        // Tenant manager is allowed.
+        tenantManagerAdmin.namespaces().unsubscribeNamespaceBundle(namespace, fullRangeBundle, "sub");
+
+        // A subject without any permission is rejected.
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, unsubscribeAsSubject);
+
+        setAuthorizationOperationChecker(subject, NamespaceOperation.UNSUBSCRIBE);
+
+        // Grant each action in isolation: only "consume" may unlock the operation.
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action));
+            if (action != AuthAction.consume) {
+                Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, unsubscribeAsSubject);
+            } else {
+                subAdmin.namespaces().unsubscribeNamespaceBundle(namespace, fullRangeBundle, "sub");
+            }
+            superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+        }
+
+        superUserAdmin.topics().delete(topic, true);
+    }
+
+    /**
+     * Checks who may use the packages admin API (upload, download, list packages, list versions,
+     * get metadata, update metadata) for {@code function} packages in {@code public/default}.
+     *
+     * <p>The super user and the tenant manager are exercised first and are expected to succeed.
+     * A freshly minted subject is then expected to receive
+     * {@link PulsarAdminException.NotAuthorizedException} on every call, both with no permission
+     * at all and with any single {@link AuthAction} other than {@link AuthAction#packages}.
+     * With {@link AuthAction#packages} granted, the subject runs the whole sequence against its
+     * own package version ({@code test@v4}).
+     */
+    @Test
+    public void testPackageAPI() throws Exception {
+        final String namespace = "public/default";
+
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+
+        // ---- test super admin ---
+
+        File file = newTempPackageFile();
+
+        // testing upload api
+        String packageName = "function://public/default/test@v1";
+        PackageMetadata originalMetadata = PackageMetadata.builder().description("test").build();
+        superUserAdmin.packages().upload(originalMetadata, packageName, file.getPath());
+
+        // testing download api
+        String downloadPath = new File(file.getParentFile(), "package-api-test-download.package").getPath();
+        superUserAdmin.packages().download(packageName, downloadPath);
+        File downloadFile = new File(downloadPath);
+        assertTrue(downloadFile.exists());
+        downloadFile.delete();
+
+        // testing list packages api
+        List<String> packages = superUserAdmin.packages().listPackages("function", "public/default");
+        assertEquals(packages.size(), 1);
+        assertEquals(packages.get(0), "test");
+
+        // testing list versions api
+        List<String> versions = superUserAdmin.packages().listPackageVersions(packageName);
+        assertEquals(versions.size(), 1);
+        assertEquals(versions.get(0), "v1");
+
+        // testing get packages api
+        PackageMetadata metadata = superUserAdmin.packages().getMetadata(packageName);
+        assertEquals(metadata.getDescription(), originalMetadata.getDescription());
+        assertNull(metadata.getContact());
+        assertTrue(metadata.getModificationTime() > 0);
+        assertTrue(metadata.getCreateTime() > 0);
+        assertNull(metadata.getProperties());
+
+        // testing update package metadata api
+        originalMetadata.setContact("[email protected]");
+        originalMetadata.setProperties(Collections.singletonMap("key", "value"));
+        superUserAdmin.packages().updateMetadata(packageName, originalMetadata);
+
+        superUserAdmin.packages().getMetadata(packageName);
+
+        // ---- test tenant manager ---
+
+        final File tenantFile = newTempPackageFile();
+
+        // testing upload api
+        final String tenantPackageName = "function://public/default/test@v2";
+        final PackageMetadata tenantMetadata = PackageMetadata.builder().description("test").build();
+        tenantManagerAdmin.packages().upload(tenantMetadata, tenantPackageName, tenantFile.getPath());
+
+        // testing download api
+        final String tenantDownloadPath =
+                new File(tenantFile.getParentFile(), "package-api-test-download.package").getPath();
+        tenantManagerAdmin.packages().download(tenantPackageName, tenantDownloadPath);
+        downloadFile = new File(tenantDownloadPath);
+        assertTrue(downloadFile.exists());
+        downloadFile.delete();
+
+        // testing list packages api
+        packages = tenantManagerAdmin.packages().listPackages("function", "public/default");
+        assertEquals(packages.size(), 1);
+        assertEquals(packages.get(0), "test");
+
+        // testing list versions api
+        tenantManagerAdmin.packages().listPackageVersions(tenantPackageName);
+
+        // testing get packages api
+        tenantManagerAdmin.packages().getMetadata(tenantPackageName);
+
+        // testing update package metadata api
+        tenantMetadata.setContact("[email protected]");
+        tenantMetadata.setProperties(Collections.singletonMap("key", "value"));
+        tenantManagerAdmin.packages().updateMetadata(tenantPackageName, tenantMetadata);
+
+        // ---- test nobody ---
+
+        final File nobodyFile = newTempPackageFile();
+        final String nobodyPackageName = "function://public/default/test@v3";
+        final PackageMetadata nobodyMetadata = PackageMetadata.builder().description("test").build();
+        final String nobodyDownloadPath =
+                new File(nobodyFile.getParentFile(), "package-api-test-download.package").getPath();
+        assertPackageApiNotAuthorized(subAdmin, nobodyPackageName, nobodyFile, nobodyMetadata,
+                nobodyDownloadPath);
+
+        setAuthorizationOperationChecker(subject, NamespaceOperation.PACKAGES);
+
+        // Grant each action in isolation: only "packages" may unlock the packages API.
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action));
+            // Every iteration works on its own file, metadata and package version so that it does
+            // not touch the objects used by the super-user / tenant-manager sections above.
+            final File subjectFile = newTempPackageFile();
+            final String subjectPackageName = "function://public/default/test@v4";
+            final PackageMetadata subjectMetadata = PackageMetadata.builder().description("test").build();
+            final String subjectDownloadPath =
+                    new File(subjectFile.getParentFile(), "package-api-test-download.package").getPath();
+            if (AuthAction.packages == action) {
+                // testing upload api
+                subAdmin.packages().upload(subjectMetadata, subjectPackageName, subjectFile.getPath());
+
+                // testing download api
+                subAdmin.packages().download(subjectPackageName, subjectDownloadPath);
+                downloadFile = new File(subjectDownloadPath);
+                assertTrue(downloadFile.exists());
+                downloadFile.delete();
+
+                // testing list packages api
+                packages = subAdmin.packages().listPackages("function", "public/default");
+                assertEquals(packages.size(), 1);
+                assertEquals(packages.get(0), "test");
+
+                // testing list versions api
+                subAdmin.packages().listPackageVersions(subjectPackageName);
+
+                // testing get packages api
+                subAdmin.packages().getMetadata(subjectPackageName);
+
+                // testing update package metadata api (on the package this subject just uploaded)
+                subjectMetadata.setContact("[email protected]");
+                subjectMetadata.setProperties(Collections.singletonMap("key", "value"));
+                subAdmin.packages().updateMetadata(subjectPackageName, subjectMetadata);
+            } else {
+                assertPackageApiNotAuthorized(subAdmin, subjectPackageName, subjectFile, subjectMetadata,
+                        subjectDownloadPath);
+            }
+            superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+        }
+    }
+
+    /**
+     * Creates an empty temporary file to upload as a package and registers it for deletion when
+     * the JVM exits, so test runs do not accumulate files in the temp directory.
+     */
+    private static File newTempPackageFile() throws Exception {
+        final File tempFile = File.createTempFile("package-api-test", ".package");
+        tempFile.deleteOnExit();
+        return tempFile;
+    }
+
+    /**
+     * Asserts that each packages API call issued through {@code admin} fails with
+     * {@link PulsarAdminException.NotAuthorizedException}: upload, download, list packages,
+     * list versions, get metadata and update metadata, in that order.
+     *
+     * @param admin        client whose credentials are expected to be rejected
+     * @param packageName  fully qualified package name used for the calls
+     * @param packageFile  local file offered to the upload call
+     * @param metadata     metadata offered to upload; contact and properties are set on it before
+     *                     the update call
+     * @param downloadPath local path offered to the download call
+     */
+    private static void assertPackageApiNotAuthorized(PulsarAdmin admin, String packageName, File packageFile,
+                                                      PackageMetadata metadata, String downloadPath) {
+        // testing upload api
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.packages().upload(metadata, packageName, packageFile.getPath()));
+
+        // testing download api
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.packages().download(packageName, downloadPath));
+
+        // testing list packages api
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.packages().listPackages("function", "public/default"));
+
+        // testing list versions api
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.packages().listPackageVersions(packageName));
+
+        // testing get packages api
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.packages().getMetadata(packageName));
+
+        // testing update package metadata api
+        metadata.setContact("[email protected]");
+        metadata.setProperties(Collections.singletonMap("key", "value"));
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.packages().updateMetadata(packageName, metadata));
+    }
}