michaeljmarshall commented on code in PR #18130:
URL: https://github.com/apache/pulsar/pull/18130#discussion_r1073155826


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -707,73 +709,96 @@ private void completeConnect(int clientProtoVersion, 
String clientVersion, boole
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
-    private State doAuthentication(AuthData clientData,
-                                   int clientProtocolVersion,
-                                   String clientVersion) throws Exception {
-
+    private CompletableFuture<Void> doAuthenticationAsync(AuthData clientData, 
int clientProtocolVersion,
+                                                          String 
clientVersion) {
         // The original auth state can only be set on subsequent auth attempts 
(and only
         // in presence of a proxy and if the proxy is forwarding the 
credentials).
         // In this case, the re-validation needs to be done against the 
original client
-        // credentials.
-        boolean useOriginalAuthState = (originalAuthState != null);
-        AuthenticationState authState =  useOriginalAuthState ? 
originalAuthState : this.authState;
-        String authRole = useOriginalAuthState ? originalPrincipal : 
this.authRole;
-        AuthData brokerData = authState.authenticate(clientData);
-
-        if (log.isDebugEnabled()) {
-            log.debug("Authenticate using original auth state : {}, role = 
{}", useOriginalAuthState, authRole);
-        }
+        // credentials, but we only can new an authentication state, because 
some authentication data(TLS, SASL)
+        // based on outside service.

Review Comment:
   Can you explain this a bit more? In the case of TLS authentication, we are 
not able to refresh the `originalAuthenticationState`, that case seems 
unrelated.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -707,73 +709,96 @@ private void completeConnect(int clientProtoVersion, 
String clientVersion, boole
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
-    private State doAuthentication(AuthData clientData,
-                                   int clientProtocolVersion,
-                                   String clientVersion) throws Exception {
-
+    private CompletableFuture<Void> doAuthenticationAsync(AuthData clientData, 
int clientProtocolVersion,
+                                                          String 
clientVersion) {
         // The original auth state can only be set on subsequent auth attempts 
(and only
         // in presence of a proxy and if the proxy is forwarding the 
credentials).
         // In this case, the re-validation needs to be done against the 
original client
-        // credentials.
-        boolean useOriginalAuthState = (originalAuthState != null);
-        AuthenticationState authState =  useOriginalAuthState ? 
originalAuthState : this.authState;
-        String authRole = useOriginalAuthState ? originalPrincipal : 
this.authRole;
-        AuthData brokerData = authState.authenticate(clientData);
-
-        if (log.isDebugEnabled()) {
-            log.debug("Authenticate using original auth state : {}, role = 
{}", useOriginalAuthState, authRole);
-        }
+        // credentials, but we only can new an authentication state, because 
some authentication data(TLS, SASL)
+        // based on outside service.
+        // If we can get the role from the authentication sate, the global 
variable need to be updated.
 
-        if (authState.isComplete()) {
-            // Authentication has completed. It was either:
-            // 1. the 1st time the authentication process was done, in which 
case we'll send
-            //    a `CommandConnected` response
-            // 2. an authentication refresh, in which case we need to refresh 
authenticationData
-
-            String newAuthRole = authState.getAuthRole();
-
-            // Refresh the auth data.
-            this.authenticationData = authState.getAuthDataSource();
+        boolean useOriginalAuthState = (originalAuthState != null);
+        if (state == State.Connected) {
+            // For auth challenge, the authentication state requires to be 
updated.
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Auth data refreshed for role={}", 
remoteAddress, this.authRole);
+                log.debug("Refreshing authenticate state, original auth state: 
{}, original auth role: {}, "
+                                + "auth role: {}",
+                        useOriginalAuthState, originalPrincipal, authRole);
             }
-
-            if (!useOriginalAuthState) {
-                this.authRole = newAuthRole;
+            try {
+                if (useOriginalAuthState) {
+                    originalAuthState =
+                            
originalAuthenticationProvider.newAuthState(clientData, remoteAddress, 
sslSession);
+                } else {
+                    authState = 
authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);

Review Comment:
   This does not feel right based on the `AuthenticationState` interface, which 
provides hooks for calls to `authenticate`. Can you provide additional 
motivation for this change?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -707,73 +709,96 @@ private void completeConnect(int clientProtoVersion, 
String clientVersion, boole
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
-    private State doAuthentication(AuthData clientData,
-                                   int clientProtocolVersion,
-                                   String clientVersion) throws Exception {
-
+    private CompletableFuture<Void> doAuthenticationAsync(AuthData clientData, 
int clientProtocolVersion,
+                                                          String 
clientVersion) {
         // The original auth state can only be set on subsequent auth attempts 
(and only
         // in presence of a proxy and if the proxy is forwarding the 
credentials).
         // In this case, the re-validation needs to be done against the 
original client
-        // credentials.
-        boolean useOriginalAuthState = (originalAuthState != null);
-        AuthenticationState authState =  useOriginalAuthState ? 
originalAuthState : this.authState;
-        String authRole = useOriginalAuthState ? originalPrincipal : 
this.authRole;
-        AuthData brokerData = authState.authenticate(clientData);
-
-        if (log.isDebugEnabled()) {
-            log.debug("Authenticate using original auth state : {}, role = 
{}", useOriginalAuthState, authRole);
-        }
+        // credentials, but we only can new an authentication state, because 
some authentication data(TLS, SASL)
+        // based on outside service.
+        // If we can get the role from the authentication sate, the global 
variable need to be updated.
 
-        if (authState.isComplete()) {
-            // Authentication has completed. It was either:
-            // 1. the 1st time the authentication process was done, in which 
case we'll send
-            //    a `CommandConnected` response
-            // 2. an authentication refresh, in which case we need to refresh 
authenticationData
-
-            String newAuthRole = authState.getAuthRole();
-
-            // Refresh the auth data.
-            this.authenticationData = authState.getAuthDataSource();

Review Comment:
   I spent many hours working on the authentication framework code today. Below 
are some of my thoughts. They aren't necessarily my final opinion.
   
   In the PR description, this line was identified as the root cause of the 
issue, but based on the description and on this PR, I think I see it as a 
symptom of a larger issue. We do not have a clear enough definition for the 
state transitions in the `ServerCnx` class, which is essentially a finite state 
machine.
   
   I want to describe part of the problem with this line of code. On the first 
pass through, the `this.authenticationData` is set by getting it from 
`this.authState.getAuthDataSource()` because `useOriginalAuthState` is `false`. 
Then, on subsequent `AuthResponse` commands from the original client, the 
`authenticationData` is set by getting it from 
`this.originalAuthState.getAuthDataSource()` because `useOriginalAuthState` is 
`true`. That means the `this.authenticationData` is incorrectly updated.
   
   As a note, the broker gets `AuthResponse` commands from the original client 
when the connection through the client is in state `ProxyConnectionToBroker`. 
As of https://github.com/apache/pulsar/pull/17831, the broker also gets an 
`AuthResponse` from the original client when the proxy is in state 
`ProxyLookupRequests` with `forwardAuthorizationCredentials=true`.
   
   Otherwise, the proxy sends its own authentication information.
   
   Here are some problems with the current solution:
   
   1. The `AuthResponse` protocol message only has one field for `AuthData`, 
and there is no indication whether the `AuthData` is for the proxy or the 
original client. As a consequence, the broker does not know whether to update 
the `authenticationData` or the `originalAuthData`.
   2. The implementation for `getAuthDataSource` does not always get the most 
recent `authenticationDataSource` See `TokenAuthenticationState`. (I am already 
working on fixing this.)
   
   Open questions:
   1. Can we just remove the `originalAuthState` and only keep track of one 
`authState`?
   2. Is the `AuthenticationState` object meant to last the whole lifecycle of 
a given `ServerCnx`? In my mind, the answer is yes, but this PR says otherwise. 
If not, it feels odd that we have a "state" object.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -676,73 +678,93 @@ private void completeConnect(int clientProtoVersion, 
String clientVersion, boole
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
-    private State doAuthentication(AuthData clientData,
-                                   int clientProtocolVersion,
-                                   String clientVersion) throws Exception {
+    private CompletableFuture<Void> doAuthenticationAsync(AuthData clientData, 
int clientProtocolVersion,
+                                                          String 
clientVersion) {
+        boolean useOriginalAuthState = (originalAuthState != null);
+        if (state == State.Connected) {
+            // For auth challenge, the authentication state requires to be 
updated.
+            if (log.isDebugEnabled()) {
+                log.debug("Refreshing authenticate state, original auth state: 
{}, original auth role: {}, "
+                                + "auth role: {}",
+                        useOriginalAuthState, originalPrincipal, authRole);
+            }
+            try {
+                if (useOriginalAuthState) {
+                    originalAuthState =
+                            
originalAuthenticationProvider.newAuthState(clientData, remoteAddress, 
sslSession);
+                } else {
+                    authState = 
authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
+                }
+            } catch (AuthenticationException e) {
+                return CompletableFuture.failedFuture(e);
+            }
+        }
 
         // The original auth state can only be set on subsequent auth attempts 
(and only
         // in presence of a proxy and if the proxy is forwarding the 
credentials).
         // In this case, the re-validation needs to be done against the 
original client
         // credentials.
-        boolean useOriginalAuthState = (originalAuthState != null);
-        AuthenticationState authState =  useOriginalAuthState ? 
originalAuthState : this.authState;
+        AuthenticationState authState = useOriginalAuthState ? 
originalAuthState : this.authState;
         String authRole = useOriginalAuthState ? originalPrincipal : 
this.authRole;
-        AuthData brokerData = authState.authenticate(clientData);
 
-        if (log.isDebugEnabled()) {
-            log.debug("Authenticate using original auth state : {}, role = 
{}", useOriginalAuthState, authRole);
-        }
-
-        if (authState.isComplete()) {
-            // Authentication has completed. It was either:
-            // 1. the 1st time the authentication process was done, in which 
case we'll send
-            //    a `CommandConnected` response
-            // 2. an authentication refresh, in which case we need to refresh 
authenticationData
-
-            String newAuthRole = authState.getAuthRole();
-
-            // Refresh the auth data.
-            this.authenticationData = authState.getAuthDataSource();
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Auth data refreshed for role={}", 
remoteAddress, this.authRole);
-            }
-
-            if (!useOriginalAuthState) {
-                this.authRole = newAuthRole;
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Client successfully authenticated with {} role 
{} and originalPrincipal {}",
-                        remoteAddress, authMethod, this.authRole, 
originalPrincipal);
-            }
+        CompletableFuture<AuthData> authFuture = 
CompletableFuture.completedFuture(null);
+        if (!authState.isComplete()) {
+            authFuture = authState.authenticateAsync(clientData);
+        }
+        return authFuture.thenCompose(nextAuthData -> {
+            if (nextAuthData == null) {
+                // Authentication has completed. It was either:
+                // 1. the 1st time the authentication process was done, in 
which case we'll send
+                //    a `CommandConnected` response
+                // 2. an authentication refresh, in which case we need to 
refresh authenticationData and role
+                String newAuthRole;
+                try {
+                    newAuthRole = authState.getAuthRole();
+                } catch (AuthenticationException e) {
+                    return CompletableFuture.failedFuture(e);
+                }
+                AuthenticationDataSource newAuthDataSource = 
authState.getAuthDataSource();
+                if (useOriginalAuthState) {
+                    this.originalAuthData = newAuthDataSource;
+                } else {
+                    this.authRole = newAuthRole;
+                    this.authenticationData = newAuthDataSource;
+                }
 
-            if (state != State.Connected) {
-                // First time authentication is done
-                completeConnect(clientProtocolVersion, clientVersion, 
enableSubscriptionPatternEvaluation);
-            } else {
-                // If the connection was already ready, it means we're doing a 
refresh
-                if (!StringUtils.isEmpty(authRole)) {
-                    if (!authRole.equals(newAuthRole)) {
-                        log.warn("[{}] Principal cannot change during an 
authentication refresh expected={} got={}",
-                                remoteAddress, authRole, newAuthRole);
-                        ctx.close();
-                    } else {
-                        log.info("[{}] Refreshed authentication credentials 
for role {}", remoteAddress, authRole);
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Client successfully authenticated with {} 
role {} and originalPrincipal {}, "
+                                    + "using original auth state: {}",
+                            remoteAddress, authMethod, this.authRole, 
originalPrincipal, originalAuthState);
+                }
+                if (state != State.Connected) {
+                    // First time authentication is done
+                    completeConnect(clientProtocolVersion, clientVersion, 
enableSubscriptionPatternEvaluation);
+                } else {
+                    // If the connection was already ready, it means we're 
doing a refresh
+                    if (!StringUtils.isEmpty(authRole)) {
+                        if (!authRole.equals(newAuthRole)) {
+                            return CompletableFuture.failedFuture(new 
AuthenticationException(String.format(
+                                    "Principal cannot change during an 
authentication refresh expected=%s got=%s",
+                                    authRole, newAuthRole)));
+                        } else {
+                            log.info("[{}] Refreshed authentication 
credentials for role {}", remoteAddress, authRole);
+                        }
                     }
+                    state = State.Connected;

Review Comment:
   I believe @codelipenghui is right here. We are updating the state from a 
callback which could be another thread, and that is not a safe operation here. 
Instead, I think we should remove all of the async references in this PR and 
just focus on fixing the underlying problem with authentication data. I already 
started work on implementing PIP 97, and I will follow up on this PR to replace 
the synchronous methods with asynchronous calls.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to