Copilot commented on code in PR #24878:
URL: https://github.com/apache/pulsar/pull/24878#discussion_r2448366422
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java:
##########
@@ -522,6 +530,24 @@ public void
testConnectCommandWithPassingOriginalAuthData() throws Exception {
channel.finish();
}
+ private BinaryAuthSession spyBinaryAuthSession(AuthenticationService
authenticationService, ByteBuf connectCommand, ServiceConfiguration
serviceConfiguration) {
+ BinaryAuthContext binaryAuthContext = mock(BinaryAuthContext.class);
+
when(binaryAuthContext.getAuthenticationService()).thenReturn(authenticationService);
+ when(binaryAuthContext.isAuthenticateOriginalAuthData()).thenReturn(
+ serviceConfiguration.isAuthenticateOriginalAuthData());
+
when(binaryAuthContext.getExecutor()).thenReturn(serverCnx.ctx().executor());
+ when(binaryAuthContext.getIsConnectingSupplier()).thenReturn(() ->
serverCnx.getState() != State.Connected);
+ BinaryAuthSession binaryAuthSession = spy(new
BinaryAuthSession(binaryAuthContext));
+
when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
Review Comment:
The `when().thenReturn()` call on line 541 is redundant since line 458
already sets up this mock behavior for the same
`authenticationService.createBinaryAuthSession(any())` call. Remove the
duplicate on line 541.
```suggestion
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1280,14 +1099,53 @@ protected void handleAuthResponse(CommandAuthResponse
authResponse) {
}
try {
- AuthData clientData =
AuthData.of(authResponse.getResponse().getAuthData());
- doAuthentication(clientData, originalAuthState != null,
authResponse.getProtocolVersion(),
- authResponse.hasClientVersion() ?
authResponse.getClientVersion() : EMPTY);
+ if (binaryAuthSession != null) {
+ AuthData clientData =
AuthData.of(authResponse.getResponse().getAuthData());
+ binaryAuthSession.authChallenge(clientData,
binaryAuthSession.getOriginalAuthState() != null,
+ authResponse.getProtocolVersion(),
+ authResponse.hasClientVersion() ?
authResponse.getClientVersion() : "")
+ .whenCompleteAsync((authResult, ex) -> {
+ if (ex != null) {
+ authenticationFailed(ex);
+ } else {
+ handleAuthResult(authResult);
+ }
+ }, ctx.executor());
+ } else {
+ authenticationFailed(new
AuthenticationException("authentication session is null"));
Review Comment:
The error message 'authentication session is null' should be more
descriptive and follow the capitalization style of other error messages in the
codebase. Consider 'Authentication session is null or not initialized'.
```suggestion
authenticationFailed(new
AuthenticationException("Authentication session is null or not initialized"));
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java:
##########
@@ -1531,7 +1585,7 @@ public void
testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy() throws
AuthData.of(newClientRole.getBytes(StandardCharsets.UTF_8)),
0, "test");
channel.writeInbound(refreshAuth);
- assertEquals(serverCnx.getOriginalAuthData().getCommandData(),
newClientRole);
+ assertEquals(serverCnx.getOriginalAuthData().getCommandData(),
clientRole);
Review Comment:
This assertion has changed from expecting `newClientRole` to expecting
`clientRole`, which appears to be testing that the original auth data has NOT
been updated. This contradicts the test name
'testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy' which suggests
that refresh should update the data. Verify this is the intended behavior
change.
```suggestion
assertEquals(serverCnx.getOriginalAuthData().getCommandData(),
newClientRole);
```
##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import static
org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE;
+import java.util.concurrent.CompletableFuture;
+import javax.naming.AuthenticationException;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.api.AuthData;
+import org.jspecify.annotations.NonNull;
+
+@Slf4j
+@Getter
+public class BinaryAuthSession {
+ private static final byte[] emptyArray = new byte[0];
+
+ private AuthenticationState authState;
+ private String authMethod;
+ private String authRole = null;
+ private volatile AuthenticationDataSource authenticationData;
+ private AuthenticationProvider authenticationProvider;
+
+ // In case of proxy, if the authentication credentials are forwardable,
+ // it will hold the credentials of the original client
+ private String originalAuthMethod;
+ private String originalPrincipal = null;
+ private AuthenticationState originalAuthState;
+ private volatile AuthenticationDataSource originalAuthData;
+ // Keep temporarily in order to verify after verifying proxy's authData
+ private AuthData originalAuthDataCopy;
+
+ private final BinaryAuthContext context;
+
+ private AuthResult defaultAuthResult;
+
+ public BinaryAuthSession(@NonNull BinaryAuthContext context) {
+ this.context = context;
+ }
+
+ public CompletableFuture<AuthResult> doAuthentication() {
+ var connect = context.getCommandConnect();
+ try {
+ var authData = connect.hasAuthData() ? connect.getAuthData() :
emptyArray;
+ var clientData = AuthData.of(authData);
+ // init authentication
+ if (connect.hasAuthMethodName()) {
+ authMethod = connect.getAuthMethodName();
+ } else if (connect.hasAuthMethod()) {
+ // Legacy client is passing enum
+ authMethod =
connect.getAuthMethod().name().substring(10).toLowerCase();
+ } else {
+ authMethod = "none";
+ }
+
+ defaultAuthResult =
AuthResult.builder().clientProtocolVersion(connect.getProtocolVersion())
+ .clientVersion(connect.hasClientVersion() ?
connect.getClientVersion() : "")
+ .build();
+
+ authenticationProvider =
context.getAuthenticationService().getAuthenticationProvider(authMethod);
+ // Not find provider named authMethod. Most used for tests.
+ // In AuthenticationDisabled, it will set authMethod "none".
+ if (authenticationProvider == null) {
+ authRole =
context.getAuthenticationService().getAnonymousUserRole()
+ .orElseThrow(() ->
+ new AuthenticationException(
+ "No anonymous role, and no
authentication provider configured"));
+ return CompletableFuture.completedFuture(defaultAuthResult);
+ }
+
+ authState =
+ authenticationProvider.newAuthState(clientData,
context.getRemoteAddress(),
+ context.getSslSession());
+
+ if (log.isDebugEnabled()) {
+ String role = "";
+ if (authState != null && authState.isComplete()) {
+ role = authState.getAuthRole();
+ } else {
+ role = "authentication incomplete or null";
+ }
+ log.debug("[{}] Authenticate role : {}",
context.getRemoteAddress(), role);
+ }
+
+ if (connect.hasOriginalPrincipal() &&
context.isAuthenticateOriginalAuthData()
+ &&
!WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE.equals(connect.getOriginalPrincipal())) {
+ // Flow:
+ // 1. Initialize original authentication.
+ // 2. Authenticate the proxy's authentication data.
+ // 3. Authenticate the original authentication data.
+ if (connect.hasOriginalAuthMethod()) {
+ originalAuthMethod = connect.getOriginalAuthMethod();
+ } else {
+ originalAuthMethod = "none";
+ }
+
+ var originalAuthenticationProvider =
+
context.getAuthenticationService().getAuthenticationProvider(originalAuthMethod);
+
+ /**
+ * When both the broker and the proxy are configured with
anonymousUserRole
+ * if the client does not configure an authentication method
+ * the proxy side will set the value of anonymousUserRole to
clientAuthRole when it creates a connection
+ * and the value of clientAuthMethod will be none.
+ * Similarly, should also set the value of authRole to
anonymousUserRole on the broker side.
+ */
+ if (originalAuthenticationProvider == null) {
+ authRole =
context.getAuthenticationService().getAnonymousUserRole()
+ .orElseThrow(() ->
+ new AuthenticationException("No anonymous
role, and can't find "
+ + "AuthenticationProvider for
original role using auth method "
+ + "[" + originalAuthMethod + "] is
not available"));
+ originalPrincipal = authRole;
+ return
CompletableFuture.completedFuture(defaultAuthResult);
+ }
+
+ originalAuthDataCopy =
AuthData.of(connect.getOriginalAuthData().getBytes());
+ originalAuthState =
originalAuthenticationProvider.newAuthState(
+ originalAuthDataCopy,
+ context.getRemoteAddress(),
+ context.getSslSession());
+ } else if (connect.hasOriginalPrincipal()) {
+ originalPrincipal = connect.getOriginalPrincipal();
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Setting original role (forwarded from
proxy): {}",
+ context.getRemoteAddress(), originalPrincipal);
+ }
+ }
+
+ return authChallenge(clientData, false,
connect.getProtocolVersion(),
+ connect.hasClientVersion() ? connect.getClientVersion() :
"");
+ } catch (Exception e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
+
+
+ // According to auth result, send Connected, AuthChallenge, or Error
command.
+ public CompletableFuture<AuthResult> authChallenge(AuthData clientData,
+ boolean
useOriginalAuthState,
+ int
clientProtocolVersion,
+ String clientVersion) {
+ // The original auth state can only be set on subsequent auth attempts
(and only
+ // in presence of a proxy and if the proxy is forwarding the
credentials).
+ // In this case, the re-validation needs to be done against the
original client
+ // credentials.
+ AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
+ String authRole = useOriginalAuthState ? originalPrincipal :
this.authRole;
+ if (log.isDebugEnabled()) {
+ log.debug("Authenticate using original auth state : {}, role =
{}", useOriginalAuthState, authRole);
+ }
+ return authState
+ .authenticateAsync(clientData)
+ .thenComposeAsync((authChallenge) ->
authChallengeSuccessCallback(authChallenge,
+ useOriginalAuthState, authRole,
clientProtocolVersion, clientVersion),
+ context.getExecutor());
+ }
+
+ public CompletableFuture<AuthResult> authChallengeSuccessCallback(AuthData
authChallenge,
+ boolean
useOriginalAuthState,
+ String
authRole,
+ int
clientProtocolVersion,
+ String
clientVersion) {
+ try {
+ if (authChallenge == null) {
+ // Authentication has completed. It was either:
+ // 1. the 1st time the authentication process was done, in
which case we'll send
+ // a `CommandConnected` response
+ // 2. an authentication refresh, in which case we need to
refresh authenticationData
+ AuthenticationState authState = useOriginalAuthState ?
originalAuthState : this.authState;
+ String newAuthRole = authState.getAuthRole();
+ AuthenticationDataSource newAuthDataSource =
authState.getAuthDataSource();
+
+ if (context.getIsConnectingSupplier().get()) {
+ // Set the auth data and auth role
+ if (!useOriginalAuthState) {
+ this.authRole = newAuthRole;
+ this.authenticationData = newAuthDataSource;
+ }
+ // First time authentication is done
+ if (originalAuthState != null) {
+ // We only set originalAuthState when we are going to
use it.
+ return authenticateOriginalData().thenApply(
+ __ -> defaultAuthResult);
+ } else {
+ return
CompletableFuture.completedFuture(defaultAuthResult);
+ }
+ } else {
+ // If the connection was already ready, it means we're
doing a refresh
+ if (!StringUtils.isEmpty(authRole)) {
+ if (!authRole.equals(newAuthRole)) {
+ log.warn("[{}] Principal cannot change during an
authentication refresh expected={} got={}",
+ context.getRemoteAddress(), authRole,
newAuthRole);
+ return CompletableFuture.failedFuture(
+ new AuthenticationException("Auth role not
match previous"));
Review Comment:
The error message 'Auth role not match previous' has a grammatical error. It
should be 'Auth role does not match previous' or 'Auth role does not match
previous role'.
```suggestion
new AuthenticationException("Auth role
does not match previous role"));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]