Copilot commented on code in PR #24878:
URL: https://github.com/apache/pulsar/pull/24878#discussion_r2444724921


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import static 
org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE;
+import java.util.concurrent.CompletableFuture;
+import javax.naming.AuthenticationException;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.api.AuthData;
+import org.jspecify.annotations.NonNull;
+
+@Slf4j
+@Getter
+public class BinaryAuthSession {
+    private static final byte[] emptyArray = new byte[0];
+
+    private AuthenticationState authState;
+    private String authMethod;
+    private String authRole = null;
+    private volatile AuthenticationDataSource authenticationData;
+    private AuthenticationProvider authenticationProvider;
+
+    // In case of proxy, if the authentication credentials are forwardable,
+    // it will hold the credentials of the original client
+    private String originalAuthMethod;
+    private String originalPrincipal = null;
+    private AuthenticationState originalAuthState;
+    private volatile AuthenticationDataSource originalAuthData;
+    // Keep temporarily in order to verify after verifying proxy's authData
+    private AuthData originalAuthDataCopy;
+
+    private final BinaryAuthContext context;
+
+    private AuthResult defaultAuthResult;
+
+    public BinaryAuthSession(@NonNull BinaryAuthContext context) {
+        this.context = context;
+    }
+
+    public CompletableFuture<AuthResult> doAuthentication() {
+        var connect = context.getCommandConnect();
+        try {
+            var authData = connect.hasAuthData() ? connect.getAuthData() : 
emptyArray;
+            var clientData = AuthData.of(authData);
+            // init authentication
+            if (connect.hasAuthMethodName()) {
+                authMethod = connect.getAuthMethodName();
+            } else if (connect.hasAuthMethod()) {
+                // Legacy client is passing enum
+                authMethod = 
connect.getAuthMethod().name().substring(10).toLowerCase();
+            } else {
+                authMethod = "none";
+            }
+
+            defaultAuthResult = 
AuthResult.builder().clientProtocolVersion(connect.getProtocolVersion())
+                    .clientVersion(connect.hasClientVersion()? 
connect.getClientVersion() : "")
+                    .build();
+
+            authenticationProvider = 
context.getAuthenticationService().getAuthenticationProvider(authMethod);
+            // Not find provider named authMethod. Most used for tests.
+            // In AuthenticationDisabled, it will set authMethod "none".
+            if (authenticationProvider == null) {
+                authRole = 
context.getAuthenticationService().getAnonymousUserRole()
+                        .orElseThrow(() ->
+                                new AuthenticationException(
+                                        "No anonymous role, and no 
authentication provider configured"));
+                return CompletableFuture.completedFuture(defaultAuthResult);
+            }
+
+            authState =
+                    
authenticationProvider.newAuthState(AuthData.of(connect.getAuthData()), 
context.getRemoteAddress(),

Review Comment:
   Possible null dereference: connect.getAuthData() may be absent when 
hasAuthData() is false. Use the already computed clientData instead to avoid 
NPE and ensure consistent behavior.
   ```suggestion
                       authenticationProvider.newAuthState(clientData, 
context.getRemoteAddress(),
   ```



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import static 
org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE;
+import java.util.concurrent.CompletableFuture;
+import javax.naming.AuthenticationException;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.api.AuthData;
+import org.jspecify.annotations.NonNull;
+
+@Slf4j
+@Getter
+public class BinaryAuthSession {
+    private static final byte[] emptyArray = new byte[0];
+
+    private AuthenticationState authState;
+    private String authMethod;
+    private String authRole = null;
+    private volatile AuthenticationDataSource authenticationData;
+    private AuthenticationProvider authenticationProvider;
+
+    // In case of proxy, if the authentication credentials are forwardable,
+    // it will hold the credentials of the original client
+    private String originalAuthMethod;
+    private String originalPrincipal = null;
+    private AuthenticationState originalAuthState;
+    private volatile AuthenticationDataSource originalAuthData;
+    // Keep temporarily in order to verify after verifying proxy's authData
+    private AuthData originalAuthDataCopy;
+
+    private final BinaryAuthContext context;
+
+    private AuthResult defaultAuthResult;
+
+    public BinaryAuthSession(@NonNull BinaryAuthContext context) {
+        this.context = context;
+    }
+
+    public CompletableFuture<AuthResult> doAuthentication() {
+        var connect = context.getCommandConnect();
+        try {
+            var authData = connect.hasAuthData() ? connect.getAuthData() : 
emptyArray;

Review Comment:
   Possible null dereference: connect.getAuthData() may be absent when 
hasAuthData() is false. Use the already computed clientData instead to avoid 
NPE and ensure consistent behavior.
   ```suggestion
               var authData = connect.hasAuthData() && connect.getAuthData() != 
null ? connect.getAuthData() : emptyArray;
   ```



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import static 
org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE;
+import java.util.concurrent.CompletableFuture;
+import javax.naming.AuthenticationException;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.api.AuthData;
+import org.jspecify.annotations.NonNull;
+
+@Slf4j
+@Getter
+public class BinaryAuthSession {
+    private static final byte[] emptyArray = new byte[0];
+
+    private AuthenticationState authState;
+    private String authMethod;
+    private String authRole = null;
+    private volatile AuthenticationDataSource authenticationData;
+    private AuthenticationProvider authenticationProvider;
+
+    // In case of proxy, if the authentication credentials are forwardable,
+    // it will hold the credentials of the original client
+    private String originalAuthMethod;
+    private String originalPrincipal = null;
+    private AuthenticationState originalAuthState;
+    private volatile AuthenticationDataSource originalAuthData;
+    // Keep temporarily in order to verify after verifying proxy's authData
+    private AuthData originalAuthDataCopy;
+
+    private final BinaryAuthContext context;
+
+    private AuthResult defaultAuthResult;
+
+    public BinaryAuthSession(@NonNull BinaryAuthContext context) {
+        this.context = context;
+    }
+
+    public CompletableFuture<AuthResult> doAuthentication() {
+        var connect = context.getCommandConnect();
+        try {
+            var authData = connect.hasAuthData() ? connect.getAuthData() : 
emptyArray;
+            var clientData = AuthData.of(authData);
+            // init authentication
+            if (connect.hasAuthMethodName()) {
+                authMethod = connect.getAuthMethodName();
+            } else if (connect.hasAuthMethod()) {
+                // Legacy client is passing enum
+                authMethod = 
connect.getAuthMethod().name().substring(10).toLowerCase();
+            } else {
+                authMethod = "none";
+            }
+
+            defaultAuthResult = 
AuthResult.builder().clientProtocolVersion(connect.getProtocolVersion())
+                    .clientVersion(connect.hasClientVersion()? 
connect.getClientVersion() : "")
+                    .build();
+
+            authenticationProvider = 
context.getAuthenticationService().getAuthenticationProvider(authMethod);
+            // Not find provider named authMethod. Most used for tests.
+            // In AuthenticationDisabled, it will set authMethod "none".

Review Comment:
   Improve grammar in the comment for clarity.
   ```suggestion
               // No provider named 'authMethod' was found. This is mostly used 
for tests.
               // When authentication is disabled, 'authMethod' will be set to 
'none'.
   ```



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import static 
org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE;
+import java.util.concurrent.CompletableFuture;
+import javax.naming.AuthenticationException;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.api.AuthData;
+import org.jspecify.annotations.NonNull;
+
+@Slf4j
+@Getter
+public class BinaryAuthSession {
+    private static final byte[] emptyArray = new byte[0];
+
+    private AuthenticationState authState;
+    private String authMethod;
+    private String authRole = null;
+    private volatile AuthenticationDataSource authenticationData;
+    private AuthenticationProvider authenticationProvider;
+
+    // In case of proxy, if the authentication credentials are forwardable,
+    // it will hold the credentials of the original client
+    private String originalAuthMethod;
+    private String originalPrincipal = null;
+    private AuthenticationState originalAuthState;
+    private volatile AuthenticationDataSource originalAuthData;
+    // Keep temporarily in order to verify after verifying proxy's authData
+    private AuthData originalAuthDataCopy;
+
+    private final BinaryAuthContext context;
+
+    private AuthResult defaultAuthResult;
+
+    public BinaryAuthSession(@NonNull BinaryAuthContext context) {
+        this.context = context;
+    }
+
+    public CompletableFuture<AuthResult> doAuthentication() {
+        var connect = context.getCommandConnect();
+        try {
+            var authData = connect.hasAuthData() ? connect.getAuthData() : 
emptyArray;
+            var clientData = AuthData.of(authData);
+            // init authentication
+            if (connect.hasAuthMethodName()) {
+                authMethod = connect.getAuthMethodName();
+            } else if (connect.hasAuthMethod()) {
+                // Legacy client is passing enum
+                authMethod = 
connect.getAuthMethod().name().substring(10).toLowerCase();
+            } else {
+                authMethod = "none";
+            }
+
+            defaultAuthResult = 
AuthResult.builder().clientProtocolVersion(connect.getProtocolVersion())
+                    .clientVersion(connect.hasClientVersion()? 
connect.getClientVersion() : "")
+                    .build();
+
+            authenticationProvider = 
context.getAuthenticationService().getAuthenticationProvider(authMethod);
+            // Not find provider named authMethod. Most used for tests.
+            // In AuthenticationDisabled, it will set authMethod "none".
+            if (authenticationProvider == null) {
+                authRole = 
context.getAuthenticationService().getAnonymousUserRole()
+                        .orElseThrow(() ->
+                                new AuthenticationException(
+                                        "No anonymous role, and no 
authentication provider configured"));
+                return CompletableFuture.completedFuture(defaultAuthResult);
+            }
+
+            authState =
+                    
authenticationProvider.newAuthState(AuthData.of(connect.getAuthData()), 
context.getRemoteAddress(),
+                            context.getSslSession());
+
+            if (log.isDebugEnabled()) {
+                String role = "";
+                if (authState != null && authState.isComplete()) {
+                    role = authState.getAuthRole();
+                } else {
+                    role = "authentication incomplete or null";
+                }
+                log.debug("[{}] Authenticate role : {}", 
context.getRemoteAddress(), role);
+            }
+
+            if (connect.hasOriginalPrincipal() && 
context.isAuthenticateOriginalAuthData()
+                    && 
!WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE.equals(connect.getOriginalPrincipal())) {
+                // Flow:
+                // 1. Initialize original authentication.
+                // 2. Authenticate the proxy's authentication data.
+                // 3. Authenticate the original authentication data.
+                if (connect.hasOriginalAuthMethod()) {
+                    originalAuthMethod = connect.getOriginalAuthMethod();
+                } else {
+                    originalAuthMethod = "none";
+                }
+
+                var originalAuthenticationProvider =
+                        
context.getAuthenticationService().getAuthenticationProvider(originalAuthMethod);
+
+                /**
+                 * When both the broker and the proxy are configured with 
anonymousUserRole
+                 * if the client does not configure an authentication method
+                 * the proxy side will set the value of anonymousUserRole to 
clientAuthRole when it creates a connection
+                 * and the value of clientAuthMethod will be none.
+                 * Similarly, should also set the value of authRole to 
anonymousUserRole on the broker side.
+                 */
+                if (originalAuthenticationProvider == null) {
+                    originalPrincipal = 
context.getAuthenticationService().getAnonymousUserRole()
+                            .orElseThrow(() ->
+                                    new AuthenticationException("No anonymous 
role, and can't find "
+                                            + "AuthenticationProvider for 
original role using auth method "
+                                            + "[" + originalAuthMethod + "] is 
not available"));
+                    return 
CompletableFuture.completedFuture(defaultAuthResult);
+                }
+
+                originalAuthDataCopy = 
AuthData.of(connect.getOriginalAuthData().getBytes());
+                originalAuthState = 
originalAuthenticationProvider.newAuthState(
+                        originalAuthDataCopy,
+                        context.getRemoteAddress(),
+                        context.getSslSession());
+            } else if (connect.hasOriginalPrincipal()) {
+                originalPrincipal = connect.getOriginalPrincipal();
+
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Setting original role (forwarded from 
proxy): {}",
+                            context.getSslSession(), originalPrincipal);

Review Comment:
   [nitpick] The log context uses the SSL session instead of the remote 
address, which makes logs less actionable. Replace context.getSslSession() with 
context.getRemoteAddress().
   ```suggestion
                               context.getRemoteAddress(), originalPrincipal);
   ```



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import java.net.SocketAddress;
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
+import javax.net.ssl.SSLSession;
+import lombok.Builder;
+import lombok.Getter;
+import org.apache.pulsar.common.api.proto.CommandConnect;
+
+@Getter
+@Builder
+public class BinaryAuthContext {
+    private CommandConnect commandConnect;
+    private SSLSession sslSession;
+    private AuthenticationService authenticationService;
+    private Executor executor;
+    private SocketAddress remoteAddress;
+    private boolean authenticateOriginalAuthData;

Review Comment:
   Add a brief Javadoc explaining each field’s intent (e.g., 
isConnectingSupplier semantics, executor requirements, and when 
authenticateOriginalAuthData should be true) to clarify how callers should 
construct the context.
   ```suggestion
   public class BinaryAuthContext {
       /**
        * The CommandConnect object representing the client's connection 
request.
        */
       private CommandConnect commandConnect;
   
       /**
        * The SSLSession associated with the connection, if SSL/TLS is used.
        */
       private SSLSession sslSession;
   
       /**
        * The AuthenticationService used to perform authentication for this 
context.
        */
       private AuthenticationService authenticationService;
   
       /**
        * The Executor to use for asynchronous authentication operations.
        * Must be provided if authentication involves async tasks.
        */
       private Executor executor;
   
       /**
        * The remote address of the client initiating the connection.
        */
       private SocketAddress remoteAddress;
   
       /**
        * If true, authentication should be performed using the original 
authentication data
        * provided by the client, rather than any intermediate or proxy data.
        * Set to true when authenticating the initial client request.
        */
       private boolean authenticateOriginalAuthData;
   
       /**
        * Supplier indicating whether the connection is currently in the 
process of connecting.
        * Used to determine connection state during authentication.
        */
   ```



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import static 
org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE;
+import java.util.concurrent.CompletableFuture;
+import javax.naming.AuthenticationException;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.api.AuthData;
+import org.jspecify.annotations.NonNull;
+
+@Slf4j
+@Getter

Review Comment:
   Consider adding a class-level Javadoc describing the session lifecycle, 
thread-affinity (operations expected on the event loop), and how proxy/original 
authentication sequencing is handled. This will help future maintainers 
understand how to use and extend this class.
   ```suggestion
   @Getter
   /**
    * Represents an authentication session for binary protocols in Pulsar.
    *
    * <p><b>Session Lifecycle:</b>
    * <ul>
    *   <li>Created per connection or authentication attempt via {@link 
BinaryAuthContext}.</li>
    *   <li>Maintains authentication state, method, and role for the 
session.</li>
    *   <li>Session may be updated as authentication progresses, including 
proxy and original client authentication.</li>
    *   <li>Disposed when the connection is closed or authentication is no 
longer needed.</li>
    * </ul>
    *
    * <p><b>Thread-Affinity:</b>
    * <ul>
    *   <li>All operations are expected to be performed on the event loop 
thread associated with the connection.</li>
    *   <li>Some fields (e.g., {@code authenticationData}, {@code 
originalAuthData}) are {@code volatile} to support safe access from multiple 
threads.</li>
    *   <li>External synchronization is not required if used as intended within 
the event loop.</li>
    * </ul>
    *
    * <p><b>Proxy/Original Authentication Sequencing:</b>
    * <ul>
    *   <li>Supports authentication via a proxy, storing both proxy and 
original client credentials.</li>
    *   <li>Proxy authentication is verified first; if credentials are 
forwardable, original client authentication is performed.</li>
    *   <li>Fields such as {@code originalAuthMethod}, {@code 
originalPrincipal}, and {@code originalAuthState} track the original client's 
authentication state.</li>
    *   <li>Sequencing ensures that both proxy and original authentication are 
validated before granting access.</li>
    * </ul>
    *
    * <p>See field-level comments and method documentation for further details 
on usage and extension.</p>
    */
   ```



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import static 
org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE;
+import java.util.concurrent.CompletableFuture;
+import javax.naming.AuthenticationException;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.api.AuthData;
+import org.jspecify.annotations.NonNull;
+
+@Slf4j
+@Getter
+public class BinaryAuthSession {
+    private static final byte[] emptyArray = new byte[0];
+
+    private AuthenticationState authState;
+    private String authMethod;
+    private String authRole = null;
+    private volatile AuthenticationDataSource authenticationData;
+    private AuthenticationProvider authenticationProvider;
+
+    // In case of proxy, if the authentication credentials are forwardable,
+    // it will hold the credentials of the original client
+    private String originalAuthMethod;
+    private String originalPrincipal = null;
+    private AuthenticationState originalAuthState;
+    private volatile AuthenticationDataSource originalAuthData;
+    // Keep temporarily in order to verify after verifying proxy's authData
+    private AuthData originalAuthDataCopy;
+
+    private final BinaryAuthContext context;
+
+    private AuthResult defaultAuthResult;
+
+    public BinaryAuthSession(@NonNull BinaryAuthContext context) {
+        this.context = context;
+    }
+
+    public CompletableFuture<AuthResult> doAuthentication() {
+        var connect = context.getCommandConnect();
+        try {
+            var authData = connect.hasAuthData() ? connect.getAuthData() : 
emptyArray;
+            var clientData = AuthData.of(authData);
+            // init authentication
+            if (connect.hasAuthMethodName()) {
+                authMethod = connect.getAuthMethodName();
+            } else if (connect.hasAuthMethod()) {
+                // Legacy client is passing enum
+                authMethod = 
connect.getAuthMethod().name().substring(10).toLowerCase();
+            } else {
+                authMethod = "none";
+            }
+
+            defaultAuthResult = 
AuthResult.builder().clientProtocolVersion(connect.getProtocolVersion())
+                    .clientVersion(connect.hasClientVersion()? 
connect.getClientVersion() : "")
+                    .build();
+
+            authenticationProvider = 
context.getAuthenticationService().getAuthenticationProvider(authMethod);
+            // Not find provider named authMethod. Most used for tests.
+            // In AuthenticationDisabled, it will set authMethod "none".
+            if (authenticationProvider == null) {
+                authRole = 
context.getAuthenticationService().getAnonymousUserRole()
+                        .orElseThrow(() ->
+                                new AuthenticationException(
+                                        "No anonymous role, and no 
authentication provider configured"));
+                return CompletableFuture.completedFuture(defaultAuthResult);
+            }
+
+            authState =
+                    
authenticationProvider.newAuthState(AuthData.of(connect.getAuthData()), 
context.getRemoteAddress(),
+                            context.getSslSession());
+
+            if (log.isDebugEnabled()) {
+                String role = "";
+                if (authState != null && authState.isComplete()) {
+                    role = authState.getAuthRole();
+                } else {
+                    role = "authentication incomplete or null";
+                }
+                log.debug("[{}] Authenticate role : {}", 
context.getRemoteAddress(), role);
+            }
+
+            if (connect.hasOriginalPrincipal() && 
context.isAuthenticateOriginalAuthData()
+                    && 
!WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE.equals(connect.getOriginalPrincipal())) {
+                // Flow:
+                // 1. Initialize original authentication.
+                // 2. Authenticate the proxy's authentication data.
+                // 3. Authenticate the original authentication data.
+                if (connect.hasOriginalAuthMethod()) {
+                    originalAuthMethod = connect.getOriginalAuthMethod();
+                } else {
+                    originalAuthMethod = "none";
+                }
+
+                var originalAuthenticationProvider =
+                        
context.getAuthenticationService().getAuthenticationProvider(originalAuthMethod);
+
+                /**
+                 * When both the broker and the proxy are configured with 
anonymousUserRole
+                 * if the client does not configure an authentication method
+                 * the proxy side will set the value of anonymousUserRole to 
clientAuthRole when it creates a connection
+                 * and the value of clientAuthMethod will be none.
+                 * Similarly, should also set the value of authRole to 
anonymousUserRole on the broker side.
+                 */
+                if (originalAuthenticationProvider == null) {
+                    originalPrincipal = 
context.getAuthenticationService().getAnonymousUserRole()
+                            .orElseThrow(() ->
+                                    new AuthenticationException("No anonymous 
role, and can't find "
+                                            + "AuthenticationProvider for 
original role using auth method "
+                                            + "[" + originalAuthMethod + "] is 
not available"));

Review Comment:
   When the originalAuthenticationProvider is missing, the previous behavior 
set both originalPrincipal and authRole to the anonymous role before completing 
connect. Here only originalPrincipal is set; authRole remains unset (null). 
This can result in a connected session with a null authRole. Set this.authRole 
to the anonymous role as well before returning.
   ```suggestion
                                               + "[" + originalAuthMethod + "] 
is not available"));
                       this.authRole = originalPrincipal;
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1280,14 +1100,52 @@ protected void handleAuthResponse(CommandAuthResponse 
authResponse) {
         }
 
         try {
-            AuthData clientData = 
AuthData.of(authResponse.getResponse().getAuthData());
-            doAuthentication(clientData, originalAuthState != null, 
authResponse.getProtocolVersion(),
-                    authResponse.hasClientVersion() ? 
authResponse.getClientVersion() : EMPTY);
+            if (binaryAuthSession != null) {
+                AuthData clientData = 
AuthData.of(authResponse.getResponse().getAuthData());
+                binaryAuthSession.authChallenge(clientData, 
binaryAuthSession.getOriginalAuthState() != null,
+                                authResponse.getProtocolVersion(),
+                                authResponse.hasClientVersion() ? 
authResponse.getClientVersion() : "")
+                        .whenCompleteAsync((authResult, ex) -> {
+                            if (ex != null) {
+                                authenticationFailed(ex);
+                            } else {
+                                handleAuthResult(authResult);
+                            }
+                        }, ctx.executor());
+            } else {
+                authenticationFailed(new 
AuthenticationException("authentication session is null"));
+            }
         } catch (Exception e) {
             authenticationFailed(e);
         }
     }
 
+    private void handleAuthResult(@NonNull AuthResult authResult) {
+        AuthData authData = authResult.getAuthData();
+        if (authData != null) {
+            writeAndFlush(Commands.newAuthChallenge(
+                    authResult.getAuthMethod(),
+                    authData,
+                    authResult.getClientProtocolVersion()));
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Authentication in progress client by method 
{}.", remoteAddress, authMethod);

Review Comment:
   [nitpick] The debug log uses the field authMethod, which may not reflect the 
current challenge method (especially before it’s set). Log 
authResult.getAuthMethod() to ensure accuracy in challenge-phase logging.
   ```suggestion
                   log.debug("[{}] Authentication in progress client by method 
{}.", remoteAddress, authResult.getAuthMethod());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to