This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 6bc35306283 [improve][broker] Authorize originalPrincipal when 
provided (#19830)
6bc35306283 is described below

commit 6bc3530628344570cbd9171485e0478c6f01eab4
Author: Michael Marshall <[email protected]>
AuthorDate: Fri Mar 17 09:57:49 2023 -0500

    [improve][broker] Authorize originalPrincipal when provided (#19830)
---
 .../broker/authorization/AuthorizationService.java |  22 ++--
 .../authorization/AuthorizationServiceTest.java    | 135 +++++++++++++++++++++
 .../authorization/MockAuthorizationProvider.java   | 135 +++++++++++++++++++++
 .../pulsar/broker/web/PulsarWebResource.java       |   5 +-
 .../pulsar/broker/auth/AuthorizationTest.java      |  10 +-
 5 files changed, 289 insertions(+), 18 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 701c9a81552..e1ab772d2fe 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -308,14 +308,10 @@ public class AuthorizationService {
 
     /**
      * Validates that the authenticatedPrincipal and the originalPrincipal are 
a valid combination.
-     * Valid combinations fulfill one of the following two rules:
+     * Valid combinations fulfill the following rule:
      * <p>
-     * 1. The authenticatedPrincipal is in {@link 
ServiceConfiguration#getProxyRoles()}, if, and only if,
+     * The authenticatedPrincipal is in {@link 
ServiceConfiguration#getProxyRoles()}, if, and only if,
      * the originalPrincipal is set to a role that is not also in {@link 
ServiceConfiguration#getProxyRoles()}.
-     * <p>
-     * 2. The authenticatedPrincipal and the originalPrincipal are the same, 
but are not a proxyRole, when
-     * allowNonProxyPrincipalsToBeEqual is true.
-     *
      * @return true when roles are a valid combination and false when roles 
are an invalid combination
      */
     public boolean isValidOriginalPrincipal(String authenticatedPrincipal,
@@ -331,7 +327,9 @@ public class AuthorizationService {
             }
         } else if (StringUtils.isNotBlank(originalPrincipal)
                 && !(allowNonProxyPrincipalsToBeEqual && 
originalPrincipal.equals(authenticatedPrincipal))) {
-            errorMsg = "cannot specify originalPrincipal when connecting 
without valid proxy role.";
+                log.warn("[{}] Non-proxy role [{}] passed originalPrincipal 
[{}]. This behavior will not "
+                        + "be allowed in a future release. A proxy's role must 
be in the broker's proxyRoles "
+                        + "configuration.", remoteAddress, 
authenticatedPrincipal, originalPrincipal);
         }
         if (errorMsg != null) {
             log.warn("[{}] Illegal combination of role [{}] and 
originalPrincipal [{}]: {}", remoteAddress,
@@ -376,7 +374,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTenantOperationAsync(
                     tenantName, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowTenantOperationAsync(
@@ -434,7 +432,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowNamespaceOperationAsync(
                     namespaceName, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowNamespaceOperationAsync(
@@ -478,7 +476,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowNamespacePolicyOperationAsync(
                     namespaceName, policy, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowNamespacePolicyOperationAsync(
@@ -537,7 +535,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTopicPolicyOperationAsync(
                     topicName, policy, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowTopicPolicyOperationAsync(
@@ -622,7 +620,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) || StringUtils.isNotBlank(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTopicOperationAsync(
                     topicName, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowTopicOperationAsync(
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
new file mode 100644
index 00000000000..54747f9d304
--- /dev/null
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+import java.util.HashSet;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class AuthorizationServiceTest {
+
+    AuthorizationService authorizationService;
+
+    @BeforeClass
+    void beforeClass() throws PulsarServerException {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setAuthorizationEnabled(true);
+        // Consider both of these proxy roles to make testing more 
comprehensive
+        HashSet<String> proxyRoles = new HashSet<>();
+        proxyRoles.add("pass.proxy");
+        proxyRoles.add("fail.proxy");
+        conf.setProxyRoles(proxyRoles);
+        
conf.setAuthorizationProvider(MockAuthorizationProvider.class.getName());
+        authorizationService = new AuthorizationService(conf, null);
+    }
+
+    /**
+     * See {@link MockAuthorizationProvider} for the implementation of the 
mock authorization provider.
+     */
+    @DataProvider(name = "roles")
+    public Object[][] encryptionProvider() {
+        return new Object[][]{
+                // Schema: role, originalRole, whether authorization should 
pass
+
+                // Client conditions where original role isn't passed or is 
blank
+                {"pass.client", null, Boolean.TRUE},
+                {"pass.client", " ", Boolean.TRUE},
+                {"fail.client", null, Boolean.FALSE},
+                {"fail.client", " ", Boolean.FALSE},
+
+                // Proxy conditions where original role isn't passed or is 
blank
+                {"pass.proxy", null, Boolean.FALSE},
+                {"pass.proxy", " ", Boolean.FALSE},
+                {"fail.proxy", null, Boolean.FALSE},
+                {"fail.proxy", " ", Boolean.FALSE},
+
+                // Normal proxy and client conditions
+                {"pass.proxy", "pass.client", Boolean.TRUE},
+                {"pass.proxy", "fail.client", Boolean.FALSE},
+                {"fail.proxy", "pass.client", Boolean.FALSE},
+                {"fail.proxy", "fail.client", Boolean.FALSE},
+
+                // Not proxy with original principal
+                {"pass.not-proxy", "pass.client", Boolean.TRUE},
+                {"pass.not-proxy", "fail.client", Boolean.FALSE},
+                {"fail.not-proxy", "pass.client", Boolean.FALSE},
+                {"fail.not-proxy", "fail.client", Boolean.FALSE},
+
+                // Covers an unlikely scenario, but valid in the context of 
this test
+                {null, "pass.proxy", Boolean.FALSE},
+        };
+    }
+
+    private void checkResult(boolean expected, boolean actual) {
+        if (expected) {
+            assertTrue(actual);
+        } else {
+            assertFalse(actual);
+        }
+    }
+
+    @Test(dataProvider = "roles")
+    public void testAllowTenantOperationAsync(String role, String 
originalRole, boolean shouldPass) throws Exception {
+        boolean isAuthorized = 
authorizationService.allowTenantOperationAsync("tenant",
+                TenantOperation.DELETE_NAMESPACE, originalRole, role, 
null).get();
+        checkResult(shouldPass, isAuthorized);
+    }
+
+    @Test(dataProvider = "roles")
+    public void testNamespaceOperationAsync(String role, String originalRole, 
boolean shouldPass) throws Exception {
+        boolean isAuthorized = 
authorizationService.allowNamespaceOperationAsync(NamespaceName.get("public/default"),
+                NamespaceOperation.PACKAGES, originalRole, role, null).get();
+        checkResult(shouldPass, isAuthorized);
+    }
+
+    @Test(dataProvider = "roles")
+    public void testTopicOperationAsync(String role, String originalRole, 
boolean shouldPass) throws Exception {
+        boolean isAuthorized = 
authorizationService.allowTopicOperationAsync(TopicName.get("topic"),
+                TopicOperation.PRODUCE, originalRole, role, null).get();
+        checkResult(shouldPass, isAuthorized);
+    }
+
+    @Test(dataProvider = "roles")
+    public void testNamespacePolicyOperationAsync(String role, String 
originalRole, boolean shouldPass)
+            throws Exception {
+        boolean isAuthorized = 
authorizationService.allowNamespacePolicyOperationAsync(
+                NamespaceName.get("public/default"), PolicyName.ALL, 
PolicyOperation.READ, originalRole, role, null)
+                .get();
+        checkResult(shouldPass, isAuthorized);
+    }
+
+    @Test(dataProvider = "roles")
+    public void testTopicPolicyOperationAsync(String role, String 
originalRole, boolean shouldPass) throws Exception {
+        boolean isAuthorized = 
authorizationService.allowTopicPolicyOperationAsync(TopicName.get("topic"),
+                PolicyName.ALL, PolicyOperation.READ, originalRole, role, 
null).get();
+        checkResult(shouldPass, isAuthorized);
+    }
+}
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java
new file mode 100644
index 00000000000..beb0b87d22d
--- /dev/null
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Mock implementation of the authorization provider interface used for 
testing.
+ * A role is authorized if it starts with "pass".
+ */
+public class MockAuthorizationProvider implements AuthorizationProvider {
+
+    private CompletableFuture<Boolean> shouldPass(String role) {
+        return CompletableFuture.completedFuture(role != null && 
role.startsWith("pass"));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, 
String role,
+                                                      AuthenticationDataSource 
authenticationData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, 
String role,
+                                                      AuthenticationDataSource 
authenticationData, String subscription) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, 
String role,
+                                                     AuthenticationDataSource 
authenticationData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName 
namespaceName, String role,
+                                                            
AuthenticationDataSource authenticationData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName 
namespaceName, String role,
+                                                          
AuthenticationDataSource authenticationData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTenantOperationAsync(String 
tenantName, String role, TenantOperation operation, AuthenticationDataSource 
authData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> 
allowNamespaceOperationAsync(NamespaceName namespaceName, String role, 
NamespaceOperation operation, AuthenticationDataSource authData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> 
allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName 
policy, PolicyOperation operation, String role, AuthenticationDataSource 
authData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topic, String role, TopicOperation operation, AuthenticationDataSource 
authData) {
+        return shouldPass(role);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName 
topic, String role, PolicyName policy, PolicyOperation operation, 
AuthenticationDataSource authData) {
+        return shouldPass(role);
+    }
+
+
+    @Override
+    public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName 
namespaceName, String role, AuthenticationDataSource authenticationData) {
+        return null;
+    }
+
+
+    @Override
+    public CompletableFuture<Void> grantPermissionAsync(NamespaceName 
namespace, Set<AuthAction> actions, String role,
+                                                        String authDataJson) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> 
grantSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
+                                                                    
Set<String> roles, String authDataJson) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> 
revokeSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
+                                                                     String 
role, String authDataJson) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, 
Set<AuthAction> actions, String role,
+                                                        String authDataJson) {
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index bf676251d91..e87f337e5f7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -197,7 +197,7 @@ public abstract class PulsarWebResource {
         String originalPrincipal = originalPrincipal();
         validateOriginalPrincipal(appId, originalPrincipal);
 
-        if (pulsar.getConfiguration().getProxyRoles().contains(appId)) {
+        if (pulsar.getConfiguration().getProxyRoles().contains(appId) || 
StringUtils.isNotBlank(originalPrincipal())) {
             BrokerService brokerService = pulsar.getBrokerService();
             return brokerService.getAuthorizationService().isSuperUser(appId, 
clientAuthData())
                     .thenCompose(proxyAuthorizationSuccess -> {
@@ -317,7 +317,8 @@ public abstract class PulsarWebResource {
                             throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
                         }
                         validateOriginalPrincipal(clientAppId, 
originalPrincipal);
-                        if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+                        if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)
+                                || StringUtils.isNotBlank(originalPrincipal)) {
                             AuthorizationService authorizationService =
                                     
pulsar.getBrokerService().getAuthorizationService();
                             return authorizationService.isTenantAdmin(tenant, 
clientAppId, tenantInfo,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index bd6dfd872c5..4984cc3ce37 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -250,7 +250,8 @@ public class AuthorizationTest extends 
MockedPulsarServiceBaseTest {
 
         // Edge cases that differ because binary protocol and http protocol 
have different expectations
         assertTrue(auth.isValidOriginalPrincipal("client", "client", 
(SocketAddress) null, true));
-        assertFalse(auth.isValidOriginalPrincipal("client", "client", 
(SocketAddress) null, false));
+        // This assert flips to assertFalse in the 3.0 release.
+        assertTrue(auth.isValidOriginalPrincipal("client", "client", 
(SocketAddress) null, false));
 
         // Only likely in cases when authentication is disabled, but we still 
define these to be valid.
         assertTrue(auth.isValidOriginalPrincipal(null, null, (SocketAddress) 
null, false));
@@ -264,9 +265,10 @@ public class AuthorizationTest extends 
MockedPulsarServiceBaseTest {
 
         // OriginalPrincipal cannot be proxy role
         assertFalse(auth.isValidOriginalPrincipal("proxy", "proxy", 
(SocketAddress) null, false));
-        assertFalse(auth.isValidOriginalPrincipal("client", "proxy", 
(SocketAddress) null, false));
-        assertFalse(auth.isValidOriginalPrincipal("", "proxy", (SocketAddress) 
null, false));
-        assertFalse(auth.isValidOriginalPrincipal(null, "proxy", 
(SocketAddress) null, false));
+        // The next 3 asserts flip to assertFalse in the 3.0 release.
+        assertTrue(auth.isValidOriginalPrincipal("client", "proxy", 
(SocketAddress) null, false));
+        assertTrue(auth.isValidOriginalPrincipal("", "proxy", (SocketAddress) 
null, false));
+        assertTrue(auth.isValidOriginalPrincipal(null, "proxy", 
(SocketAddress) null, false));
 
         // Must gracefully handle a missing AuthenticationDataSource
         assertTrue(auth.isValidOriginalPrincipal("proxy", "client", 
(AuthenticationDataSource) null));

Reply via email to