This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4c5f9d2acf94fab01206258ffbb1068bd9afd97e
Author: Shen Liu <liushen...@126.com>
AuthorDate: Wed Apr 19 23:40:28 2023 +0800

    [fix][broker] Fix tenant admin authorization bug. (#20068)
    
    Co-authored-by: druidliu <druid...@tencent.com>
    (cherry picked from commit fc17c1d98a3c1edd975c131d174a9ef69887d9cd)
---
 .../broker/authorization/AuthorizationService.java | 27 +++++-----------------
 .../pulsar/broker/auth/AuthorizationTest.java      | 22 ++++++++++--------
 .../api/AuthorizationProducerConsumerTest.java     | 16 +++++++++++++
 .../websocket/proxy/ProxyAuthorizationTest.java    | 14 ++++++++---
 4 files changed, 46 insertions(+), 33 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 0c61219b57a..a9225f5e48f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
 import org.apache.pulsar.broker.authentication.AuthenticationParameters;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -182,13 +183,7 @@ public class AuthorizationService {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
-        return provider.isSuperUser(role, authenticationData, 
conf).thenComposeAsync(isSuperUser -> {
-            if (isSuperUser) {
-                return CompletableFuture.completedFuture(true);
-            } else {
-                return provider.canProduceAsync(topicName, role, 
authenticationData);
-            }
-        });
+        return provider.allowTopicOperationAsync(topicName, role, 
TopicOperation.PRODUCE, authenticationData);
     }
 
     /**
@@ -207,13 +202,9 @@ public class AuthorizationService {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
-        return provider.isSuperUser(role, authenticationData, 
conf).thenComposeAsync(isSuperUser -> {
-            if (isSuperUser) {
-                return CompletableFuture.completedFuture(true);
-            } else {
-                return provider.canConsumeAsync(topicName, role, 
authenticationData, subscription);
-            }
-        });
+
+        return provider.allowTopicOperationAsync(topicName, role, 
TopicOperation.CONSUME,
+                new AuthenticationDataSubscription(authenticationData, 
subscription));
     }
 
     public boolean canProduce(TopicName topicName, String role, 
AuthenticationDataSource authenticationData)
@@ -289,13 +280,7 @@ public class AuthorizationService {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
-        return provider.isSuperUser(role, authenticationData, 
conf).thenComposeAsync(isSuperUser -> {
-            if (isSuperUser) {
-                return CompletableFuture.completedFuture(true);
-            } else {
-                return provider.canLookupAsync(topicName, role, 
authenticationData);
-            }
-        });
+        return provider.allowTopicOperationAsync(topicName, role, 
TopicOperation.LOOKUP, authenticationData);
     }
 
     public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName 
namespaceName, String role,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index 58cf4ee418e..4fce7c50e1c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -79,10 +79,13 @@ public class AuthorizationTest extends 
MockedPulsarServiceBaseTest {
     public void simple() throws Exception {
         AuthorizationService auth = 
pulsar.getBrokerService().getAuthorizationService();
 
-        
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"my-role", null));
-
+        try {
+            
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"my-role", null));
+            fail("Should throw exception when tenant not exist");
+        } catch (Exception ignored) {}
         admin.clusters().createCluster("c1", ClusterData.builder().build());
-        admin.tenants().createTenant("p1", new 
TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1")));
+        String tenantAdmin = "role1";
+        admin.tenants().createTenant("p1", new 
TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1")));
         waitForChange();
         admin.namespaces().createNamespace("p1/c1/ns1");
         waitForChange();
@@ -215,21 +218,22 @@ public class AuthorizationTest extends 
MockedPulsarServiceBaseTest {
                 SubscriptionAuthMode.Prefix);
         waitForChange();
 
-        assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"role1", null));
+        assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), 
tenantAdmin, null));
         assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"role2", null));
-        try {
-            
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"role1", null, "sub1"));
-            fail();
-        } catch (Exception ignored) {}
+        // tenant admin can consume all topics, even in 
SubscriptionAuthMode.Prefix mode
+        
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), 
tenantAdmin, null, "sub1"));
         try {
             
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"role2", null, "sub2"));
             fail();
         } catch (Exception ignored) {}
 
-        
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"role1", null, "role1-sub1"));
+        
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), 
tenantAdmin, null, "role1-sub1"));
         
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"role2", null, "role2-sub2"));
         
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"pulsar.super_user", null, "role3-sub1"));
 
+        // tenant admin can produce all topics
+        
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), 
tenantAdmin, null));
+
         admin.namespaces().deleteNamespace("p1/c1/ns1");
         admin.tenants().deleteTenant("p1");
         admin.clusters().deleteCluster("c1");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 0ce3b7df07d..dd351286d2e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -1035,6 +1035,22 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
             return 
CompletableFuture.completedFuture(grantRoles.contains(role));
         }
 
+        @Override
+        public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topic, String role,
+                                                                   
TopicOperation operation,
+                                                                   
AuthenticationDataSource authData) {
+            switch (operation) {
+
+                case PRODUCE:
+                    return canProduceAsync(topic, role, authData);
+                case CONSUME:
+                    return canConsumeAsync(topic, role, authData, 
authData.getSubscription());
+                case LOOKUP:
+                    return canLookupAsync(topic, role, authData);
+            }
+            return super.allowTopicOperationAsync(topic, role, operation, 
authData);
+        }
+
         @Override
         public CompletableFuture<Void> grantPermissionAsync(NamespaceName 
namespace, Set<AuthAction> actions,
                 String role, String authData) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index a3b26a4a9d1..29327d3d4eb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.util.EnumSet;
 import java.util.Optional;
@@ -83,10 +84,13 @@ public class ProxyAuthorizationTest extends 
MockedPulsarServiceBaseTest {
     public void test() throws Exception {
         AuthorizationService auth = service.getAuthorizationService();
 
-        
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"my-role", null));
-
+        try {
+            
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"my-role", null));
+            fail("Should throw exception when tenant not exist");
+        } catch (Exception ignored) {}
         admin.clusters().createCluster(configClusterName, 
ClusterData.builder().build());
-        admin.tenants().createTenant("p1", new 
TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1")));
+        String tenantAdmin = "role1";
+        admin.tenants().createTenant("p1", new 
TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1")));
         waitForChange();
         admin.namespaces().createNamespace("p1/c1/ns1");
         waitForChange();
@@ -117,6 +121,10 @@ public class ProxyAuthorizationTest extends 
MockedPulsarServiceBaseTest {
         
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"my-role", null));
         
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), 
"my-role", null, null));
 
+        // tenant admin can produce/consume all topics, even in 
SubscriptionAuthMode.Prefix mode
+        
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), 
tenantAdmin, null, "sub1"));
+        
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), 
tenantAdmin, null));
+
         admin.namespaces().deleteNamespace("p1/c1/ns1");
         admin.tenants().deleteTenant("p1");
         admin.clusters().deleteCluster("c1");

Reply via email to