This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aa247ad7276 [feat][broker] PIP 97: Implement for ServerCnx (#19409)
aa247ad7276 is described below

commit aa247ad72760ae95f3e7f1a969a6d83121319472
Author: Michael Marshall <[email protected]>
AuthorDate: Sat Feb 4 02:40:42 2023 -0600

    [feat][broker] PIP 97: Implement for ServerCnx (#19409)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 222 +++++-----
 .../broker/service/ServerCnxAuthorizationTest.java | 400 ------------------
 .../pulsar/broker/service/ServerCnxTest.java       | 445 +++++++++++++++++++--
 .../apache/pulsar/sql/presto/TestPulsarAuth.java   |   2 +-
 .../integration/presto/TestPulsarSQLAuth.java      |   2 +-
 5 files changed, 553 insertions(+), 518 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 56f3f07fd8c..e355f87581b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -194,6 +194,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     // it will hold the credentials of the original client
     private AuthenticationState originalAuthState;
     private AuthenticationDataSource originalAuthData;
+    // Keep temporarily in order to verify after verifying proxy's authData
+    private AuthData originalAuthDataCopy;
     private boolean pendingAuthChallengeResponse = false;
 
     // Max number of pending requests per connections. If multiple producers 
are sharing the same connection the flow
@@ -690,8 +692,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     }
 
     // complete the connect and sent newConnected command
-    private void completeConnect(int clientProtoVersion, String clientVersion, 
boolean supportsTopicWatchers) {
-        writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize, supportsTopicWatchers));
+    private void completeConnect(int clientProtoVersion, String clientVersion) 
{
+        writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize, enableSubscriptionPatternEvaluation));
         state = State.Connected;
         service.getPulsarStats().recordConnectionCreateSuccess();
         if (log.isDebugEnabled()) {
@@ -706,74 +708,135 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
     }
 
-    // According to auth result, send newConnected or newAuthChallenge command.
-    private State doAuthentication(AuthData clientData,
-                                   int clientProtocolVersion,
-                                   String clientVersion) throws Exception {
-
+    // According to auth result, send Connected, AuthChallenge, or Error 
command.
+    private void doAuthentication(AuthData clientData,
+                                  boolean useOriginalAuthState,
+                                  int clientProtocolVersion,
+                                  final String clientVersion) {
         // The original auth state can only be set on subsequent auth attempts 
(and only
         // in presence of a proxy and if the proxy is forwarding the 
credentials).
         // In this case, the re-validation needs to be done against the 
original client
         // credentials.
-        boolean useOriginalAuthState = (originalAuthState != null);
-        AuthenticationState authState =  useOriginalAuthState ? 
originalAuthState : this.authState;
+        AuthenticationState authState = useOriginalAuthState ? 
originalAuthState : this.authState;
         String authRole = useOriginalAuthState ? originalPrincipal : 
this.authRole;
-        AuthData brokerData = authState.authenticate(clientData);
-
         if (log.isDebugEnabled()) {
             log.debug("Authenticate using original auth state : {}, role = 
{}", useOriginalAuthState, authRole);
         }
+        authState
+                .authenticateAsync(clientData)
+                .whenCompleteAsync((authChallenge, throwable) -> {
+                    if (throwable == null) {
+                        authChallengeSuccessCallback(authChallenge, 
useOriginalAuthState, authRole,
+                                clientProtocolVersion, clientVersion);
+                    } else {
+                        authenticationFailed(throwable);
+                    }
+                }, ctx.executor());
+    }
 
-        if (authState.isComplete()) {
-            // Authentication has completed. It was either:
-            // 1. the 1st time the authentication process was done, in which 
case we'll send
-            //    a `CommandConnected` response
-            // 2. an authentication refresh, in which case we need to refresh 
authenticationData
-
-            String newAuthRole = authState.getAuthRole();
-
-            // Refresh the auth data.
-            this.authenticationData = authState.getAuthDataSource();
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Auth data refreshed for role={}", 
remoteAddress, this.authRole);
-            }
+    public void authChallengeSuccessCallback(AuthData authChallenge,
+                                             boolean useOriginalAuthState,
+                                             String authRole,
+                                             int clientProtocolVersion,
+                                             String clientVersion) {
+        try {
+            if (authChallenge == null) {
+                // Authentication has completed. It was either:
+                // 1. the 1st time the authentication process was done, in 
which case we'll send
+                //    a `CommandConnected` response
+                // 2. an authentication refresh, in which case we need to 
refresh authenticationData
+                AuthenticationState authState = useOriginalAuthState ? 
originalAuthState : this.authState;
+                String newAuthRole = authState.getAuthRole();
+
+                // Refresh the auth data.
+                this.authenticationData = authState.getAuthDataSource();
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Auth data refreshed for role={}", 
remoteAddress, this.authRole);
+                }
 
-            if (!useOriginalAuthState) {
-                this.authRole = newAuthRole;
-            }
+                if (!useOriginalAuthState) {
+                    this.authRole = newAuthRole;
+                }
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Client successfully authenticated with {} role 
{} and originalPrincipal {}",
-                        remoteAddress, authMethod, this.authRole, 
originalPrincipal);
-            }
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Client successfully authenticated with {} 
role {} and originalPrincipal {}",
+                            remoteAddress, authMethod, this.authRole, 
originalPrincipal);
+                }
 
-            if (state != State.Connected) {
-                // First time authentication is done
-                completeConnect(clientProtocolVersion, clientVersion, 
enableSubscriptionPatternEvaluation);
-            } else {
-                // If the connection was already ready, it means we're doing a 
refresh
-                if (!StringUtils.isEmpty(authRole)) {
-                    if (!authRole.equals(newAuthRole)) {
-                        log.warn("[{}] Principal cannot change during an 
authentication refresh expected={} got={}",
-                                remoteAddress, authRole, newAuthRole);
-                        ctx.close();
+                if (state != State.Connected) {
+                    // First time authentication is done
+                    if (originalAuthState != null) {
+                        // We only set originalAuthState when we are going to 
use it.
+                        authenticateOriginalData(clientProtocolVersion, 
clientVersion);
                     } else {
-                        log.info("[{}] Refreshed authentication credentials 
for role {}", remoteAddress, authRole);
+                        completeConnect(clientProtocolVersion, clientVersion);
                     }
+                } else {
+                    // If the connection was already ready, it means we're 
doing a refresh
+                    if (!StringUtils.isEmpty(authRole)) {
+                        if (!authRole.equals(newAuthRole)) {
+                            log.warn("[{}] Principal cannot change during an 
authentication refresh expected={} got={}",
+                                    remoteAddress, authRole, newAuthRole);
+                            ctx.close();
+                        } else {
+                            log.info("[{}] Refreshed authentication 
credentials for role {}", remoteAddress, authRole);
+                        }
+                    }
+                }
+            } else {
+                // auth not complete, continue auth with client side.
+                ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, 
authChallenge, clientProtocolVersion));
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Authentication in progress client by 
method {}.", remoteAddress, authMethod);
                 }
             }
-
-            return State.Connected;
+        } catch (Exception e) {
+            authenticationFailed(e);
         }
+    }
 
-        // auth not complete, continue auth with client side.
-        writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, 
clientProtocolVersion));
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Authentication in progress client by method {}.",
-                remoteAddress, authMethod);
-            log.debug("[{}] connect state change to : [{}]", remoteAddress, 
State.Connecting.name());
+    private void authenticateOriginalData(int clientProtoVersion, String 
clientVersion) {
+        originalAuthState
+                .authenticateAsync(originalAuthDataCopy)
+                .whenCompleteAsync((authChallenge, throwable) -> {
+                    if (throwable != null) {
+                        authenticationFailed(throwable);
+                    } else if (authChallenge != null) {
+                        // The protocol does not yet handle an auth challenge 
here.
+                        // See https://github.com/apache/pulsar/issues/19291.
+                        authenticationFailed(new 
AuthenticationException("Failed to authenticate original auth data "
+                                + "due to unsupported authChallenge."));
+                    } else {
+                        try {
+                            // No need to retain these bytes anymore
+                            originalAuthDataCopy = null;
+                            originalAuthData = 
originalAuthState.getAuthDataSource();
+                            originalPrincipal = 
originalAuthState.getAuthRole();
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] Authenticated original role 
(forwarded from proxy): {}",
+                                        remoteAddress, originalPrincipal);
+                            }
+                            completeConnect(clientProtoVersion, clientVersion);
+                        } catch (Exception e) {
+                            authenticationFailed(e);
+                        }
+                    }
+                }, ctx.executor());
+    }
+
+    // Handle authentication and authentication refresh failures. Must be 
called from event loop.
+    private void authenticationFailed(Throwable t) {
+        String operation;
+        if (state == State.Connecting) {
+            service.getPulsarStats().recordConnectionCreateFail();
+            operation = "connect";
+        } else {
+            operation = "authentication-refresh";
         }
-        return State.Connecting;
+        state = State.Failed;
+        logAuthException(remoteAddress, operation, getPrincipal(), 
Optional.empty(), t);
+        final ByteBuf msg = Commands.newError(-1, 
ServerError.AuthenticationError, "Failed to authenticate");
+        NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
     }
 
     public void refreshAuthenticationCredentials() {
@@ -871,10 +934,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
 
         if (!service.isAuthenticationEnabled()) {
-            completeConnect(clientProtocolVersion, clientVersion, 
enableSubscriptionPatternEvaluation);
+            completeConnect(clientProtocolVersion, clientVersion);
             return;
         }
 
+        // Go to Connecting state now because auth can be async.
+        state = State.Connecting;
+
         try {
             byte[] authData = connect.hasAuthData() ? connect.getAuthData() : 
emptyArray;
             AuthData clientData = AuthData.of(authData);
@@ -899,10 +965,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 authRole = 
getBrokerService().getAuthenticationService().getAnonymousUserRole()
                     .orElseThrow(() ->
                         new AuthenticationException("No anonymous role, and no 
authentication provider configured"));
-                completeConnect(clientProtocolVersion, clientVersion, 
enableSubscriptionPatternEvaluation);
+                completeConnect(clientProtocolVersion, clientVersion);
                 return;
             }
-
             // init authState and other var
             ChannelHandler sslHandler = 
ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
             SSLSession sslSession = null;
@@ -922,14 +987,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 log.debug("[{}] Authenticate role : {}", remoteAddress, role);
             }
 
-            state = doAuthentication(clientData, clientProtocolVersion, 
clientVersion);
-
-            // This will fail the check if:
-            //  1. client is coming through a proxy
-            //  2. we require to validate the original credentials
-            //  3. no credentials were passed
             if (connect.hasOriginalPrincipal() && 
service.getPulsar().getConfig().isAuthenticateOriginalAuthData()) {
-                // init authentication
+                // Flow:
+                // 1. Initialize original authentication.
+                // 2. Authenticate the proxy's authentication data.
+                // 3. Authenticate the original authentication data.
                 String originalAuthMethod;
                 if (connect.hasOriginalAuthMethod()) {
                     originalAuthMethod = connect.getOriginalAuthMethod();
@@ -947,32 +1009,23 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                     + " using auth method [%s] is not 
available", originalAuthMethod));
                 }
 
-                AuthData originalAuthDataCopy =  
AuthData.of(connect.getOriginalAuthData().getBytes());
+                originalAuthDataCopy = 
AuthData.of(connect.getOriginalAuthData().getBytes());
                 originalAuthState = 
originalAuthenticationProvider.newAuthState(
                         originalAuthDataCopy,
                         remoteAddress,
                         sslSession);
-                originalAuthState.authenticate(originalAuthDataCopy);
-                originalAuthData = originalAuthState.getAuthDataSource();
-                originalPrincipal = originalAuthState.getAuthRole();
+            } else if (connect.hasOriginalPrincipal()) {
+                originalPrincipal = connect.getOriginalPrincipal();
 
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Authenticate original role : {}", 
remoteAddress, originalPrincipal);
-                }
-            } else {
-                originalPrincipal = connect.hasOriginalPrincipal() ? 
connect.getOriginalPrincipal() : null;
-
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Authenticate original role (forwarded from 
proxy): {}",
+                    log.debug("[{}] Setting original role (forwarded from 
proxy): {}",
                         remoteAddress, originalPrincipal);
                 }
             }
+
+            doAuthentication(clientData, false, clientProtocolVersion, 
clientVersion);
         } catch (Exception e) {
-            service.getPulsarStats().recordConnectionCreateFail();
-            state = State.Failed;
-            logAuthException(remoteAddress, "connect", getPrincipal(), 
Optional.empty(), e);
-            ByteBuf msg = Commands.newError(-1, 
ServerError.AuthenticationError, "Unable to authenticate");
-            NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
+            authenticationFailed(e);
         }
     }
 
@@ -990,21 +1043,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         try {
             AuthData clientData = 
AuthData.of(authResponse.getResponse().getAuthData());
-            doAuthentication(clientData, authResponse.getProtocolVersion(),
+            doAuthentication(clientData, originalAuthState != null, 
authResponse.getProtocolVersion(),
                     authResponse.hasClientVersion() ? 
authResponse.getClientVersion() : EMPTY);
-        } catch (AuthenticationException e) {
-            service.getPulsarStats().recordConnectionCreateFail();
-            state = State.Failed;
-            log.warn("[{}] Authentication failed: {} ", remoteAddress, 
e.getMessage());
-            ByteBuf msg = Commands.newError(-1, 
ServerError.AuthenticationError, "Unable to authenticate");
-            NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
         } catch (Exception e) {
-            service.getPulsarStats().recordConnectionCreateFail();
-            state = State.Failed;
-            String msg = "Unable to handleAuthResponse";
-            log.warn("[{}] {} ", remoteAddress, msg, e);
-            ByteBuf command = Commands.newError(-1, ServerError.UnknownError, 
msg);
-            NettyChannelUtil.writeAndFlushWithClosePromise(ctx, command);
+            authenticationFailed(e);
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
deleted file mode 100644
index 03ef2460f06..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
+++ /dev/null
@@ -1,400 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.broker.service;
-
-import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-import com.google.common.collect.Sets;
-import io.jsonwebtoken.Jwts;
-import io.jsonwebtoken.SignatureAlgorithm;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPipeline;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import java.util.Collections;
-import java.util.Optional;
-import java.util.Properties;
-import javax.crypto.SecretKey;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.testcontext.PulsarTestContext;
-import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
-import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
-import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
-import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
-import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.common.api.proto.CommandConnect;
-import org.apache.pulsar.common.api.proto.CommandLookupTopic;
-import org.apache.pulsar.common.api.proto.CommandProducer;
-import org.apache.pulsar.common.api.proto.CommandSubscribe;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.mockito.ArgumentMatcher;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-@Test(groups = "broker")
-public class ServerCnxAuthorizationTest {
-    private final SecretKey SECRET_KEY = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
-    private final String CLIENT_PRINCIPAL = "client";
-    private final String PROXY_PRINCIPAL = "proxy";
-    private final String CLIENT_TOKEN = 
Jwts.builder().setSubject(CLIENT_PRINCIPAL).signWith(SECRET_KEY).compact();
-    private final String PROXY_TOKEN = 
Jwts.builder().setSubject(PROXY_PRINCIPAL).signWith(SECRET_KEY).compact();
-
-    private ServiceConfiguration svcConfig;
-
-    protected PulsarTestContext pulsarTestContext;
-    private BrokerService brokerService;
-
-    @BeforeMethod(alwaysRun = true)
-    public void beforeMethod() throws Exception {
-        svcConfig = new ServiceConfiguration();
-        svcConfig.setKeepAliveIntervalSeconds(0);
-        svcConfig.setBrokerShutdownTimeoutMs(0L);
-        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
-        svcConfig.setClusterName("pulsar-cluster");
-        svcConfig.setSuperUserRoles(Collections.singleton(PROXY_PRINCIPAL));
-        svcConfig.setAuthenticationEnabled(true);
-        
svcConfig.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
-        svcConfig.setAuthorizationEnabled(true);
-        
svcConfig.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
-        Properties properties = new Properties();
-        properties.setProperty("tokenSecretKey", "data:;base64,"
-                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
-        svcConfig.setProperties(properties);
-        pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
-                .config(svcConfig)
-                .spyByDefault()
-                .build();
-        brokerService = pulsarTestContext.getBrokerService();
-
-        
pulsarTestContext.getPulsarResources().getTenantResources().createTenant("public",
-                TenantInfo.builder().build());
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void cleanup() throws Exception {
-        if (pulsarTestContext != null) {
-            pulsarTestContext.close();
-            pulsarTestContext = null;
-        }
-    }
-
-    @Test
-    public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() 
throws Exception {
-        svcConfig.setAuthenticateOriginalAuthData(true);
-
-
-        ServerCnx serverCnx = pulsarTestContext.createServerCnxSpy();
-        ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
-        Channel channel = mock(Channel.class);
-        ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
-        doReturn(channelPipeline).when(channel).pipeline();
-        
doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER);
-
-        SocketAddress socketAddress = new InetSocketAddress(0);
-        doReturn(socketAddress).when(channel).remoteAddress();
-        doReturn(channel).when(channelHandlerContext).channel();
-        channelHandlerContext.channel().remoteAddress();
-        serverCnx.channelActive(channelHandlerContext);
-
-        // connect
-        AuthenticationToken clientAuthenticationToken = new 
AuthenticationToken(CLIENT_TOKEN);
-        AuthenticationToken proxyAuthenticationToken = new 
AuthenticationToken(PROXY_TOKEN);
-        CommandConnect connect = new CommandConnect();
-        
connect.setAuthMethodName(proxyAuthenticationToken.getAuthMethodName());
-        
connect.setAuthData(proxyAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8));
-        connect.setClientVersion("test");
-        connect.setProtocolVersion(1);
-        connect.setOriginalPrincipal(CLIENT_PRINCIPAL);
-        
connect.setOriginalAuthData(clientAuthenticationToken.getAuthData().getCommandData());
-        
connect.setOriginalAuthMethod(clientAuthenticationToken.getAuthMethodName());
-
-        serverCnx.handleConnect(connect);
-        assertEquals(serverCnx.getOriginalAuthData().getCommandData(),
-                clientAuthenticationToken.getAuthData().getCommandData());
-        assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), 
CLIENT_PRINCIPAL);
-        assertEquals(serverCnx.getOriginalPrincipal(), CLIENT_PRINCIPAL);
-        assertEquals(serverCnx.getAuthData().getCommandData(),
-                proxyAuthenticationToken.getAuthData().getCommandData());
-        assertEquals(serverCnx.getAuthRole(), PROXY_PRINCIPAL);
-        assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL);
-
-        AuthorizationService authorizationService =
-                
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig,
-                        pulsarTestContext.getPulsarResources());
-        
doReturn(authorizationService).when(brokerService).getAuthorizationService();
-
-        // lookup
-        CommandLookupTopic commandLookupTopic = new CommandLookupTopic();
-        TopicName topicName = 
TopicName.get("persistent://public/default/test-topic");
-        commandLookupTopic.setTopic(topicName.toString());
-        commandLookupTopic.setRequestId(1);
-        serverCnx.handleLookup(commandLookupTopic);
-        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
-                CLIENT_PRINCIPAL,
-                serverCnx.getOriginalAuthData());
-        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
-                PROXY_PRINCIPAL,
-                serverCnx.getAuthData());
-
-        // producer
-        CommandProducer commandProducer = new CommandProducer();
-        commandProducer.setRequestId(1);
-        commandProducer.setProducerId(1);
-        commandProducer.setProducerName("test-producer");
-        commandProducer.setTopic(topicName.toString());
-        serverCnx.handleProducer(commandProducer);
-        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
-                CLIENT_PRINCIPAL,
-                serverCnx.getOriginalAuthData());
-        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
-                PROXY_PRINCIPAL,
-                serverCnx.getAuthData());
-
-        // consumer
-        CommandSubscribe commandSubscribe = new CommandSubscribe();
-        commandSubscribe.setTopic(topicName.toString());
-        commandSubscribe.setRequestId(1);
-        commandSubscribe.setConsumerId(1);
-        final String subscriptionName = "test-subscribe";
-        commandSubscribe.setSubscription("test-subscribe");
-        commandSubscribe.setSubType(CommandSubscribe.SubType.Shared);
-        serverCnx.handleSubscribe(commandSubscribe);
-
-        verify(authorizationService, times(1)).allowTopicOperationAsync(
-                eq(topicName), eq(TopicOperation.CONSUME),
-                eq(CLIENT_PRINCIPAL), argThat(arg -> {
-                    assertTrue(arg instanceof AuthenticationDataSubscription);
-                    try {
-                        assertEquals(arg.getCommandData(), 
clientAuthenticationToken.getAuthData().getCommandData());
-                    } catch (PulsarClientException e) {
-                        fail(e.getMessage());
-                    }
-                    assertEquals(arg.getSubscription(), subscriptionName);
-                    return true;
-                }));
-        verify(authorizationService, times(1)).allowTopicOperationAsync(
-                eq(topicName), eq(TopicOperation.CONSUME),
-                eq(PROXY_PRINCIPAL), argThat(arg -> {
-                    assertTrue(arg instanceof AuthenticationDataSubscription);
-                    try {
-                        assertEquals(arg.getCommandData(), 
proxyAuthenticationToken.getAuthData().getCommandData());
-                    } catch (PulsarClientException e) {
-                        fail(e.getMessage());
-                    }
-                    assertEquals(arg.getSubscription(), subscriptionName);
-                    return true;
-                }));
-    }
-
-    @Test
-    public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy() 
throws Exception {
-        svcConfig.setAuthenticateOriginalAuthData(false);
-
-        ServerCnx serverCnx = pulsarTestContext.createServerCnxSpy();
-        ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
-        Channel channel = mock(Channel.class);
-        ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
-        doReturn(channelPipeline).when(channel).pipeline();
-        
doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER);
-
-        SocketAddress socketAddress = new InetSocketAddress(0);
-        doReturn(socketAddress).when(channel).remoteAddress();
-        doReturn(channel).when(channelHandlerContext).channel();
-        channelHandlerContext.channel().remoteAddress();
-        serverCnx.channelActive(channelHandlerContext);
-
-        // connect
-        AuthenticationToken proxyAuthenticationToken = new 
AuthenticationToken(PROXY_TOKEN);
-        CommandConnect connect = new CommandConnect();
-        
connect.setAuthMethodName(proxyAuthenticationToken.getAuthMethodName());
-        
connect.setAuthData(proxyAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8));
-        connect.setClientVersion("test");
-        connect.setProtocolVersion(1);
-        connect.setOriginalPrincipal(CLIENT_PRINCIPAL);
-        serverCnx.handleConnect(connect);
-        assertNull(serverCnx.getOriginalAuthData());
-        assertNull(serverCnx.getOriginalAuthState());
-        assertEquals(serverCnx.getOriginalPrincipal(), CLIENT_PRINCIPAL);
-        assertEquals(serverCnx.getAuthData().getCommandData(),
-                proxyAuthenticationToken.getAuthData().getCommandData());
-        assertEquals(serverCnx.getAuthRole(), PROXY_PRINCIPAL);
-        assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL);
-
-        AuthorizationService authorizationService =
-                
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig,
-                        pulsarTestContext.getPulsarResources());
-        
doReturn(authorizationService).when(brokerService).getAuthorizationService();
-
-        // lookup
-        CommandLookupTopic commandLookupTopic = new CommandLookupTopic();
-        TopicName topicName = 
TopicName.get("persistent://public/default/test-topic");
-        commandLookupTopic.setTopic(topicName.toString());
-        commandLookupTopic.setRequestId(1);
-        serverCnx.handleLookup(commandLookupTopic);
-        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
-                CLIENT_PRINCIPAL,
-                serverCnx.getAuthData());
-        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
-                PROXY_PRINCIPAL,
-                serverCnx.getAuthData());
-
-        // producer
-        CommandProducer commandProducer = new CommandProducer();
-        commandProducer.setRequestId(1);
-        commandProducer.setProducerId(1);
-        commandProducer.setProducerName("test-producer");
-        commandProducer.setTopic(topicName.toString());
-        serverCnx.handleProducer(commandProducer);
-        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
-                CLIENT_PRINCIPAL,
-                serverCnx.getAuthData());
-        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
-                PROXY_PRINCIPAL,
-                serverCnx.getAuthData());
-
-        // consumer
-        CommandSubscribe commandSubscribe = new CommandSubscribe();
-        commandSubscribe.setTopic(topicName.toString());
-        commandSubscribe.setRequestId(1);
-        commandSubscribe.setConsumerId(1);
-        final String subscriptionName = "test-subscribe";
-        commandSubscribe.setSubscription("test-subscribe");
-        commandSubscribe.setSubType(CommandSubscribe.SubType.Shared);
-        serverCnx.handleSubscribe(commandSubscribe);
-
-        ArgumentMatcher<AuthenticationDataSource> 
authenticationDataSourceArgumentMatcher = arg -> {
-            assertTrue(arg instanceof AuthenticationDataSubscription);
-            try {
-                assertEquals(arg.getCommandData(), 
proxyAuthenticationToken.getAuthData().getCommandData());
-            } catch (PulsarClientException e) {
-                fail(e.getMessage());
-            }
-            assertEquals(arg.getSubscription(), subscriptionName);
-            return true;
-        };
-
-        verify(authorizationService, times(1)).allowTopicOperationAsync(
-                eq(topicName), eq(TopicOperation.CONSUME),
-                eq(CLIENT_PRINCIPAL), 
argThat(authenticationDataSourceArgumentMatcher));
-        verify(authorizationService, times(1)).allowTopicOperationAsync(
-                eq(topicName), eq(TopicOperation.CONSUME),
-                eq(PROXY_PRINCIPAL), 
argThat(authenticationDataSourceArgumentMatcher));
-    }
-
-    @Test
-    public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() 
throws Exception {
-        ServerCnx serverCnx = pulsarTestContext.createServerCnxSpy();
-
-        ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
-        Channel channel = mock(Channel.class);
-        ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
-        doReturn(channelPipeline).when(channel).pipeline();
-        
doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER);
-
-        SocketAddress socketAddress = new InetSocketAddress(0);
-        doReturn(socketAddress).when(channel).remoteAddress();
-        doReturn(channel).when(channelHandlerContext).channel();
-        channelHandlerContext.channel().remoteAddress();
-        serverCnx.channelActive(channelHandlerContext);
-
-        // connect
-        AuthenticationToken clientAuthenticationToken = new 
AuthenticationToken(CLIENT_TOKEN);
-        CommandConnect connect = new CommandConnect();
-        
connect.setAuthMethodName(clientAuthenticationToken.getAuthMethodName());
-        
connect.setAuthData(clientAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8));
-        connect.setClientVersion("test");
-        connect.setProtocolVersion(1);
-        serverCnx.handleConnect(connect);
-        assertNull(serverCnx.getOriginalAuthData());
-        assertNull(serverCnx.getOriginalAuthState());
-        assertNull(serverCnx.getOriginalPrincipal());
-        assertEquals(serverCnx.getAuthData().getCommandData(),
-                clientAuthenticationToken.getAuthData().getCommandData());
-        assertEquals(serverCnx.getAuthRole(), CLIENT_PRINCIPAL);
-        assertEquals(serverCnx.getAuthState().getAuthRole(), CLIENT_PRINCIPAL);
-
-        AuthorizationService authorizationService =
-                
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig,
-                        pulsarTestContext.getPulsarResources());
-        
doReturn(authorizationService).when(brokerService).getAuthorizationService();
-
-        // lookup
-        CommandLookupTopic commandLookupTopic = new CommandLookupTopic();
-        TopicName topicName = 
TopicName.get("persistent://public/default/test-topic");
-        commandLookupTopic.setTopic(topicName.toString());
-        commandLookupTopic.setRequestId(1);
-        serverCnx.handleLookup(commandLookupTopic);
-        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
-                CLIENT_PRINCIPAL,
-                serverCnx.getAuthData());
-
-        // producer
-        CommandProducer commandProducer = new CommandProducer();
-        commandProducer.setRequestId(1);
-        commandProducer.setProducerId(1);
-        commandProducer.setProducerName("test-producer");
-        commandProducer.setTopic(topicName.toString());
-        serverCnx.handleProducer(commandProducer);
-        verify(authorizationService, 
times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE,
-                CLIENT_PRINCIPAL,
-                serverCnx.getAuthData());
-
-        // consumer
-        CommandSubscribe commandSubscribe = new CommandSubscribe();
-        commandSubscribe.setTopic(topicName.toString());
-        commandSubscribe.setRequestId(1);
-        commandSubscribe.setConsumerId(1);
-        final String subscriptionName = "test-subscribe";
-        commandSubscribe.setSubscription("test-subscribe");
-        commandSubscribe.setSubType(CommandSubscribe.SubType.Shared);
-        serverCnx.handleSubscribe(commandSubscribe);
-
-        verify(authorizationService, times(1)).allowTopicOperationAsync(
-                eq(topicName), eq(TopicOperation.CONSUME),
-                eq(CLIENT_PRINCIPAL), argThat(arg -> {
-                    assertTrue(arg instanceof AuthenticationDataSubscription);
-                    try {
-                        assertEquals(arg.getCommandData(), 
clientAuthenticationToken.getAuthData().getCommandData());
-                    } catch (PulsarClientException e) {
-                        fail(e.getMessage());
-                    }
-                    assertEquals(arg.getSubscription(), subscriptionName);
-                    return true;
-                }));
-    }
-}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 3dcea7e4bd7..5b45a16d3dc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -20,11 +20,14 @@
 package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -71,13 +74,13 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
 import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -107,6 +110,7 @@ import 
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataRespons
 import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
 import org.apache.pulsar.common.api.proto.CommandSendError;
 import org.apache.pulsar.common.api.proto.CommandSendReceipt;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.CommandSuccess;
@@ -120,6 +124,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
@@ -357,34 +362,117 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void testConnectCommandWithAuthenticationPositive() throws 
Exception {
         AuthenticationService authenticationService = 
mock(AuthenticationService.class);
-        AuthenticationProvider authenticationProvider = 
mock(AuthenticationProvider.class);
-        AuthenticationState authenticationState = 
mock(AuthenticationState.class);
-        AuthData authData = AuthData.of(null);
+        AuthenticationProvider authenticationProvider = new 
MockAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
 
-        
doReturn(authenticationService).when(brokerService).getAuthenticationService();
-        
doReturn(authenticationProvider).when(authenticationService).getAuthenticationProvider(Mockito.anyString());
-        doReturn(authenticationState).when(authenticationProvider)
-                .newAuthState(Mockito.any(), Mockito.any(), Mockito.any());
-        doReturn(authData).when(authenticationState)
-                .authenticate(authData);
-        doReturn(true).when(authenticationState)
-                .isComplete();
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
 
-        doReturn("appid1").when(authenticationState)
-                .getAuthRole();
+        // test server response to CONNECT
+        ByteBuf clientCommand = Commands.newConnect(authMethodName, 
"pass.client", null);
+        channel.writeInbound(clientCommand);
+
+        assertTrue(getResponse() instanceof CommandConnected);
+        assertEquals(serverCnx.getState(), State.Connected);
+        assertEquals(serverCnx.getPrincipal(), "pass.client");
+        assertTrue(serverCnx.isActive());
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void 
testConnectCommandWithoutOriginalAuthInfoWhenAuthenticateOriginalAuthData() 
throws Exception {
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
 
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
         svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthenticateOriginalAuthData(true);
 
         resetChannel();
         assertTrue(channel.isActive());
         assertEquals(serverCnx.getState(), State.Start);
 
-        // test server response to CONNECT
-        ByteBuf clientCommand = Commands.newConnect("none", "", null);
+        ByteBuf clientCommand = Commands.newConnect(authMethodName, 
"pass.client", "");
         channel.writeInbound(clientCommand);
 
+        Object response1 = getResponse();
+        assertTrue(response1 instanceof CommandConnected);
         assertEquals(serverCnx.getState(), State.Connected);
-        assertTrue(getResponse() instanceof CommandConnected);
+        assertEquals(serverCnx.getAuthRole(), "pass.client");
+        assertEquals(serverCnx.getPrincipal(), "pass.client");
+        assertNull(serverCnx.getOriginalPrincipal());
+        assertTrue(serverCnx.isActive());
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void testConnectCommandWithPassingOriginalAuthData() throws 
Exception {
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthenticateOriginalAuthData(true);
+        svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
+
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        ByteBuf clientCommand = Commands.newConnect(authMethodName, 
"pass.proxy", 1, null,
+                null, "client", "pass.client", authMethodName);
+        channel.writeInbound(clientCommand);
+
+        Object response1 = getResponse();
+        assertTrue(response1 instanceof CommandConnected);
+        assertEquals(serverCnx.getState(), State.Connected);
+        // Note that this value will change to the client's data if the broker 
sends an AuthChallenge to the
+        // proxy/client. Details described here 
https://github.com/apache/pulsar/issues/19332.
+        assertEquals(serverCnx.getAuthRole(), "pass.proxy");
+        // These are derived from the original auth data, not the supplied original principal
+        assertEquals(serverCnx.getPrincipal(), "pass.client");
+        assertEquals(serverCnx.getOriginalPrincipal(), "pass.client");
+        assertTrue(serverCnx.isActive());
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void testConnectCommandWithPassingOriginalPrincipal() throws 
Exception {
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthenticateOriginalAuthData(false);
+        svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
+
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        ByteBuf clientCommand = Commands.newConnect(authMethodName, 
"pass.proxy", 1, null,
+                null, "client", "pass.client", authMethodName);
+        channel.writeInbound(clientCommand);
+
+        Object response1 = getResponse();
+        assertTrue(response1 instanceof CommandConnected);
+        assertEquals(serverCnx.getState(), State.Connected);
+        assertEquals(serverCnx.getAuthRole(), "pass.proxy");
+        // These are all taken without verifying the auth data
+        assertEquals(serverCnx.getPrincipal(), "client");
+        assertEquals(serverCnx.getOriginalPrincipal(), "client");
+        assertTrue(serverCnx.isActive());
         channel.finish();
     }
 
@@ -418,7 +506,7 @@ public class ServerCnxTest {
         
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
         svcConfig.setAuthenticationEnabled(true);
         svcConfig.setAuthenticateOriginalAuthData(true);
-        svcConfig.setProxyRoles(Collections.singleton("proxy"));
+        svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
 
         resetChannel();
         assertTrue(channel.isActive());
@@ -428,14 +516,9 @@ public class ServerCnxTest {
                 null, "client", "fail", authMethodName);
         channel.writeInbound(clientCommand);
 
-        // We currently expect two responses because the originalAuthData is 
verified after sending
-        // a successful response to the proxy. Because this is a synchronous 
operation, there is currently
-        // no risk. It would be better to fix this. See 
https://github.com/apache/pulsar/issues/19311.
         Object response1 = getResponse();
-        assertTrue(response1 instanceof CommandConnected);
-        Object response2 = getResponse();
-        assertTrue(response2 instanceof CommandError);
-        assertEquals(((CommandError) response2).getMessage(), "Unable to 
authenticate");
+        assertTrue(response1 instanceof CommandError);
+        assertEquals(((CommandError) response1).getMessage(), "Failed to 
authenticate");
         assertEquals(serverCnx.getState(), State.Failed);
         assertFalse(serverCnx.isActive());
         channel.finish();
@@ -461,6 +544,7 @@ public class ServerCnxTest {
 
         Object challenge1 = getResponse();
         assertTrue(challenge1 instanceof CommandAuthChallenge);
+        assertEquals(serverCnx.getState(), State.Connecting);
 
         // Trigger another AuthChallenge to verify that code path continues to 
challenge
         ByteBuf authResponse1 =
@@ -469,6 +553,7 @@ public class ServerCnxTest {
 
         Object challenge2 = getResponse();
         assertTrue(challenge2 instanceof CommandAuthChallenge);
+        assertEquals(serverCnx.getState(), State.Connecting);
 
         // Trigger failure
         ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, 
AuthData.of("fail.client".getBytes()), 1, "1");
@@ -476,12 +561,320 @@ public class ServerCnxTest {
 
         Object response3 = getResponse();
         assertTrue(response3 instanceof CommandError);
-        assertEquals(((CommandError) response3).getMessage(), "Unable to 
authenticate");
+        assertEquals(((CommandError) response3).getMessage(), "Failed to 
authenticate");
         assertEquals(serverCnx.getState(), State.Failed);
         assertFalse(serverCnx.isActive());
         channel.finish();
     }
 
+    @Test(timeOut = 30000)
+    public void testOriginalAuthDataTriggersAuthChallengeFailure() throws 
Exception {
+        // Test verifies the current behavior in the absence of a solution for
+        // https://github.com/apache/pulsar/issues/19291. When that issue is 
completed, we can update this test
+        // to correctly verify that behavior.
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockMultiStageAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthenticateOriginalAuthData(true);
+        svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
+
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        // Trigger connect command to result in AuthChallenge
+        ByteBuf clientCommand = Commands.newConnect(authMethodName, 
"pass.proxy", 1, "1",
+                "localhost", "client", "challenge.client", authMethodName);
+        channel.writeInbound(clientCommand);
+
+        Object response = getResponse();
+        assertTrue(response instanceof CommandError);
+
+        assertEquals(((CommandError) response).getMessage(), "Failed to 
authenticate");
+        assertEquals(serverCnx.getState(), State.Failed);
+        assertFalse(serverCnx.isActive());
+        channel.finish();
+    }
+
+    // This test used to be in the ServerCnxAuthorizationTest class, but it 
was migrated here because the mocking
+    // in that class was too extensive. There is some overlap between this test 
and other tests in this class. The primary
+    // role of this test is to verify that the correct role and 
AuthenticationDataSource are passed to the
+    // AuthorizationService.
+    @Test
+    public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() 
throws Exception {
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthenticateOriginalAuthData(true);
+        svcConfig.setProxyRoles(Collections.singleton("pass.pass"));
+
+        
svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
+        AuthorizationService authorizationService =
+                
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig,
+                        pulsarTestContext.getPulsarResources());
+        
when(brokerService.getAuthorizationService()).thenReturn(authorizationService);
+        svcConfig.setAuthorizationEnabled(true);
+
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        // Connect
+        // This client role integrates with the MockAuthenticationProvider and 
MockAuthorizationProvider
+        // to pass authentication and fail authorization
+        String proxyRole = "pass.pass";
+        String clientRole = "pass.fail";
+        // Submit a failing originalPrincipal to show that it is not used at 
all.
+        ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, 
"test", "localhost",
+                "fail.fail", clientRole, authMethodName);
+        channel.writeInbound(connect);
+        Object connectResponse = getResponse();
+        assertTrue(connectResponse instanceof CommandConnected);
+        assertEquals(serverCnx.getOriginalAuthData().getCommandData(), 
clientRole);
+        assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), 
clientRole);
+        assertEquals(serverCnx.getOriginalPrincipal(), clientRole);
+        assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
+        assertEquals(serverCnx.getAuthRole(), proxyRole);
+        assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);
+
+        // Lookup
+        TopicName topicName = 
TopicName.get("persistent://public/default/test-topic");
+        ByteBuf lookup = Commands.newLookup(topicName.toString(), false, 1);
+        channel.writeInbound(lookup);
+        Object lookupResponse = getResponse();
+        assertTrue(lookupResponse instanceof CommandLookupTopicResponse);
+        assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(), 
ServerError.AuthorizationError);
+        assertEquals(((CommandLookupTopicResponse) 
lookupResponse).getRequestId(), 1);
+        verify(authorizationService, times(1))
+                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, 
proxyRole, serverCnx.getAuthData());
+        verify(authorizationService, times(1))
+                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, 
clientRole, serverCnx.getOriginalAuthData());
+
+        // producer
+        ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2, 
"test-producer", new HashMap<>(), false);
+        channel.writeInbound(producer);
+        Object producerResponse = getResponse();
+        assertTrue(producerResponse instanceof CommandError);
+        assertEquals(((CommandError) producerResponse).getError(), 
ServerError.AuthorizationError);
+        assertEquals(((CommandError) producerResponse).getRequestId(), 2);
+        verify(authorizationService, times(1))
+                .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, 
clientRole, serverCnx.getOriginalAuthData());
+        verify(authorizationService, times(1))
+                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, 
proxyRole, serverCnx.getAuthData());
+
+        // consumer
+        String subscriptionName = "test-subscribe";
+        ByteBuf subscribe = Commands.newSubscribe(topicName.toString(), 
subscriptionName, 1, 3,
+                CommandSubscribe.SubType.Shared, 0, "consumer", 0);
+        channel.writeInbound(subscribe);
+        Object subscribeResponse = getResponse();
+        assertTrue(subscribeResponse instanceof CommandError);
+        assertEquals(((CommandError) subscribeResponse).getError(), 
ServerError.AuthorizationError);
+        assertEquals(((CommandError) subscribeResponse).getRequestId(), 3);
+        verify(authorizationService, times(1)).allowTopicOperationAsync(
+                eq(topicName), eq(TopicOperation.CONSUME),
+                eq(clientRole), argThat(arg -> {
+                    assertTrue(arg instanceof AuthenticationDataSubscription);
+                    assertEquals(arg.getCommandData(), clientRole);
+                    assertEquals(arg.getSubscription(), subscriptionName);
+                    return true;
+                }));
+        verify(authorizationService, times(1)).allowTopicOperationAsync(
+                eq(topicName), eq(TopicOperation.CONSUME),
+                eq(proxyRole), argThat(arg -> {
+                    assertTrue(arg instanceof AuthenticationDataSubscription);
+                    assertEquals(arg.getCommandData(), proxyRole);
+                    assertEquals(arg.getSubscription(), subscriptionName);
+                    return true;
+                }));
+    }
+
+    // This test used to be in the ServerCnxAuthorizationTest class, but it 
was migrated here because the mocking
+    // in that class was too extensive. There is some overlap between this test 
and other tests in this class. The primary
+    // role of this test is to verify that the correct role and 
AuthenticationDataSource are passed to the
+    // AuthorizationService.
+    public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy() 
throws Exception {
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthenticateOriginalAuthData(false);
+        svcConfig.setProxyRoles(Collections.singleton("pass.pass"));
+
+        
svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
+        AuthorizationService authorizationService =
+                
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig,
+                        pulsarTestContext.getPulsarResources());
+        
when(brokerService.getAuthorizationService()).thenReturn(authorizationService);
+        svcConfig.setAuthorizationEnabled(true);
+
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        // Connect
+        // This client role integrates with the MockAuthenticationProvider and 
MockAuthorizationProvider
+        // to pass authentication and fail authorization
+        String proxyRole = "pass.pass";
+        String clientRole = "pass.fail";
+        ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, 
"test", "localhost",
+                clientRole, null, null);
+        channel.writeInbound(connect);
+        Object connectResponse = getResponse();
+        assertTrue(connectResponse instanceof CommandConnected);
+        assertNull(serverCnx.getOriginalAuthData());
+        assertNull(serverCnx.getOriginalAuthState());
+        assertEquals(serverCnx.getOriginalPrincipal(), clientRole);
+        assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
+        assertEquals(serverCnx.getAuthRole(), proxyRole);
+        assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);
+
+        // Lookup
+        TopicName topicName = 
TopicName.get("persistent://public/default/test-topic");
+        ByteBuf lookup = Commands.newLookup(topicName.toString(), false, 1);
+        channel.writeInbound(lookup);
+        Object lookupResponse = getResponse();
+        assertTrue(lookupResponse instanceof CommandLookupTopicResponse);
+        assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(), 
ServerError.AuthorizationError);
+        assertEquals(((CommandLookupTopicResponse) 
lookupResponse).getRequestId(), 1);
+        verify(authorizationService, times(1))
+                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, 
proxyRole, serverCnx.getAuthData());
+        // This test is an example of 
https://github.com/apache/pulsar/issues/19332. Essentially, we're passing
+        // the proxy's auth data because it is all we have. This test should 
be updated when we resolve that issue.
+        verify(authorizationService, times(1))
+                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, 
clientRole, serverCnx.getAuthData());
+
+        // producer
+        ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2, 
"test-producer", new HashMap<>(), false);
+        channel.writeInbound(producer);
+        Object producerResponse = getResponse();
+        assertTrue(producerResponse instanceof CommandError);
+        assertEquals(((CommandError) producerResponse).getError(), 
ServerError.AuthorizationError);
+        assertEquals(((CommandError) producerResponse).getRequestId(), 2);
+        // See https://github.com/apache/pulsar/issues/19332 for justification 
of this assertion.
+        verify(authorizationService, times(1))
+                .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, 
clientRole, serverCnx.getAuthData());
+        verify(authorizationService, times(1))
+                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, 
proxyRole, serverCnx.getAuthData());
+
+        // consumer
+        String subscriptionName = "test-subscribe";
+        ByteBuf subscribe = Commands.newSubscribe(topicName.toString(), 
subscriptionName, 1, 3,
+                CommandSubscribe.SubType.Shared, 0, "consumer", 0);
+        channel.writeInbound(subscribe);
+        Object subscribeResponse = getResponse();
+        assertTrue(subscribeResponse instanceof CommandError);
+        assertEquals(((CommandError) subscribeResponse).getError(), 
ServerError.AuthorizationError);
+        assertEquals(((CommandError) subscribeResponse).getRequestId(), 3);
+        verify(authorizationService, times(1)).allowTopicOperationAsync(
+                eq(topicName), eq(TopicOperation.CONSUME),
+                eq(clientRole), argThat(arg -> {
+                    assertTrue(arg instanceof AuthenticationDataSubscription);
+                    // We assert that the role is clientRole and commandData 
is proxyRole due to
+                    // https://github.com/apache/pulsar/issues/19332.
+                    assertEquals(arg.getCommandData(), proxyRole);
+                    assertEquals(arg.getSubscription(), subscriptionName);
+                    return true;
+                }));
+        verify(authorizationService, times(1)).allowTopicOperationAsync(
+                eq(topicName), eq(TopicOperation.CONSUME),
+                eq(proxyRole), argThat(arg -> {
+                    assertTrue(arg instanceof AuthenticationDataSubscription);
+                    assertEquals(arg.getCommandData(), proxyRole);
+                    assertEquals(arg.getSubscription(), subscriptionName);
+                    return true;
+                }));
+    }
+
+    // This test used to be in the ServerCnxAuthorizationTest class, but it was migrated here because the mocking
+    // in that class was too extensive. There is some overlap with this test and other tests in this class. The primary
+    // role of this test is verifying that the correct role and AuthenticationDataSource are passed to the
+    // AuthorizationService.
+    @Test
+    public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() 
throws Exception {
+        // Authentication setup: the broker hands out a mocked AuthenticationService that
+        // resolves the mock provider by the provider's own auth method name.
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+
+        // Authorization setup: the broker returns a spied AuthorizationService built from
+        // svcConfig, so the arguments of each authorization call can be verified below.
+        
svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
+        AuthorizationService authorizationService =
+                
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig,
+                        pulsarTestContext.getPulsarResources());
+        
when(brokerService.getAuthorizationService()).thenReturn(authorizationService);
+        svcConfig.setAuthorizationEnabled(true);
+
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        // connect
+        // This client role integrates with the MockAuthenticationProvider and 
MockAuthorizationProvider
+        // to pass authentication and fail authorization
+        String clientRole = "pass.fail";
+        ByteBuf connect = Commands.newConnect(authMethodName, clientRole, 
"test");
+        channel.writeInbound(connect);
+
+        Object connectResponse = getResponse();
+        assertTrue(connectResponse instanceof CommandConnected);
+        // The connect command carries no original principal, so all "original" (proxied)
+        // fields are expected to stay null and the connection's own role and auth data
+        // are expected to be the client's.
+        assertNull(serverCnx.getOriginalAuthData());
+        assertNull(serverCnx.getOriginalAuthState());
+        assertNull(serverCnx.getOriginalPrincipal());
+        assertEquals(serverCnx.getAuthData().getCommandData(), clientRole);
+        assertEquals(serverCnx.getAuthRole(), clientRole);
+        assertEquals(serverCnx.getAuthState().getAuthRole(), clientRole);
+
+        // lookup
+        TopicName topicName = 
TopicName.get("persistent://public/default/test-topic");
+        ByteBuf lookup = Commands.newLookup(topicName.toString(), false, 1);
+        channel.writeInbound(lookup);
+        Object lookupResponse = getResponse();
+        assertTrue(lookupResponse instanceof CommandLookupTopicResponse);
+        assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(), 
ServerError.AuthorizationError);
+        assertEquals(((CommandLookupTopicResponse) 
lookupResponse).getRequestId(), 1);
+        // Exactly one LOOKUP authorization check, made with the client role and the
+        // connection's auth data.
+        verify(authorizationService, times(1))
+                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, 
clientRole, serverCnx.getAuthData());
+
+        // producer
+        ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2, 
"test-producer", new HashMap<>(), false);
+        channel.writeInbound(producer);
+        Object producerResponse = getResponse();
+        assertTrue(producerResponse instanceof CommandError);
+        assertEquals(((CommandError) producerResponse).getError(), 
ServerError.AuthorizationError);
+        assertEquals(((CommandError) producerResponse).getRequestId(), 2);
+        // Exactly one PRODUCE authorization check, made with the client role and the
+        // connection's auth data.
+        verify(authorizationService, times(1))
+                .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, 
clientRole, serverCnx.getAuthData());
+
+        // consumer
+        String subscriptionName = "test-subscribe";
+        ByteBuf subscribe = Commands.newSubscribe(topicName.toString(), 
subscriptionName, 1, 3,
+                CommandSubscribe.SubType.Shared, 0, "consumer", 0);
+        channel.writeInbound(subscribe);
+        Object subscribeResponse = getResponse();
+        assertTrue(subscribeResponse instanceof CommandError);
+        assertEquals(((CommandError) subscribeResponse).getError(), 
ServerError.AuthorizationError);
+        assertEquals(((CommandError) subscribeResponse).getRequestId(), 3);
+        // For CONSUME the auth data argument is inspected with a custom matcher rather
+        // than compared against serverCnx.getAuthData(): it is expected to be an
+        // AuthenticationDataSubscription that also carries the subscription name.
+        verify(authorizationService, times(1)).allowTopicOperationAsync(
+                eq(topicName), eq(TopicOperation.CONSUME),
+                eq(clientRole), argThat(arg -> {
+                    // The assertions throw on mismatch; reaching "return true" means the
+                    // argument matched.
+                    assertTrue(arg instanceof AuthenticationDataSubscription);
+                    assertEquals(arg.getCommandData(), clientRole);
+                    assertEquals(arg.getSubscription(), subscriptionName);
+                    return true;
+                }));
+    }
+
     @Test(timeOut = 30000)
     public void testProducerCommand() throws Exception {
         resetChannel();
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
index 4233c5cfdfa..9119ffed4e2 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
@@ -209,7 +209,7 @@ public class TestPulsarAuth extends 
MockedPulsarServiceBaseTest {
             Assert.fail(); // should fail
         } catch (TrinoException e){
             Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), 
e.getErrorCode());
-            Assert.assertTrue(e.getMessage().contains("Unable to 
authenticate"));
+            Assert.assertTrue(e.getMessage().contains("Failed to 
authenticate"));
         }
 
         pulsarAuth.cleanSession(session);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
index 3f0a6e0338f..0a9bb5e1959 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
@@ -141,7 +141,7 @@ public class TestPulsarSQLAuth extends TestPulsarSQLBase {
                         // Authorization error
                         assertEquals(e.getResult().getExitCode(), 1);
                         log.info(e.getResult().getStderr());
-                        assertTrue(e.getResult().getStderr().contains("Unable 
to authenticate"));
+                        assertTrue(e.getResult().getStderr().contains("Failed 
to authenticate"));
                     }
                 }
         );

Reply via email to