This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4c5f9d2acf94fab01206258ffbb1068bd9afd97e Author: Shen Liu <liushen...@126.com> AuthorDate: Wed Apr 19 23:40:28 2023 +0800 [fix][broker] Fix tenant admin authorization bug. (#20068) Co-authored-by: druidliu <druid...@tencent.com> (cherry picked from commit fc17c1d98a3c1edd975c131d174a9ef69887d9cd) --- .../broker/authorization/AuthorizationService.java | 27 +++++----------------- .../pulsar/broker/auth/AuthorizationTest.java | 22 ++++++++++-------- .../api/AuthorizationProducerConsumerTest.java | 16 +++++++++++++ .../websocket/proxy/ProxyAuthorizationTest.java | 14 ++++++++--- 4 files changed, 46 insertions(+), 33 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 0c61219b57a..a9225f5e48f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.authentication.AuthenticationParameters; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.common.naming.NamespaceName; @@ -182,13 +183,7 @@ public class AuthorizationService { if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { - if (isSuperUser) { - return CompletableFuture.completedFuture(true); - } else { - return provider.canProduceAsync(topicName, role, authenticationData); - } - }); + return provider.allowTopicOperationAsync(topicName, role, TopicOperation.PRODUCE, authenticationData); } /** @@ -207,13 +202,9 @@ public class AuthorizationService { if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { - if (isSuperUser) { - return CompletableFuture.completedFuture(true); - } else { - return provider.canConsumeAsync(topicName, role, authenticationData, subscription); - } - }); + + return provider.allowTopicOperationAsync(topicName, role, TopicOperation.CONSUME, + new AuthenticationDataSubscription(authenticationData, subscription)); } public boolean canProduce(TopicName topicName, String role, AuthenticationDataSource authenticationData) @@ -289,13 +280,7 @@ public class AuthorizationService { if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { - if (isSuperUser) { - return CompletableFuture.completedFuture(true); - } else { - return provider.canLookupAsync(topicName, role, authenticationData); - } - }); + return provider.allowTopicOperationAsync(topicName, role, TopicOperation.LOOKUP, authenticationData); } public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index 58cf4ee418e..4fce7c50e1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -79,10 +79,13 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { public void simple() throws Exception { AuthorizationService auth = pulsar.getBrokerService().getAuthorizationService(); - assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); - + try { + assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); + fail("Should throw exception when tenant not exist"); + } catch (Exception ignored) {} admin.clusters().createCluster("c1", ClusterData.builder().build()); - admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1"))); + String tenantAdmin = "role1"; + admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1"))); waitForChange(); admin.namespaces().createNamespace("p1/c1/ns1"); waitForChange(); @@ -215,21 +218,22 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { SubscriptionAuthMode.Prefix); waitForChange(); - assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null)); + assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null)); assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null)); - try { - assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "sub1")); - fail(); - } catch (Exception ignored) {} + // tenant admin can consume all topics, even if SubscriptionAuthMode.Prefix mode + assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "sub1")); try { assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "sub2")); fail(); } catch (Exception ignored) {} - assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "role1-sub1")); + assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "role1-sub1")); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "role2-sub2")); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "pulsar.super_user", null, "role3-sub1")); + // tenant admin can produce all topics + assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null)); + admin.namespaces().deleteNamespace("p1/c1/ns1"); admin.tenants().deleteTenant("p1"); admin.clusters().deleteCluster("c1"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index 0ce3b7df07d..dd351286d2e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -1035,6 +1035,22 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase { return CompletableFuture.completedFuture(grantRoles.contains(role)); } + @Override + public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String role, + TopicOperation operation, + AuthenticationDataSource authData) { + switch (operation) { + + case PRODUCE: + return canProduceAsync(topic, role, authData); + case CONSUME: + return canConsumeAsync(topic, role, authData, authData.getSubscription()); + case LOOKUP: + return canLookupAsync(topic, role, authData); + } + return super.allowTopicOperationAsync(topic, role, operation, authData); + } + @Override public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role, String authData) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index a3b26a4a9d1..29327d3d4eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.Sets; import java.util.EnumSet; import java.util.Optional; @@ -83,10 +84,13 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest { public void test() throws Exception { AuthorizationService auth = service.getAuthorizationService(); - assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); - + try { + assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); + fail("Should throw exception when tenant not exist"); + } catch (Exception ignored) {} admin.clusters().createCluster(configClusterName, ClusterData.builder().build()); - admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1"))); + String tenantAdmin = "role1"; + admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1"))); waitForChange(); admin.namespaces().createNamespace("p1/c1/ns1"); waitForChange(); @@ -117,6 +121,10 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest { assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null)); + // tenant admin can produce/consume all topics, even if SubscriptionAuthMode.Prefix mode + assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "sub1")); + assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null)); + admin.namespaces().deleteNamespace("p1/c1/ns1"); admin.tenants().deleteTenant("p1"); admin.clusters().deleteCluster("c1");