nodece commented on code in PR #18130:
URL: https://github.com/apache/pulsar/pull/18130#discussion_r1073229516


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -676,73 +678,93 @@ private void completeConnect(int clientProtoVersion, 
String clientVersion, boole
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
-    private State doAuthentication(AuthData clientData,
-                                   int clientProtocolVersion,
-                                   String clientVersion) throws Exception {
+    private CompletableFuture<Void> doAuthenticationAsync(AuthData clientData, 
int clientProtocolVersion,
+                                                          String 
clientVersion) {
+        boolean useOriginalAuthState = (originalAuthState != null);
+        if (state == State.Connected) {
+            // For auth challenge, the authentication state requires to be 
updated.
+            if (log.isDebugEnabled()) {
+                log.debug("Refreshing authenticate state, original auth state: 
{}, original auth role: {}, "
+                                + "auth role: {}",
+                        useOriginalAuthState, originalPrincipal, authRole);
+            }
+            try {
+                if (useOriginalAuthState) {
+                    originalAuthState =
+                            
originalAuthenticationProvider.newAuthState(clientData, remoteAddress, 
sslSession);
+                } else {
+                    authState = 
authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
+                }
+            } catch (AuthenticationException e) {
+                return CompletableFuture.failedFuture(e);
+            }
+        }
 
         // The original auth state can only be set on subsequent auth attempts 
(and only
         // in presence of a proxy and if the proxy is forwarding the 
credentials).
         // In this case, the re-validation needs to be done against the 
original client
         // credentials.
-        boolean useOriginalAuthState = (originalAuthState != null);
-        AuthenticationState authState =  useOriginalAuthState ? 
originalAuthState : this.authState;
+        AuthenticationState authState = useOriginalAuthState ? 
originalAuthState : this.authState;
         String authRole = useOriginalAuthState ? originalPrincipal : 
this.authRole;
-        AuthData brokerData = authState.authenticate(clientData);
 
-        if (log.isDebugEnabled()) {
-            log.debug("Authenticate using original auth state : {}, role = 
{}", useOriginalAuthState, authRole);
-        }
-
-        if (authState.isComplete()) {
-            // Authentication has completed. It was either:
-            // 1. the 1st time the authentication process was done, in which 
case we'll send
-            //    a `CommandConnected` response
-            // 2. an authentication refresh, in which case we need to refresh 
authenticationData
-
-            String newAuthRole = authState.getAuthRole();
-
-            // Refresh the auth data.
-            this.authenticationData = authState.getAuthDataSource();
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Auth data refreshed for role={}", 
remoteAddress, this.authRole);
-            }
-
-            if (!useOriginalAuthState) {
-                this.authRole = newAuthRole;
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Client successfully authenticated with {} role 
{} and originalPrincipal {}",
-                        remoteAddress, authMethod, this.authRole, 
originalPrincipal);
-            }
+        CompletableFuture<AuthData> authFuture = 
CompletableFuture.completedFuture(null);
+        if (!authState.isComplete()) {
+            authFuture = authState.authenticateAsync(clientData);
+        }
+        return authFuture.thenCompose(nextAuthData -> {
+            if (nextAuthData == null) {
+                // Authentication has completed. It was either:
+                // 1. the 1st time the authentication process was done, in 
which case we'll send
+                //    a `CommandConnected` response
+                // 2. an authentication refresh, in which case we need to 
refresh authenticationData and role
+                String newAuthRole;
+                try {
+                    newAuthRole = authState.getAuthRole();
+                } catch (AuthenticationException e) {
+                    return CompletableFuture.failedFuture(e);
+                }
+                AuthenticationDataSource newAuthDataSource = 
authState.getAuthDataSource();
+                if (useOriginalAuthState) {
+                    this.originalAuthData = newAuthDataSource;
+                } else {
+                    this.authRole = newAuthRole;
+                    this.authenticationData = newAuthDataSource;
+                }
 
-            if (state != State.Connected) {
-                // First time authentication is done
-                completeConnect(clientProtocolVersion, clientVersion, 
enableSubscriptionPatternEvaluation);
-            } else {
-                // If the connection was already ready, it means we're doing a 
refresh
-                if (!StringUtils.isEmpty(authRole)) {
-                    if (!authRole.equals(newAuthRole)) {
-                        log.warn("[{}] Principal cannot change during an 
authentication refresh expected={} got={}",
-                                remoteAddress, authRole, newAuthRole);
-                        ctx.close();
-                    } else {
-                        log.info("[{}] Refreshed authentication credentials 
for role {}", remoteAddress, authRole);
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Client successfully authenticated with {} 
role {} and originalPrincipal {}, "
+                                    + "using original auth state: {}",
+                            remoteAddress, authMethod, this.authRole, 
originalPrincipal, originalAuthState);
+                }
+                if (state != State.Connected) {
+                    // First time authentication is done
+                    completeConnect(clientProtocolVersion, clientVersion, 
enableSubscriptionPatternEvaluation);
+                } else {
+                    // If the connection was already ready, it means we're 
doing a refresh
+                    if (!StringUtils.isEmpty(authRole)) {
+                        if (!authRole.equals(newAuthRole)) {
+                            return CompletableFuture.failedFuture(new 
AuthenticationException(String.format(
+                                    "Principal cannot change during an 
authentication refresh expected=%s got=%s",
+                                    authRole, newAuthRole)));
+                        } else {
+                            log.info("[{}] Refreshed authentication 
credentials for role {}", remoteAddress, authRole);
+                        }
                     }
+                    state = State.Connected;

Review Comment:
   I would like to know why this is not a safe operation. Are you worried that 
some other handler will change this state?
   
   > I think we should remove all of the async references in this PR and just 
focus on fixing the underlying problem with authentication data.
   
   Good point out.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to