This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 8246da282ca [fix][broker] ServerCnx broken after recent cherry-picks 
(#19521)
8246da282ca is described below

commit 8246da282ca38e891bdf8a4e9abc47f640b22384
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Feb 14 19:35:24 2023 -0600

    [fix][broker] ServerCnx broken after recent cherry-picks (#19521)
    
    ### Motivation
    
    I broke all release branches when I cherry picked 
2847dd19f6e8a546f4d45bf51eb2b72aae0869ce to them. This change takes some of the 
underlying logic from #19409, without taking the async logic.
    
    ### Modifications
    
    * Make changes to `ServerCnx` to make tests pass
    
    ### Verifying this change
    
    Tests are currently failing, so passing tests will show that this solution 
is correct.
    
    ### Documentation
    
    - [x] `doc-not-needed`
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 51 +++++++++++-----------
 1 file changed, 25 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6d0473ed99c..a297b437b1f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -634,16 +634,6 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     // complete the connect and sent newConnected command
     private void completeConnect(int clientProtoVersion, String clientVersion, 
boolean supportsTopicWatchers) {
-        if (service.isAuthenticationEnabled() && 
service.isAuthorizationEnabled()) {
-            if (!service.getAuthorizationService()
-                    .isValidOriginalPrincipal(authRole, originalPrincipal, 
remoteAddress)) {
-                state = State.Failed;
-                service.getPulsarStats().recordConnectionCreateFail();
-                final ByteBuf msg = Commands.newError(-1, 
ServerError.AuthorizationError, "Invalid roles.");
-                
ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
-                return;
-            }
-        }
         ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize, supportsTopicWatchers));
         state = State.Connected;
         service.getPulsarStats().recordConnectionCreateSuccess();
@@ -660,7 +650,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
-    private State doAuthentication(AuthData clientData,
+    private void doAuthentication(AuthData clientData,
+                                   boolean useOriginalAuthState,
                                    int clientProtocolVersion,
                                    String clientVersion) throws Exception {
 
@@ -668,8 +659,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         // in presence of a proxy and if the proxy is forwarding the 
credentials).
         // In this case, the re-validation needs to be done against the 
original client
         // credentials.
-        boolean useOriginalAuthState = (originalAuthState != null);
-        AuthenticationState authState =  useOriginalAuthState ? 
originalAuthState : this.authState;
+        AuthenticationState authState = useOriginalAuthState ? 
originalAuthState : this.authState;
         String authRole = useOriginalAuthState ? originalPrincipal : 
this.authRole;
         AuthData brokerData = authState.authenticate(clientData);
 
@@ -702,6 +692,16 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
             if (state != State.Connected) {
                 // First time authentication is done
+                if (service.isAuthenticationEnabled() && 
service.isAuthorizationEnabled()) {
+                    if (!service.getAuthorizationService()
+                            .isValidOriginalPrincipal(this.authRole, 
originalPrincipal, remoteAddress)) {
+                        state = State.Failed;
+                        service.getPulsarStats().recordConnectionCreateFail();
+                        final ByteBuf msg = Commands.newError(-1, 
ServerError.AuthorizationError, "Invalid roles.");
+                        
ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
+                        return;
+                    }
+                }
                 completeConnect(clientProtocolVersion, clientVersion, 
enableSubscriptionPatternEvaluation);
             } else {
                 // If the connection was already ready, it means we're doing a 
refresh
@@ -715,18 +715,16 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     }
                 }
             }
+        } else {
 
-            return State.Connected;
-        }
-
-        // auth not complete, continue auth with client side.
-        ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, 
clientProtocolVersion));
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Authentication in progress client by method {}.",
-                remoteAddress, authMethod);
-            log.debug("[{}] connect state change to : [{}]", remoteAddress, 
State.Connecting.name());
+            // auth not complete, continue auth with client side.
+            ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, 
brokerData, clientProtocolVersion));
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Authentication in progress client by method 
{}.",
+                        remoteAddress, authMethod);
+                log.debug("[{}] connect state change to : [{}]", 
remoteAddress, State.Connecting.name());
+            }
         }
-        return State.Connecting;
     }
 
     public void refreshAuthenticationCredentials() {
@@ -824,6 +822,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return;
         }
 
+        state = State.Connecting;
+
         try {
             byte[] authData = connect.hasAuthData() ? connect.getAuthData() : 
emptyArray;
             AuthData clientData = AuthData.of(authData);
@@ -871,8 +871,6 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 log.debug("[{}] Authenticate role : {}", remoteAddress, role);
             }
 
-            state = doAuthentication(clientData, clientProtocolVersion, 
clientVersion);
-
             // This will fail the check if:
             //  1. client is coming through a proxy
             //  2. we require to validate the original credentials
@@ -914,6 +912,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         remoteAddress, originalPrincipal);
                 }
             }
+            doAuthentication(clientData, false, clientProtocolVersion, 
clientVersion);
         } catch (Exception e) {
             service.getPulsarStats().recordConnectionCreateFail();
             logAuthException(remoteAddress, "connect", getPrincipal(), 
Optional.empty(), e);
@@ -937,7 +936,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         try {
             AuthData clientData = 
AuthData.of(authResponse.getResponse().getAuthData());
-            doAuthentication(clientData, authResponse.getProtocolVersion(),
+            doAuthentication(clientData, originalAuthState != null, 
authResponse.getProtocolVersion(),
                     authResponse.hasClientVersion() ? 
authResponse.getClientVersion() : EMPTY);
         } catch (AuthenticationException e) {
             service.getPulsarStats().recordConnectionCreateFail();

Reply via email to