michaeljmarshall commented on code in PR #18130:
URL: https://github.com/apache/pulsar/pull/18130#discussion_r1073155826
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -707,73 +709,96 @@ private void completeConnect(int clientProtoVersion,
String clientVersion, boole
}
// According to auth result, send newConnected or newAuthChallenge command.
- private State doAuthentication(AuthData clientData,
- int clientProtocolVersion,
- String clientVersion) throws Exception {
-
+ private CompletableFuture<Void> doAuthenticationAsync(AuthData clientData,
int clientProtocolVersion,
+ String
clientVersion) {
// The original auth state can only be set on subsequent auth attempts
(and only
// in presence of a proxy and if the proxy is forwarding the
credentials).
// In this case, the re-validation needs to be done against the
original client
- // credentials.
- boolean useOriginalAuthState = (originalAuthState != null);
- AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
- String authRole = useOriginalAuthState ? originalPrincipal :
this.authRole;
- AuthData brokerData = authState.authenticate(clientData);
-
- if (log.isDebugEnabled()) {
- log.debug("Authenticate using original auth state : {}, role =
{}", useOriginalAuthState, authRole);
- }
+ // credentials, but we only can new an authentication state, because
some authentication data(TLS, SASL)
+ // based on outside service.
Review Comment:
Can you explain this a bit more? In the case of TLS authentication, we are
not able to refresh the `originalAuthenticationState`, that case seems
unrelated.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -707,73 +709,96 @@ private void completeConnect(int clientProtoVersion,
String clientVersion, boole
}
// According to auth result, send newConnected or newAuthChallenge command.
- private State doAuthentication(AuthData clientData,
- int clientProtocolVersion,
- String clientVersion) throws Exception {
-
+ private CompletableFuture<Void> doAuthenticationAsync(AuthData clientData,
int clientProtocolVersion,
+ String
clientVersion) {
// The original auth state can only be set on subsequent auth attempts
(and only
// in presence of a proxy and if the proxy is forwarding the
credentials).
// In this case, the re-validation needs to be done against the
original client
- // credentials.
- boolean useOriginalAuthState = (originalAuthState != null);
- AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
- String authRole = useOriginalAuthState ? originalPrincipal :
this.authRole;
- AuthData brokerData = authState.authenticate(clientData);
-
- if (log.isDebugEnabled()) {
- log.debug("Authenticate using original auth state : {}, role =
{}", useOriginalAuthState, authRole);
- }
+ // credentials, but we only can new an authentication state, because
some authentication data(TLS, SASL)
+ // based on outside service.
+ // If we can get the role from the authentication sate, the global
variable need to be updated.
- if (authState.isComplete()) {
- // Authentication has completed. It was either:
- // 1. the 1st time the authentication process was done, in which
case we'll send
- // a `CommandConnected` response
- // 2. an authentication refresh, in which case we need to refresh
authenticationData
-
- String newAuthRole = authState.getAuthRole();
-
- // Refresh the auth data.
- this.authenticationData = authState.getAuthDataSource();
+ boolean useOriginalAuthState = (originalAuthState != null);
+ if (state == State.Connected) {
+ // For auth challenge, the authentication state requires to be
updated.
if (log.isDebugEnabled()) {
- log.debug("[{}] Auth data refreshed for role={}",
remoteAddress, this.authRole);
+ log.debug("Refreshing authenticate state, original auth state:
{}, original auth role: {}, "
+ + "auth role: {}",
+ useOriginalAuthState, originalPrincipal, authRole);
}
-
- if (!useOriginalAuthState) {
- this.authRole = newAuthRole;
+ try {
+ if (useOriginalAuthState) {
+ originalAuthState =
+
originalAuthenticationProvider.newAuthState(clientData, remoteAddress,
sslSession);
+ } else {
+ authState =
authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
Review Comment:
This does not feel right based on the `AuthenticationState` interface, which
provides hooks for calls to `authenticate`. Can you provide additional
motivation for this change?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -707,73 +709,96 @@ private void completeConnect(int clientProtoVersion,
String clientVersion, boole
}
// According to auth result, send newConnected or newAuthChallenge command.
- private State doAuthentication(AuthData clientData,
- int clientProtocolVersion,
- String clientVersion) throws Exception {
-
+ private CompletableFuture<Void> doAuthenticationAsync(AuthData clientData,
int clientProtocolVersion,
+ String
clientVersion) {
// The original auth state can only be set on subsequent auth attempts
(and only
// in presence of a proxy and if the proxy is forwarding the
credentials).
// In this case, the re-validation needs to be done against the
original client
- // credentials.
- boolean useOriginalAuthState = (originalAuthState != null);
- AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
- String authRole = useOriginalAuthState ? originalPrincipal :
this.authRole;
- AuthData brokerData = authState.authenticate(clientData);
-
- if (log.isDebugEnabled()) {
- log.debug("Authenticate using original auth state : {}, role =
{}", useOriginalAuthState, authRole);
- }
+ // credentials, but we only can new an authentication state, because
some authentication data(TLS, SASL)
+ // based on outside service.
+ // If we can get the role from the authentication sate, the global
variable need to be updated.
- if (authState.isComplete()) {
- // Authentication has completed. It was either:
- // 1. the 1st time the authentication process was done, in which
case we'll send
- // a `CommandConnected` response
- // 2. an authentication refresh, in which case we need to refresh
authenticationData
-
- String newAuthRole = authState.getAuthRole();
-
- // Refresh the auth data.
- this.authenticationData = authState.getAuthDataSource();
Review Comment:
I spent many hours working on the authentication framework code today. Below
are some of my thoughts. They aren't necessarily my final opinion.
In the PR description, this line was identified as the root cause of the
issue, but based on the description and on this PR, I think I see it as a
symptom of a larger issue. We do not have a clear enough definition for the
state transitions in the `ServerCnx` class, which is essentially a finite state
machine.
I want to describe part of the problem with this line of code. On the first
pass through, the `this.authenticationData` is set by getting it from
`this.authState.getAuthDataSource()` because `useOriginalAuthState` is `false`.
Then, on subsequent `AuthResponse` commands from the original client, the
`authenticationData` is set by getting it from
`this.originalAuthState.getAuthDataSource()` because `useOriginalAuthState` is
`true`. That means the `this.authenticationData` is incorrectly updated.
As a note, the broker gets `AuthResponse` commands from the original client
when the connection through the client is in state `ProxyConnectionToBroker`.
As of https://github.com/apache/pulsar/pull/17831, the broker also gets an
`AuthResponse` from the original client when the proxy is in state
`ProxyLookupRequests` with `forwardAuthorizationCredentials=true`.
Otherwise, the proxy sends its own authentication information.
Here are some problems with the current solution:
1. The `AuthResponse` protocol message only has one field for `AuthData`,
and there is no indication whether the `AuthData` is for the proxy or the
original client. As a consequence, the broker does not know whether to update
the `authenticationData` or the `originalAuthData`.
2. The implementation for `getAuthDataSource` does not always get the most
recent `authenticationDataSource` See `TokenAuthenticationState`. (I am already
working on fixing this.)
Open questions:
1. Can we just remove the `originalAuthState` and only keep track of one
`authState`?
2. Is the `AuthenticationState` object meant to last the whole lifecycle of
a given `ServerCnx`? In my mind, the answer is yes, but this PR says otherwise.
If not, it feels odd that we have a "state" object.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -676,73 +678,93 @@ private void completeConnect(int clientProtoVersion,
String clientVersion, boole
}
// According to auth result, send newConnected or newAuthChallenge command.
- private State doAuthentication(AuthData clientData,
- int clientProtocolVersion,
- String clientVersion) throws Exception {
+ private CompletableFuture<Void> doAuthenticationAsync(AuthData clientData,
int clientProtocolVersion,
+ String
clientVersion) {
+ boolean useOriginalAuthState = (originalAuthState != null);
+ if (state == State.Connected) {
+ // For auth challenge, the authentication state requires to be
updated.
+ if (log.isDebugEnabled()) {
+ log.debug("Refreshing authenticate state, original auth state:
{}, original auth role: {}, "
+ + "auth role: {}",
+ useOriginalAuthState, originalPrincipal, authRole);
+ }
+ try {
+ if (useOriginalAuthState) {
+ originalAuthState =
+
originalAuthenticationProvider.newAuthState(clientData, remoteAddress,
sslSession);
+ } else {
+ authState =
authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
+ }
+ } catch (AuthenticationException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
// The original auth state can only be set on subsequent auth attempts
(and only
// in presence of a proxy and if the proxy is forwarding the
credentials).
// In this case, the re-validation needs to be done against the
original client
// credentials.
- boolean useOriginalAuthState = (originalAuthState != null);
- AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
+ AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
String authRole = useOriginalAuthState ? originalPrincipal :
this.authRole;
- AuthData brokerData = authState.authenticate(clientData);
- if (log.isDebugEnabled()) {
- log.debug("Authenticate using original auth state : {}, role =
{}", useOriginalAuthState, authRole);
- }
-
- if (authState.isComplete()) {
- // Authentication has completed. It was either:
- // 1. the 1st time the authentication process was done, in which
case we'll send
- // a `CommandConnected` response
- // 2. an authentication refresh, in which case we need to refresh
authenticationData
-
- String newAuthRole = authState.getAuthRole();
-
- // Refresh the auth data.
- this.authenticationData = authState.getAuthDataSource();
- if (log.isDebugEnabled()) {
- log.debug("[{}] Auth data refreshed for role={}",
remoteAddress, this.authRole);
- }
-
- if (!useOriginalAuthState) {
- this.authRole = newAuthRole;
- }
-
- if (log.isDebugEnabled()) {
- log.debug("[{}] Client successfully authenticated with {} role
{} and originalPrincipal {}",
- remoteAddress, authMethod, this.authRole,
originalPrincipal);
- }
+ CompletableFuture<AuthData> authFuture =
CompletableFuture.completedFuture(null);
+ if (!authState.isComplete()) {
+ authFuture = authState.authenticateAsync(clientData);
+ }
+ return authFuture.thenCompose(nextAuthData -> {
+ if (nextAuthData == null) {
+ // Authentication has completed. It was either:
+ // 1. the 1st time the authentication process was done, in
which case we'll send
+ // a `CommandConnected` response
+ // 2. an authentication refresh, in which case we need to
refresh authenticationData and role
+ String newAuthRole;
+ try {
+ newAuthRole = authState.getAuthRole();
+ } catch (AuthenticationException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ AuthenticationDataSource newAuthDataSource =
authState.getAuthDataSource();
+ if (useOriginalAuthState) {
+ this.originalAuthData = newAuthDataSource;
+ } else {
+ this.authRole = newAuthRole;
+ this.authenticationData = newAuthDataSource;
+ }
- if (state != State.Connected) {
- // First time authentication is done
- completeConnect(clientProtocolVersion, clientVersion,
enableSubscriptionPatternEvaluation);
- } else {
- // If the connection was already ready, it means we're doing a
refresh
- if (!StringUtils.isEmpty(authRole)) {
- if (!authRole.equals(newAuthRole)) {
- log.warn("[{}] Principal cannot change during an
authentication refresh expected={} got={}",
- remoteAddress, authRole, newAuthRole);
- ctx.close();
- } else {
- log.info("[{}] Refreshed authentication credentials
for role {}", remoteAddress, authRole);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Client successfully authenticated with {}
role {} and originalPrincipal {}, "
+ + "using original auth state: {}",
+ remoteAddress, authMethod, this.authRole,
originalPrincipal, originalAuthState);
+ }
+ if (state != State.Connected) {
+ // First time authentication is done
+ completeConnect(clientProtocolVersion, clientVersion,
enableSubscriptionPatternEvaluation);
+ } else {
+ // If the connection was already ready, it means we're
doing a refresh
+ if (!StringUtils.isEmpty(authRole)) {
+ if (!authRole.equals(newAuthRole)) {
+ return CompletableFuture.failedFuture(new
AuthenticationException(String.format(
+ "Principal cannot change during an
authentication refresh expected=%s got=%s",
+ authRole, newAuthRole)));
+ } else {
+ log.info("[{}] Refreshed authentication
credentials for role {}", remoteAddress, authRole);
+ }
}
+ state = State.Connected;
Review Comment:
I believe @codelipenghui is right here. We are updating the state from a
callback which could be another thread, and that is not a safe operation here.
Instead, I think we should remove all of the async references in this PR and
just focus on fixing the underlying problem with authentication data. I already
started work on implementing PIP 97, and I will follow up on this PR to replace
the synchronous methods with asynchronous calls.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]