This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9d72e6bd847 [improve][test] Add topic operation checker for topic API (#22468)
9d72e6bd847 is described below
commit 9d72e6bd847df85a7d18f1827274df96a446798f
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Apr 15 16:15:59 2024 +0800
[improve][test] Add topic operation checker for topic API (#22468)
---
.../apache/pulsar/broker/admin/TopicAuthZTest.java | 156 ++++++++++++++++++---
1 file changed, 135 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
index d09bc0a3ffd..e6ff0ce2bb4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
@@ -20,9 +20,18 @@
package org.apache.pulsar.broker.admin;
import io.jsonwebtoken.Jwts;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.SneakyThrows;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -37,22 +46,21 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
@Test(groups = "broker-admin")
public class TopicAuthZTest extends MockedPulsarStandalone {
@@ -61,13 +69,17 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
private PulsarAdmin tenantManagerAdmin;
+ private AuthorizationService authorizationService;
+
+ private AuthorizationService orignalAuthorizationService;
+
private static final String TENANT_ADMIN_SUBJECT =
UUID.randomUUID().toString();
private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
.claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
@SneakyThrows
@BeforeClass(alwaysRun = true)
- public void before() {
+ public void setup() {
configureTokenAuthentication();
configureDefaultAuthorization();
enableTransaction();
@@ -99,7 +111,7 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
@SneakyThrows
@AfterClass(alwaysRun = true)
- public void after() {
+ public void cleanup() {
if (superUserAdmin != null) {
superUserAdmin.close();
}
@@ -109,6 +121,51 @@ public class TopicAuthZTest extends MockedPulsarStandalone
{
close();
}
+    /**
+     * Swaps the broker's authorization service for a Mockito spy before each test method.
+     *
+     * <p>The service currently held by the broker is remembered in {@code orignalAuthorizationService}
+     * and the spy wrapping it is stored in {@code authorizationService}; the spy is then written into the
+     * broker service's {@code authorizationService} field via reflection.
+     *
+     * @throws IllegalAccessException if the reflective field write is rejected
+     */
+    @BeforeMethod
+    public void before() throws IllegalAccessException {
+        AuthorizationService realService = getPulsarService().getBrokerService().getAuthorizationService();
+        AuthorizationService spiedService = Mockito.spy(realService);
+        orignalAuthorizationService = realService;
+        authorizationService = spiedService;
+        // writeField accepts any Object target; forceAccess=true because the field is written reflectively.
+        Object brokerService = getPulsarService().getBrokerService();
+        FieldUtils.writeField(brokerService, "authorizationService", spiedService, true);
+    }
+
+    /**
+     * Puts the authorization service remembered in {@code orignalAuthorizationService} back into the
+     * broker service's {@code authorizationService} field after each test method.
+     *
+     * @throws IllegalAccessException if the reflective field write is rejected
+     */
+    @AfterMethod
+    public void after() throws IllegalAccessException {
+        Object brokerService = getPulsarService().getBrokerService();
+        FieldUtils.writeField(brokerService, "authorizationService", orignalAuthorizationService, true);
+    }
+
+    /**
+     * Stubs the spied {@code authorizationService} so that subsequent authorization checks are observed.
+     *
+     * <p>For a {@link TopicOperation} the five-argument {@code allowTopicOperationAsync} is stubbed; for a
+     * {@link NamespaceOperation} the five-argument {@code allowNamespaceOperationAsync} is stubbed. In both
+     * cases the answer asserts that the checked operation equals {@code operation} whenever the invocation's
+     * argument at index 2 equals {@code role}, and then delegates to the real method.
+     *
+     * @param role the role whose authorization checks are expected to use {@code operation}
+     * @param operation the expected {@link TopicOperation} or {@link NamespaceOperation}
+     * @return a flag that is set to {@code true} once the stubbed method has been invoked, for any role
+     * @throws IllegalArgumentException if {@code operation} is neither a {@link TopicOperation} nor a
+     *         {@link NamespaceOperation}
+     */
+    private AtomicBoolean setAuthorizationTopicOperationChecker(String role, Object operation) {
+        AtomicBoolean execFlag = new AtomicBoolean(false);
+        if (operation instanceof TopicOperation) {
+            Mockito.doAnswer(invocationOnMock -> {
+                // NOTE(review): index 2 is assumed to carry the role being authorized - confirm against
+                // the stubbed overload.
+                String role_ = invocationOnMock.getArgument(2);
+                if (role.equals(role_)) {
+                    TopicOperation operation_ = invocationOnMock.getArgument(1);
+                    Assert.assertEquals(operation_, operation);
+                }
+                execFlag.set(true);
+                return invocationOnMock.callRealMethod();
+            }).when(authorizationService).allowTopicOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(),
+                    Mockito.any(), Mockito.any());
+        } else if (operation instanceof NamespaceOperation) {
+            Mockito.doAnswer(invocationOnMock -> {
+                // NOTE(review): index 2 is assumed to carry the role being authorized - confirm against
+                // the stubbed overload.
+                String role_ = invocationOnMock.getArgument(2);
+                if (role.equals(role_)) {
+                    // The stubbed method checks a NamespaceOperation; reading it as a TopicOperation would
+                    // fail with ClassCastException instead of reaching the assertion.
+                    NamespaceOperation operation_ = invocationOnMock.getArgument(1);
+                    Assert.assertEquals(operation_, operation);
+                }
+                execFlag.set(true);
+                return invocationOnMock.callRealMethod();
+            }).when(authorizationService).allowNamespaceOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(),
+                    Mockito.any(), Mockito.any());
+        } else {
+            throw new IllegalArgumentException("Unsupported operation type: "
+                    + (operation == null ? "null" : operation.getClass().getName()));
+        }
+
+        return execFlag;
+    }
+
@DataProvider(name = "partitioned")
public static Object[][] partitioned() {
return new Object[][] {
@@ -204,6 +261,8 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getInternalInfo(topic));
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.GET_STATS);
+
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
if (action == AuthAction.produce || action == AuthAction.consume) {
@@ -214,6 +273,9 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
}
superUserAdmin.topics().revokePermissions(topic, subject);
}
+
+ Assert.assertTrue(execFlag.get());
+
superUserAdmin.topics().deletePartitionedTopic(topic, true);
}
@@ -244,8 +306,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone
{
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getPartitionedStats(topic, false));
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.GET_STATS);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getPartitionedInternalStats(topic));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -313,14 +377,20 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
tenantManagerAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
"test-sub", Optional.empty());
// test nobody
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.CONSUME);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().updateSubscriptionProperties(topic,
"test-sub", properties));
+ Assert.assertTrue(execFlag.get());
+ execFlag = setAuthorizationTopicOperationChecker(subject,
TopicOperation.CONSUME);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getSubscriptionProperties(topic,
"test-sub"));
+ Assert.assertTrue(execFlag.get());
+ execFlag = setAuthorizationTopicOperationChecker(subject,
TopicOperation.CONSUME);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() ->
subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
"test-sub", Optional.empty()));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -362,10 +432,15 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
superUserAdmin.topics().createMissedPartitions(topic);
// test tenant manager
+
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, NamespaceOperation.CREATE_TOPIC);
tenantManagerAdmin.topics().createMissedPartitions(topic);
+ Assert.assertTrue(execFlag.get());
+ execFlag = setAuthorizationTopicOperationChecker(subject,
TopicOperation.LOOKUP);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().createMissedPartitions(topic));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -396,8 +471,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone
{
// test tenant manager
tenantManagerAdmin.topics().getPartitionedTopicMetadata(topic);
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.LOOKUP);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getPartitionedTopicMetadata(topic));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -434,16 +511,18 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
// test tenant manager
tenantManagerAdmin.topics().getProperties(topic);
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.GET_METADATA);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getProperties(topic));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
if (AuthAction.produce == action || AuthAction.consume == action) {
- subAdmin.topics().getPartitionedTopicMetadata(topic);
+ subAdmin.topics().getProperties(topic);
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
- () ->
subAdmin.topics().getPartitionedTopicMetadata(topic));
+ () -> subAdmin.topics().getProperties(topic));
}
superUserAdmin.topics().revokePermissions(topic, subject);
}
@@ -472,8 +551,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone
{
// test tenant manager
tenantManagerAdmin.topics().updateProperties(topic, properties);
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.UPDATE_METADATA);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().updateProperties(topic, properties));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -504,8 +585,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone
{
// test tenant manager
tenantManagerAdmin.topics().removeProperties(topic, "key1");
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.DELETE_METADATA);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().removeProperties(topic, "key1"));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -539,8 +622,11 @@ public class TopicAuthZTest extends MockedPulsarStandalone
{
tenantManagerAdmin.topics().deletePartitionedTopic(topic);
createTopic(topic, true);
+
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, NamespaceOperation.DELETE_TOPIC);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().deletePartitionedTopic(topic));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(ns,
subject, Set.of(action));
@@ -548,7 +634,6 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
() -> subAdmin.topics().deletePartitionedTopic(topic));
superUserAdmin.namespaces().revokePermissionsOnNamespace(ns,
subject);
}
- deleteTopic(topic, true);
}
@Test(dataProvider = "partitioned")
@@ -571,8 +656,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone
{
// test tenant manager
tenantManagerAdmin.topics().getSubscriptions(topic);
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject,
TopicOperation.GET_SUBSCRIPTIONS);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getSubscriptions(topic));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -616,6 +703,7 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
}
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.GET_STATS);
if (partitioned) {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() ->
subAdmin.topics().getPartitionedInternalStats(topic));
@@ -623,6 +711,7 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getInternalStats(topic));
}
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -671,8 +760,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone
{
tenantManagerAdmin.topics().deleteSubscription(topic, subName);
superUserAdmin.topics().createSubscription(topic, subName,
MessageId.latest);
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.UNSUBSCRIBE);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().deleteSubscription(topic, subName));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -708,9 +799,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone
{
// test tenant manager
tenantManagerAdmin.topics().skipAllMessages(topic, subName);
-
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.SKIP);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().skipAllMessages(topic, subName));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -746,10 +838,10 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
// test tenant manager
tenantManagerAdmin.topics().skipMessages(topic, subName, 1);
-
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.SKIP);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().skipMessages(topic, subName, 1));
-
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
if (AuthAction.consume == action) {
@@ -782,10 +874,10 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
// test tenant manager
tenantManagerAdmin.topics().expireMessagesForAllSubscriptions(topic,
1);
-
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.EXPIRE_MESSAGES);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() ->
subAdmin.topics().expireMessagesForAllSubscriptions(topic, 1));
-
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
if (AuthAction.consume == action) {
@@ -820,10 +912,10 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
// test tenant manager
tenantManagerAdmin.topics().resetCursor(topic, subName,
System.currentTimeMillis());
-
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.RESET_CURSOR);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().resetCursor(topic, subName,
System.currentTimeMillis()));
-
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
if (AuthAction.consume == action) {
@@ -858,10 +950,10 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
// test tenant manager
tenantManagerAdmin.topics().resetCursor(topic, subName,
MessageId.latest);
-
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.RESET_CURSOR);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().resetCursor(topic, subName,
MessageId.latest));
-
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
if (AuthAction.consume == action) {
@@ -903,8 +995,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone
{
// test tenant manager
tenantManagerAdmin.topics().getMessagesById(topic,
messageId.getLedgerId(), messageId.getEntryId());
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.PEEK_MESSAGES);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getMessagesById(topic,
messageId.getLedgerId(), messageId.getEntryId()));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -947,9 +1041,10 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
// test tenant manager
tenantManagerAdmin.topics().peekMessages(topic, subName, 1);
-
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.PEEK_MESSAGES);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().peekMessages(topic, subName, 1));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -992,8 +1087,10 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
// test tenant manager
tenantManagerAdmin.topics().examineMessage(topic, "latest", 1);
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.PEEK_MESSAGES);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().examineMessage(topic, "latest", 1));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -1039,7 +1136,9 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
superUserAdmin.topics().expireMessages(topic, subName, 1);
// test tenant manager
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.EXPIRE_MESSAGES);
tenantManagerAdmin.topics().expireMessages(topic, subName, 1);
+ Assert.assertTrue(execFlag.get());
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().expireMessages(topic, subName, 1));
@@ -1090,8 +1189,10 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
// test tenant manager
tenantManagerAdmin.topics().expireMessages(topic, subName,
MessageId.earliest, false);
+ AtomicBoolean execFlag =
setAuthorizationTopicOperationChecker(subject, TopicOperation.EXPIRE_MESSAGES);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().expireMessages(topic, subName,
MessageId.earliest, false));
+ Assert.assertTrue(execFlag.get());
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
@@ -1294,6 +1395,15 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
() -> adminConsumer.accept(subAdmin));
}
+ AtomicBoolean execFlag = null;
+ if (topicOpType == OperationAuthType.Lookup) {
+ execFlag = setAuthorizationTopicOperationChecker(subject,
TopicOperation.LOOKUP);
+ } else if (topicOpType == OperationAuthType.Produce) {
+ execFlag = setAuthorizationTopicOperationChecker(subject,
TopicOperation.PRODUCE);
+ } else if (topicOpType == OperationAuthType.Consume) {
+ execFlag = setAuthorizationTopicOperationChecker(subject,
TopicOperation.CONSUME);
+ }
+
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(testTopic, subject,
Set.of(action));
@@ -1305,6 +1415,10 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
}
superUserAdmin.topics().revokePermissions(testTopic, subject);
}
+
+ if (execFlag != null) {
+ Assert.assertTrue(execFlag.get());
+ }
}