Copilot commented on code in PR #24878: URL: https://github.com/apache/pulsar/pull/24878#discussion_r2444724921
########## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.authentication; + +import static org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE; +import java.util.concurrent.CompletableFuture; +import javax.naming.AuthenticationException; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.api.AuthData; +import org.jspecify.annotations.NonNull; + +@Slf4j +@Getter +public class BinaryAuthSession { + private static final byte[] emptyArray = new byte[0]; + + private AuthenticationState authState; + private String authMethod; + private String authRole = null; + private volatile AuthenticationDataSource authenticationData; + private AuthenticationProvider authenticationProvider; + + // In case of proxy, if the authentication credentials are forwardable, + // it will hold the credentials of the original client + private String originalAuthMethod; + private String originalPrincipal = null; + private AuthenticationState 
originalAuthState; + private volatile AuthenticationDataSource originalAuthData; + // Keep temporarily in order to verify after verifying proxy's authData + private AuthData originalAuthDataCopy; + + private final BinaryAuthContext context; + + private AuthResult defaultAuthResult; + + public BinaryAuthSession(@NonNull BinaryAuthContext context) { + this.context = context; + } + + public CompletableFuture<AuthResult> doAuthentication() { + var connect = context.getCommandConnect(); + try { + var authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray; + var clientData = AuthData.of(authData); + // init authentication + if (connect.hasAuthMethodName()) { + authMethod = connect.getAuthMethodName(); + } else if (connect.hasAuthMethod()) { + // Legacy client is passing enum + authMethod = connect.getAuthMethod().name().substring(10).toLowerCase(); + } else { + authMethod = "none"; + } + + defaultAuthResult = AuthResult.builder().clientProtocolVersion(connect.getProtocolVersion()) + .clientVersion(connect.hasClientVersion()? connect.getClientVersion() : "") + .build(); + + authenticationProvider = context.getAuthenticationService().getAuthenticationProvider(authMethod); + // Not find provider named authMethod. Most used for tests. + // In AuthenticationDisabled, it will set authMethod "none". + if (authenticationProvider == null) { + authRole = context.getAuthenticationService().getAnonymousUserRole() + .orElseThrow(() -> + new AuthenticationException( + "No anonymous role, and no authentication provider configured")); + return CompletableFuture.completedFuture(defaultAuthResult); + } + + authState = + authenticationProvider.newAuthState(AuthData.of(connect.getAuthData()), context.getRemoteAddress(), Review Comment: Possible null dereference: connect.getAuthData() may be absent when hasAuthData() is false. Use the already computed clientData instead to avoid NPE and ensure consistent behavior. 
```suggestion authenticationProvider.newAuthState(clientData, context.getRemoteAddress(), ``` ########## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.authentication; + +import static org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE; +import java.util.concurrent.CompletableFuture; +import javax.naming.AuthenticationException; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.api.AuthData; +import org.jspecify.annotations.NonNull; + +@Slf4j +@Getter +public class BinaryAuthSession { + private static final byte[] emptyArray = new byte[0]; + + private AuthenticationState authState; + private String authMethod; + private String authRole = null; + private volatile AuthenticationDataSource authenticationData; + private AuthenticationProvider authenticationProvider; + + // In case of proxy, if the authentication credentials are forwardable, + // it will hold the credentials of the original client + private String originalAuthMethod; + private String originalPrincipal = null; + private AuthenticationState originalAuthState; + private volatile AuthenticationDataSource originalAuthData; + // Keep temporarily in order to verify after verifying proxy's authData + private AuthData originalAuthDataCopy; + + private final BinaryAuthContext context; + + private AuthResult defaultAuthResult; + + public BinaryAuthSession(@NonNull BinaryAuthContext context) { + this.context = context; + } + + public CompletableFuture<AuthResult> doAuthentication() { + var connect = context.getCommandConnect(); + try { + var authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray; Review Comment: Possible null dereference: in addition to checking hasAuthData(), guard against connect.getAuthData() returning null and fall back to emptyArray, so that the AuthData built from it is never created from a null array. ```suggestion var authData = connect.hasAuthData() && connect.getAuthData() != null ? 
connect.getAuthData() : emptyArray; ``` ########## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.authentication; + +import static org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE; +import java.util.concurrent.CompletableFuture; +import javax.naming.AuthenticationException; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.api.AuthData; +import org.jspecify.annotations.NonNull; + +@Slf4j +@Getter +public class BinaryAuthSession { + private static final byte[] emptyArray = new byte[0]; + + private AuthenticationState authState; + private String authMethod; + private String authRole = null; + private volatile AuthenticationDataSource authenticationData; + private AuthenticationProvider authenticationProvider; + + // In case of proxy, if the authentication credentials are forwardable, + // it will hold the credentials of the original client + private String originalAuthMethod; + private String originalPrincipal = 
null; + private AuthenticationState originalAuthState; + private volatile AuthenticationDataSource originalAuthData; + // Keep temporarily in order to verify after verifying proxy's authData + private AuthData originalAuthDataCopy; + + private final BinaryAuthContext context; + + private AuthResult defaultAuthResult; + + public BinaryAuthSession(@NonNull BinaryAuthContext context) { + this.context = context; + } + + public CompletableFuture<AuthResult> doAuthentication() { + var connect = context.getCommandConnect(); + try { + var authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray; + var clientData = AuthData.of(authData); + // init authentication + if (connect.hasAuthMethodName()) { + authMethod = connect.getAuthMethodName(); + } else if (connect.hasAuthMethod()) { + // Legacy client is passing enum + authMethod = connect.getAuthMethod().name().substring(10).toLowerCase(); + } else { + authMethod = "none"; + } + + defaultAuthResult = AuthResult.builder().clientProtocolVersion(connect.getProtocolVersion()) + .clientVersion(connect.hasClientVersion()? connect.getClientVersion() : "") + .build(); + + authenticationProvider = context.getAuthenticationService().getAuthenticationProvider(authMethod); + // Not find provider named authMethod. Most used for tests. + // In AuthenticationDisabled, it will set authMethod "none". Review Comment: Improve grammar in the comment for clarity. ```suggestion // No provider named 'authMethod' was found. This is mostly used for tests. // When authentication is disabled, 'authMethod' will be set to 'none'. ``` ########## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.authentication; + +import static org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE; +import java.util.concurrent.CompletableFuture; +import javax.naming.AuthenticationException; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.api.AuthData; +import org.jspecify.annotations.NonNull; + +@Slf4j +@Getter +public class BinaryAuthSession { + private static final byte[] emptyArray = new byte[0]; + + private AuthenticationState authState; + private String authMethod; + private String authRole = null; + private volatile AuthenticationDataSource authenticationData; + private AuthenticationProvider authenticationProvider; + + // In case of proxy, if the authentication credentials are forwardable, + // it will hold the credentials of the original client + private String originalAuthMethod; + private String originalPrincipal = null; + private AuthenticationState originalAuthState; + private volatile AuthenticationDataSource originalAuthData; + // Keep temporarily in order to verify after verifying proxy's authData + private AuthData originalAuthDataCopy; + + private final BinaryAuthContext context; + + private AuthResult defaultAuthResult; + + public BinaryAuthSession(@NonNull BinaryAuthContext context) { + this.context = 
context; + } + + public CompletableFuture<AuthResult> doAuthentication() { + var connect = context.getCommandConnect(); + try { + var authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray; + var clientData = AuthData.of(authData); + // init authentication + if (connect.hasAuthMethodName()) { + authMethod = connect.getAuthMethodName(); + } else if (connect.hasAuthMethod()) { + // Legacy client is passing enum + authMethod = connect.getAuthMethod().name().substring(10).toLowerCase(); + } else { + authMethod = "none"; + } + + defaultAuthResult = AuthResult.builder().clientProtocolVersion(connect.getProtocolVersion()) + .clientVersion(connect.hasClientVersion()? connect.getClientVersion() : "") + .build(); + + authenticationProvider = context.getAuthenticationService().getAuthenticationProvider(authMethod); + // Not find provider named authMethod. Most used for tests. + // In AuthenticationDisabled, it will set authMethod "none". + if (authenticationProvider == null) { + authRole = context.getAuthenticationService().getAnonymousUserRole() + .orElseThrow(() -> + new AuthenticationException( + "No anonymous role, and no authentication provider configured")); + return CompletableFuture.completedFuture(defaultAuthResult); + } + + authState = + authenticationProvider.newAuthState(AuthData.of(connect.getAuthData()), context.getRemoteAddress(), + context.getSslSession()); + + if (log.isDebugEnabled()) { + String role = ""; + if (authState != null && authState.isComplete()) { + role = authState.getAuthRole(); + } else { + role = "authentication incomplete or null"; + } + log.debug("[{}] Authenticate role : {}", context.getRemoteAddress(), role); + } + + if (connect.hasOriginalPrincipal() && context.isAuthenticateOriginalAuthData() + && !WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE.equals(connect.getOriginalPrincipal())) { + // Flow: + // 1. Initialize original authentication. + // 2. Authenticate the proxy's authentication data. + // 3. 
Authenticate the original authentication data. + if (connect.hasOriginalAuthMethod()) { + originalAuthMethod = connect.getOriginalAuthMethod(); + } else { + originalAuthMethod = "none"; + } + + var originalAuthenticationProvider = + context.getAuthenticationService().getAuthenticationProvider(originalAuthMethod); + + /** + * When both the broker and the proxy are configured with anonymousUserRole + * if the client does not configure an authentication method + * the proxy side will set the value of anonymousUserRole to clientAuthRole when it creates a connection + * and the value of clientAuthMethod will be none. + * Similarly, should also set the value of authRole to anonymousUserRole on the broker side. + */ + if (originalAuthenticationProvider == null) { + originalPrincipal = context.getAuthenticationService().getAnonymousUserRole() + .orElseThrow(() -> + new AuthenticationException("No anonymous role, and can't find " + + "AuthenticationProvider for original role using auth method " + + "[" + originalAuthMethod + "] is not available")); + return CompletableFuture.completedFuture(defaultAuthResult); + } + + originalAuthDataCopy = AuthData.of(connect.getOriginalAuthData().getBytes()); + originalAuthState = originalAuthenticationProvider.newAuthState( + originalAuthDataCopy, + context.getRemoteAddress(), + context.getSslSession()); + } else if (connect.hasOriginalPrincipal()) { + originalPrincipal = connect.getOriginalPrincipal(); + + if (log.isDebugEnabled()) { + log.debug("[{}] Setting original role (forwarded from proxy): {}", + context.getSslSession(), originalPrincipal); Review Comment: [nitpick] The log context uses the SSL session instead of the remote address, which makes logs less actionable. Replace context.getSslSession() with context.getRemoteAddress(). 
```suggestion context.getRemoteAddress(), originalPrincipal); ``` ########## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.authentication; + +import java.net.SocketAddress; +import java.util.concurrent.Executor; +import java.util.function.Supplier; +import javax.net.ssl.SSLSession; +import lombok.Builder; +import lombok.Getter; +import org.apache.pulsar.common.api.proto.CommandConnect; + +@Getter +@Builder +public class BinaryAuthContext { + private CommandConnect commandConnect; + private SSLSession sslSession; + private AuthenticationService authenticationService; + private Executor executor; + private SocketAddress remoteAddress; + private boolean authenticateOriginalAuthData; Review Comment: Add a brief Javadoc explaining each field’s intent (e.g., isConnectingSupplier semantics, executor requirements, and when authenticateOriginalAuthData should be true) to clarify how callers should construct the context. ```suggestion public class BinaryAuthContext { /** * The CommandConnect object representing the client's connection request. 
*/ private CommandConnect commandConnect; /** * The SSLSession associated with the connection, if SSL/TLS is used. */ private SSLSession sslSession; /** * The AuthenticationService used to perform authentication for this context. */ private AuthenticationService authenticationService; /** * The Executor to use for asynchronous authentication operations. * Must be provided if authentication involves async tasks. */ private Executor executor; /** * The remote address of the client initiating the connection. */ private SocketAddress remoteAddress; /** * If true, authentication should be performed using the original authentication data * provided by the client, rather than any intermediate or proxy data. * Set to true when authenticating the initial client request. */ private boolean authenticateOriginalAuthData; /** * Supplier indicating whether the connection is currently in the process of connecting. * Used to determine connection state during authentication. */ ``` ########## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.authentication; + +import static org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE; +import java.util.concurrent.CompletableFuture; +import javax.naming.AuthenticationException; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.api.AuthData; +import org.jspecify.annotations.NonNull; + +@Slf4j +@Getter Review Comment: Consider adding a class-level Javadoc describing the session lifecycle, thread-affinity (operations expected on the event loop), and how proxy/original authentication sequencing is handled. This will help future maintainers understand how to use and extend this class. ```suggestion @Getter /** * Represents an authentication session for binary protocols in Pulsar. * * <p><b>Session Lifecycle:</b> * <ul> * <li>Created per connection or authentication attempt via {@link BinaryAuthContext}.</li> * <li>Maintains authentication state, method, and role for the session.</li> * <li>Session may be updated as authentication progresses, including proxy and original client authentication.</li> * <li>Disposed when the connection is closed or authentication is no longer needed.</li> * </ul> * * <p><b>Thread-Affinity:</b> * <ul> * <li>All operations are expected to be performed on the event loop thread associated with the connection.</li> * <li>Some fields (e.g., {@code authenticationData}, {@code originalAuthData}) are {@code volatile} to support safe access from multiple threads.</li> * <li>External synchronization is not required if used as intended within the event loop.</li> * </ul> * * <p><b>Proxy/Original Authentication Sequencing:</b> * <ul> * <li>Supports authentication via a proxy, storing both proxy and original client credentials.</li> * <li>Proxy authentication is verified first; if credentials are forwardable, original client authentication is performed.</li> * <li>Fields such as 
{@code originalAuthMethod}, {@code originalPrincipal}, and {@code originalAuthState} track the original client's authentication state.</li> * <li>Sequencing ensures that both proxy and original authentication are validated before granting access.</li> * </ul> * * <p>See field-level comments and method documentation for further details on usage and extension.</p> */ ``` ########## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.authentication; + +import static org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE; +import java.util.concurrent.CompletableFuture; +import javax.naming.AuthenticationException; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.api.AuthData; +import org.jspecify.annotations.NonNull; + +@Slf4j +@Getter +public class BinaryAuthSession { + private static final byte[] emptyArray = new byte[0]; + + private AuthenticationState authState; + private String authMethod; + private String authRole = null; + private volatile AuthenticationDataSource authenticationData; + private AuthenticationProvider authenticationProvider; + + // In case of proxy, if the authentication credentials are forwardable, + // it will hold the credentials of the original client + private String originalAuthMethod; + private String originalPrincipal = null; + private AuthenticationState originalAuthState; + private volatile AuthenticationDataSource originalAuthData; + // Keep temporarily in order to verify after verifying proxy's authData + private AuthData originalAuthDataCopy; + + private final BinaryAuthContext context; + + private AuthResult defaultAuthResult; + + public BinaryAuthSession(@NonNull BinaryAuthContext context) { + this.context = context; + } + + public CompletableFuture<AuthResult> doAuthentication() { + var connect = context.getCommandConnect(); + try { + var authData = connect.hasAuthData() ? 
connect.getAuthData() : emptyArray; + var clientData = AuthData.of(authData); + // init authentication + if (connect.hasAuthMethodName()) { + authMethod = connect.getAuthMethodName(); + } else if (connect.hasAuthMethod()) { + // Legacy client is passing enum + authMethod = connect.getAuthMethod().name().substring(10).toLowerCase(); + } else { + authMethod = "none"; + } + + defaultAuthResult = AuthResult.builder().clientProtocolVersion(connect.getProtocolVersion()) + .clientVersion(connect.hasClientVersion()? connect.getClientVersion() : "") + .build(); + + authenticationProvider = context.getAuthenticationService().getAuthenticationProvider(authMethod); + // Not find provider named authMethod. Most used for tests. + // In AuthenticationDisabled, it will set authMethod "none". + if (authenticationProvider == null) { + authRole = context.getAuthenticationService().getAnonymousUserRole() + .orElseThrow(() -> + new AuthenticationException( + "No anonymous role, and no authentication provider configured")); + return CompletableFuture.completedFuture(defaultAuthResult); + } + + authState = + authenticationProvider.newAuthState(AuthData.of(connect.getAuthData()), context.getRemoteAddress(), + context.getSslSession()); + + if (log.isDebugEnabled()) { + String role = ""; + if (authState != null && authState.isComplete()) { + role = authState.getAuthRole(); + } else { + role = "authentication incomplete or null"; + } + log.debug("[{}] Authenticate role : {}", context.getRemoteAddress(), role); + } + + if (connect.hasOriginalPrincipal() && context.isAuthenticateOriginalAuthData() + && !WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE.equals(connect.getOriginalPrincipal())) { + // Flow: + // 1. Initialize original authentication. + // 2. Authenticate the proxy's authentication data. + // 3. Authenticate the original authentication data. 
+ if (connect.hasOriginalAuthMethod()) { + originalAuthMethod = connect.getOriginalAuthMethod(); + } else { + originalAuthMethod = "none"; + } + + var originalAuthenticationProvider = + context.getAuthenticationService().getAuthenticationProvider(originalAuthMethod); + + /** + * When both the broker and the proxy are configured with anonymousUserRole + * if the client does not configure an authentication method + * the proxy side will set the value of anonymousUserRole to clientAuthRole when it creates a connection + * and the value of clientAuthMethod will be none. + * Similarly, should also set the value of authRole to anonymousUserRole on the broker side. + */ + if (originalAuthenticationProvider == null) { + originalPrincipal = context.getAuthenticationService().getAnonymousUserRole() + .orElseThrow(() -> + new AuthenticationException("No anonymous role, and can't find " + + "AuthenticationProvider for original role using auth method " + + "[" + originalAuthMethod + "] is not available")); Review Comment: When the originalAuthenticationProvider is missing, the previous behavior set both originalPrincipal and authRole to the anonymous role before completing connect. Here only originalPrincipal is set; authRole remains unset (null). This can result in a connected session with a null authRole. Set this.authRole to the anonymous role as well before returning. ```suggestion + "[" + originalAuthMethod + "] is not available")); this.authRole = originalPrincipal; ``` ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ########## @@ -1280,14 +1100,52 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) { } try { - AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData()); - doAuthentication(clientData, originalAuthState != null, authResponse.getProtocolVersion(), - authResponse.hasClientVersion() ? 
authResponse.getClientVersion() : EMPTY); + if (binaryAuthSession != null) { + AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData()); + binaryAuthSession.authChallenge(clientData, binaryAuthSession.getOriginalAuthState() != null, + authResponse.getProtocolVersion(), + authResponse.hasClientVersion() ? authResponse.getClientVersion() : "") + .whenCompleteAsync((authResult, ex) -> { + if (ex != null) { + authenticationFailed(ex); + } else { + handleAuthResult(authResult); + } + }, ctx.executor()); + } else { + authenticationFailed(new AuthenticationException("authentication session is null")); + } } catch (Exception e) { authenticationFailed(e); } } + private void handleAuthResult(@NonNull AuthResult authResult) { + AuthData authData = authResult.getAuthData(); + if (authData != null) { + writeAndFlush(Commands.newAuthChallenge( + authResult.getAuthMethod(), + authData, + authResult.getClientProtocolVersion())); + if (log.isDebugEnabled()) { + log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authMethod); Review Comment: [nitpick] The debug log uses the field authMethod, which may not reflect the current challenge method (especially before it’s set). Log authResult.getAuthMethod() to ensure accuracy in challenge-phase logging. ```suggestion log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authResult.getAuthMethod()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
