This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 01bd9860dbb [cleanup][broker] Validate originalPrincipal earlier in ServerCnx (#19270)
01bd9860dbb is described below

commit 01bd9860dbb272509bbc13bd71a3983715f84a2a
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Feb 14 12:44:18 2023 -0600

    [cleanup][broker] Validate originalPrincipal earlier in ServerCnx (#19270)
    
    (cherry picked from commit fd3ce8b5786baf0b76f301bd9597cd0b99a412f1)
    (cherry picked from commit 2847dd19f6e8a546f4d45bf51eb2b72aae0869ce)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 78 +++++++++-------------
 .../pulsar/broker/service/ServerCnxTest.java       | 37 ++++++++++
 2 files changed, 67 insertions(+), 48 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index fd73156ae2d..be2386bd369 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -367,17 +367,30 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         ctx.close();
     }
 
-    /*
-     * If authentication and authorization is enabled(and not sasl) and
-     *  if the authRole is one of proxyRoles we want to enforce
+    /**
+     * When transitioning from Connecting to Connected, this method validates 
the roles.
+     * If the authRole is one of proxyRoles, the following must be true:
      * - the originalPrincipal is given while connecting
      * - originalPrincipal is not blank
-     * - originalPrincipal is not a proxy principal
+     * - originalPrincipal is not a proxy principal.
+     * @return true when roles are valid and false when roles are invalid
      */
-    private boolean invalidOriginalPrincipal(String originalPrincipal) {
-        return (service.isAuthenticationEnabled() && 
service.isAuthorizationEnabled()
-                && proxyRoles.contains(authRole) && 
(StringUtils.isBlank(originalPrincipal)
-                || proxyRoles.contains(originalPrincipal)));
+    private boolean isValidRoleAndOriginalPrincipal() {
+        String errorMsg = null;
+        if (proxyRoles.contains(authRole)) {
+            if (StringUtils.isBlank(originalPrincipal)) {
+                errorMsg = "originalPrincipal must be provided when connecting 
with a proxy role.";
+            } else if (proxyRoles.contains(originalPrincipal)) {
+                errorMsg = "originalPrincipal cannot be a proxy role.";
+            }
+        }
+        if (errorMsg != null) {
+            log.warn("[{}] Illegal combination of role [{}] and 
originalPrincipal [{}]: {}", remoteAddress, authRole,
+                    originalPrincipal, errorMsg);
+            return false;
+        } else {
+            return true;
+        }
     }
 
     // ////
@@ -448,14 +461,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
-            if (invalidOriginalPrincipal(originalPrincipal)) {
-                final String msg = "Valid Proxy Client role should be provided 
for lookup ";
-                log.warn("[{}] {} with role {} and proxyClientAuthRole {} on 
topic {}", remoteAddress, msg, authRole,
-                        originalPrincipal, topicName);
-                
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, 
requestId));
-                lookupSemaphore.release();
-                return;
-            }
             isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, 
authenticationData, originalAuthData).thenApply(
                     isAuthorized -> {
                 if (isAuthorized) {
@@ -512,14 +517,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
-            if (invalidOriginalPrincipal(originalPrincipal)) {
-                final String msg = "Valid Proxy Client role should be provided 
for getPartitionMetadataRequest ";
-                log.warn("[{}] {} with role {} and proxyClientAuthRole {} on 
topic {}", remoteAddress, msg, authRole,
-                        originalPrincipal, topicName);
-                
commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, 
msg, requestId);
-                lookupSemaphore.release();
-                return;
-            }
             isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, 
authenticationData, originalAuthData).thenApply(
                     isAuthorized -> {
                 if (isAuthorized) {
@@ -629,6 +626,15 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     // complete the connect and sent newConnected command
     private void completeConnect(int clientProtoVersion, String clientVersion) 
{
+        if (service.isAuthenticationEnabled() && 
service.isAuthorizationEnabled()) {
+            if (!isValidRoleAndOriginalPrincipal()) {
+                state = State.Failed;
+                service.getPulsarStats().recordConnectionCreateFail();
+                final ByteBuf msg = Commands.newError(-1, 
ServerError.AuthorizationError, "Invalid roles.");
+                
ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
+                return;
+            }
+        }
         ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize));
         state = State.Connected;
         service.getPulsarStats().recordConnectionCreateSuccess();
@@ -942,14 +948,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 remoteAddress, authRole, originalPrincipal);
         }
 
-        if (invalidOriginalPrincipal(originalPrincipal)) {
-            final String msg = "Valid Proxy Client role should be provided 
while subscribing ";
-            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic 
{}", remoteAddress, msg, authRole,
-                    originalPrincipal, topicName);
-            commandSender.sendErrorResponse(requestId, 
ServerError.AuthorizationError, msg);
-            return;
-        }
-
         final String subscriptionName = subscribe.getSubscription();
         final SubType subType = subscribe.getSubType();
         final String consumerName = subscribe.hasConsumerName() ? 
subscribe.getConsumerName() : "";
@@ -1190,14 +1188,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             return;
         }
 
-        if (invalidOriginalPrincipal(originalPrincipal)) {
-            final String msg = "Valid Proxy Client role should be provided 
while creating producer ";
-            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic 
{}", remoteAddress, msg, authRole,
-                    originalPrincipal, topicName);
-            commandSender.sendErrorResponse(requestId, 
ServerError.AuthorizationError, msg);
-            return;
-        }
-
         CompletableFuture<Boolean> isAuthorizedFuture = 
isTopicOperationAllowed(
                 topicName, TopicOperation.PRODUCE, authenticationData, 
originalAuthData
         );
@@ -2012,14 +2002,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
-            if (invalidOriginalPrincipal(originalPrincipal)) {
-                final String msg = "Valid Proxy Client role should be provided 
for getTopicsOfNamespaceRequest ";
-                log.warn("[{}] {} with role {} and proxyClientAuthRole {} on 
namespace {}", remoteAddress, msg,
-                        authRole, originalPrincipal, namespaceName);
-                commandSender.sendErrorResponse(requestId, 
ServerError.AuthorizationError, msg);
-                lookupSemaphore.release();
-                return;
-            }
             isNamespaceOperationAllowed(namespaceName, 
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
                 if (isAuthorized) {
                     
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName,
 mode)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index ee1789b322f..34cc3ba2232 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -72,6 +72,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -398,6 +399,42 @@ public class ServerCnxTest {
         channel.finish();
     }
 
+    @Test(timeOut = 30000)
+    public void testConnectCommandWithInvalidRoleCombinations() throws 
Exception {
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthenticateOriginalAuthData(false);
+        svcConfig.setAuthorizationEnabled(true);
+        svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
+
+        // Invalid combinations where authData is proxy role
+        verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, 
"pass.proxy", "pass.proxy");
+        verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, 
"pass.proxy", "");
+        verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, 
"pass.proxy", null);
+    }
+
+    private void verifyAuthRoleAndOriginalPrincipalBehavior(String 
authMethodName, String authData,
+                                                            String 
originalPrincipal) throws Exception {
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        ByteBuf clientCommand = Commands.newConnect(authMethodName, authData, 
1,null,
+                null, originalPrincipal, null, null);
+        channel.writeInbound(clientCommand);
+
+        Object response = getResponse();
+        assertTrue(response instanceof CommandError);
+        assertEquals(((CommandError) response).getError(), 
ServerError.AuthorizationError);
+        assertEquals(serverCnx.getState(), State.Failed);
+        channel.finish();
+    }
+
     @Test(timeOut = 30000)
     public void testConnectCommandWithAuthenticationNegative() throws 
Exception {
         AuthenticationService authenticationService = 
mock(AuthenticationService.class);

Reply via email to