This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e916457f905 [fix][ws] Fix WebSocket authorization issue due to 
originalPrincipal must be provided (#24533)
e916457f905 is described below

commit e916457f9055cf3e292d98ffcb677518a27699f6
Author: Penghui Li <peng...@apache.org>
AuthorDate: Mon Jul 21 23:15:14 2025 -0700

    [fix][ws] Fix WebSocket authorization issue due to originalPrincipal must 
be provided (#24533)
---
 .../AuthenticationDataSubscription.java            |  22 +-
 .../broker/authorization/AuthorizationService.java |  29 +-
 .../broker/admin/impl/PersistentTopicsBase.java    |   2 +-
 .../pulsar/broker/lookup/TopicLookupBase.java      |  31 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  27 +-
 .../pulsar/broker/web/PulsarWebResource.java       |   4 +-
 .../pulsar/broker/service/ServerCnxTest.java       |  26 +-
 .../proxy/WebSocketProxyAuthIntegrationTest.java   | 425 +++++++++++++++++++++
 .../pulsar/client/impl/ClientBuilderImpl.java      |   5 +
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   4 +-
 .../client/impl/conf/ClientConfigurationData.java  |   6 +
 .../org/apache/pulsar/common/naming/Constants.java |   2 +
 .../apache/pulsar/websocket/WebSocketService.java  |   5 +
 13 files changed, 497 insertions(+), 91 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java
index 96b85989b6f..eaaf8c20d8e 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java
@@ -34,32 +34,32 @@ public class AuthenticationDataSubscription implements 
AuthenticationDataSource
 
     @Override
     public boolean hasDataFromCommand() {
-        return authData.hasDataFromCommand();
+        return hasAuthData() && authData.hasDataFromCommand();
     }
 
     @Override
     public String getCommandData() {
-        return authData.getCommandData();
+        return hasAuthData() ? authData.getCommandData() : null;
     }
 
     @Override
     public boolean hasDataFromPeer() {
-        return authData.hasDataFromPeer();
+        return hasAuthData() && authData.hasDataFromPeer();
     }
 
     @Override
     public SocketAddress getPeerAddress() {
-        return authData.getPeerAddress();
+        return hasAuthData() ? authData.getPeerAddress() : null;
     }
 
     @Override
     public boolean hasDataFromTls() {
-        return authData.hasDataFromTls();
+        return hasAuthData() && authData.hasDataFromTls();
     }
 
     @Override
     public Certificate[] getTlsCertificates() {
-        return authData.getTlsCertificates();
+        return hasAuthData() ? authData.getTlsCertificates() : null;
     }
 
     @Override
@@ -74,20 +74,24 @@ public class AuthenticationDataSubscription implements 
AuthenticationDataSource
 
     @Override
     public boolean hasDataFromHttp() {
-        return authData.hasDataFromHttp();
+        return hasAuthData() && authData.hasDataFromHttp();
     }
 
     @Override
     public String getHttpAuthType() {
-        return authData.getHttpAuthType();
+        return hasAuthData() ? authData.getHttpAuthType() : null;
     }
 
     @Override
     public String getHttpHeader(String name) {
-        return authData.getHttpHeader(name);
+        return hasAuthData() ? authData.getHttpHeader(name) : null;
     }
 
     public AuthenticationDataSource getAuthData() {
         return authData;
     }
+
+    private boolean hasAuthData() {
+        return authData != null;
+    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 40573d99d60..7b7c35e820e 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -35,6 +35,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationParameters;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
 import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
+import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -87,7 +88,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(authParams)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(authParams.getClientRole())) {
+        if (isProxyRole(authParams.getClientRole()) && 
!isWebsocketPrinciple(authParams.getOriginalPrincipal())) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
isSuperUser(authParams.getClientRole(),
                     authParams.getClientAuthenticationDataSource());
             // The current paradigm is to pass the client auth data when we 
don't have access to the original auth data.
@@ -348,7 +349,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(authParams)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(authParams.getClientRole())) {
+        if (isProxyRole(authParams.getClientRole()) && 
!isWebsocketPrinciple(authParams.getOriginalPrincipal())) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowFunctionOpsAsync(namespaceName,
                     authParams.getClientRole(), 
authParams.getClientAuthenticationDataSource());
             // The current paradigm is to pass the client auth data when we 
don't have access to the original auth data.
@@ -376,7 +377,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(authParams)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(authParams.getClientRole())) {
+        if (isProxyRole(authParams.getClientRole()) && 
!isWebsocketPrinciple(authParams.getOriginalPrincipal())) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowSourceOpsAsync(namespaceName,
                     authParams.getClientRole(), 
authParams.getClientAuthenticationDataSource());
             // The current paradigm is to pass the client auth data when we 
don't have access to the original auth data.
@@ -404,7 +405,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(authParams)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(authParams.getClientRole())) {
+        if (isProxyRole(authParams.getClientRole()) && 
!isWebsocketPrinciple(authParams.getOriginalPrincipal())) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowSinkOpsAsync(namespaceName,
                     authParams.getClientRole(), 
authParams.getClientAuthenticationDataSource());
             // The current paradigm is to pass the client auth data when we 
don't have access to the original auth data.
@@ -504,6 +505,10 @@ public class AuthorizationService {
         return role != null && conf.getProxyRoles().contains(role);
     }
 
+    public boolean isWebsocketPrinciple(String role) {
+        return Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE.equals(role);
+    }
+
     /**
      * Grant authorization-action permission on a tenant to the given client.
      *
@@ -534,7 +539,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTenantOperationAsync(
                     tenantName, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowTenantOperationAsync(
@@ -556,7 +561,7 @@ public class AuthorizationService {
             return CompletableFuture.completedFuture(false);
         }
 
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             final var isRoleAuthorizedFuture = 
provider.allowBrokerOperationAsync(clusterName, brokerId,
                     brokerOperation, role, authData);
             final var isOriginalAuthorizedFuture =  
provider.allowBrokerOperationAsync(clusterName, brokerId,
@@ -577,7 +582,7 @@ public class AuthorizationService {
             return CompletableFuture.completedFuture(false);
         }
 
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             final var isRoleAuthorizedFuture = 
provider.allowClusterOperationAsync(clusterName,
                     clusterOperation, role, authData);
             final var isOriginalAuthorizedFuture =  
provider.allowClusterOperationAsync(clusterName,
@@ -599,7 +604,7 @@ public class AuthorizationService {
             return CompletableFuture.completedFuture(false);
         }
 
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             final var isRoleAuthorizedFuture = 
provider.allowClusterPolicyOperationAsync(clusterName, role,
                     policy, operation, authData);
             final var isOriginalAuthorizedFuture =  
provider.allowClusterPolicyOperationAsync(clusterName, originalRole,
@@ -662,7 +667,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowNamespaceOperationAsync(
                     namespaceName, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowNamespaceOperationAsync(
@@ -706,7 +711,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowNamespacePolicyOperationAsync(
                     namespaceName, policy, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowNamespacePolicyOperationAsync(
@@ -769,7 +774,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTopicPolicyOperationAsync(
                     topicName, policy, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowTopicPolicyOperationAsync(
@@ -865,7 +870,7 @@ public class AuthorizationService {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
             return CompletableFuture.completedFuture(false);
         }
-        if (isProxyRole(role)) {
+        if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTopicOperationAsync(
                     topicName, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowTopicOperationAsync(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 604af932cca..644658caab4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4349,7 +4349,7 @@ public class PersistentTopicsBase extends AdminResource {
             AuthenticationDataSource authenticationData, TopicName topicName) {
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new 
CompletableFuture<>();
         CompletableFuture<Void> authorizationFuture = new 
CompletableFuture<>();
-        checkAuthorizationAsync(pulsar, topicName, clientAppId, 
authenticationData)
+        checkAuthorizationAsync(pulsar, topicName, clientAppId, 
originalPrincipal, authenticationData)
                 .thenRun(() -> authorizationFuture.complete(null))
                 .exceptionally(e -> {
                     Throwable throwable = 
FutureUtil.unwrapCompletionException(e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 92f045ad90d..a80c8a4eccc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -24,7 +24,6 @@ import io.netty.buffer.ByteBuf;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -159,32 +158,6 @@ public class TopicLookupBase extends PulsarWebResource {
         }
     }
 
-    /**
-     * Lookup broker-service address for a given namespace-bundle which 
contains given topic.
-     *
-     * a. Returns broker-address if namespace-bundle is already owned by any 
broker
-     * b. If current-broker receives lookup-request and if it's not a leader 
then current broker redirects request
-     * to leader by returning leader-service address.
-     * c. If current-broker is leader then it finds out least-loaded broker to
-     * own namespace bundle and redirects request
-     * by returning least-loaded broker.
-     * d. If current-broker receives request to own the namespace-bundle then
-     * it owns a bundle and returns success(connect)
-     * response to client.
-     *
-     * @param pulsarService
-     * @param topicName
-     * @param authoritative
-     * @param clientAppId
-     * @param requestId
-     * @return
-     */
-    public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService 
pulsarService, TopicName topicName,
-            boolean authoritative, String clientAppId, 
AuthenticationDataSource authenticationData, long requestId) {
-        return lookupTopicAsync(pulsarService, topicName, authoritative, 
clientAppId,
-                authenticationData, requestId, null, Collections.emptyMap());
-    }
-
     /**
      *
      * Lookup broker-service address for a given namespace-bundle which 
contains given topic.
@@ -209,6 +182,7 @@ public class TopicLookupBase extends PulsarWebResource {
      */
     public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService 
pulsarService, TopicName topicName,
                                                               boolean 
authoritative, String clientAppId,
+                                                              String 
originalPrinciple,
                                                               
AuthenticationDataSource authenticationData,
                                                               long requestId, 
final String advertisedListenerName,
                                                               Map<String, 
String> properties) {
@@ -231,7 +205,8 @@ public class TopicLookupBase extends PulsarWebResource {
                         requestId, false));
             } else {
                 // (2) authorize client
-                checkAuthorizationAsync(pulsarService, topicName, clientAppId, 
authenticationData).thenRun(() -> {
+                checkAuthorizationAsync(pulsarService, topicName, clientAppId, 
originalPrinciple,
+                        authenticationData).thenRun(() -> {
                         // (3) validate global namespace
                         // It is necessary for system topic operations because 
system topics are used to store metadata
                         // and other vital information. Even after namespace 
starting deletion,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 3aa365323ec..b0774513057 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -477,27 +477,16 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         if (!service.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
-        CompletableFuture<Boolean> isProxyAuthorizedFuture;
-        if (originalPrincipal != null) {
-            isProxyAuthorizedFuture = 
service.getAuthorizationService().allowTopicOperationAsync(
-                    topicName, operation, originalPrincipal,
-                    originalAuthDataSource != null ? originalAuthDataSource : 
authDataSource);
-        } else {
-            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
-        }
-        CompletableFuture<Boolean> isAuthorizedFuture = 
service.getAuthorizationService().allowTopicOperationAsync(
-            topicName, operation, authRole, authDataSource);
-        return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, 
(isProxyAuthorized, isAuthorized) -> {
-            if (!isProxyAuthorized) {
-                log.warn("OriginalRole {} is not authorized to perform 
operation {} on topic {}",
-                        originalPrincipal, operation, topicName);
-            }
+        CompletableFuture<Boolean> result = 
service.getAuthorizationService().allowTopicOperationAsync(
+                topicName, operation, originalPrincipal, authRole,
+                originalAuthDataSource != null ? originalAuthDataSource : 
authDataSource);
+        result.thenAccept(isAuthorized -> {
             if (!isAuthorized) {
-                log.warn("Role {} is not authorized to perform operation {} on 
topic {}",
-                        authRole, operation, topicName);
+                log.warn("Role {} and OriginalRole {} is not authorized to 
perform operation {} on topic {}",
+                        authRole, originalPrincipal, operation, topicName);
             }
-            return isProxyAuthorized && isAuthorized;
         });
+        return result;
     }
 
     private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName 
topicName, String subscriptionName,
@@ -561,7 +550,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         properties = Collections.emptyMap();
                     }
                     lookupTopicAsync(getBrokerService().pulsar(), topicName, 
authoritative,
-                            getPrincipal(), getAuthenticationData(),
+                            authRole, originalPrincipal, 
getAuthenticationData(),
                             requestId, advertisedListenerName, 
properties).handle((lookupResponse, ex) -> {
                                 if (ex == null) {
                                     writeAndFlush(lookupResponse);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 5820d96eccc..7e770704c1a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -987,14 +987,14 @@ public abstract class PulsarWebResource {
     }
 
     protected static CompletableFuture<Void> 
checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName,
-                        String role, AuthenticationDataSource 
authenticationData) {
+                        String role, String originalPrinciple, 
AuthenticationDataSource authenticationData) {
         if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
             // No enforcing of authorization policies
             return CompletableFuture.completedFuture(null);
         }
         // get zk policy manager
         return 
pulsarService.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName,
-                TopicOperation.LOOKUP, null, role, 
authenticationData).thenAccept(allow -> {
+                TopicOperation.LOOKUP, originalPrinciple, role, 
authenticationData).thenAccept(allow -> {
                     if (!allow) {
                         log.warn("[{}] Role {} is not allowed to lookup 
topic", topicName, role);
                         throw new RestException(Status.UNAUTHORIZED,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index dadc6a001a6..bba8f286755 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -947,10 +947,8 @@ public class ServerCnxTest {
         assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(), 
ServerError.AuthorizationError);
         assertEquals(((CommandLookupTopicResponse) 
lookupResponse).getRequestId(), 1);
         verify(authorizationService, times(1))
-                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, 
proxyRole, serverCnx.getAuthData());
-        verify(authorizationService, times(1))
-                .allowTopicOperationAsync(topicName,
-                        TopicOperation.LOOKUP, clientRole, 
serverCnx.getOriginalAuthData());
+                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, 
clientRole, proxyRole,
+                        serverCnx.getOriginalAuthData());
 
         // producer
         ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2, 
"test-producer", new HashMap<>(), false);
@@ -960,10 +958,8 @@ public class ServerCnxTest {
         assertEquals(((CommandError) producerResponse).getError(), 
ServerError.AuthorizationError);
         assertEquals(((CommandError) producerResponse).getRequestId(), 2);
         verify(authorizationService, times(1))
-                .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, 
clientRole,
+                .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, 
clientRole, proxyRole,
                         serverCnx.getOriginalAuthData());
-        verify(authorizationService, times(1))
-                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, 
proxyRole, serverCnx.getAuthData());
 
         // consumer
         String subscriptionName = "test-subscribe";
@@ -976,20 +972,12 @@ public class ServerCnxTest {
         assertEquals(((CommandError) subscribeResponse).getRequestId(), 3);
         verify(authorizationService, times(1)).allowTopicOperationAsync(
                 eq(topicName), eq(TopicOperation.CONSUME),
-                eq(clientRole), argThat(arg -> {
+                eq(clientRole), eq(proxyRole), argThat(arg -> {
                     assertTrue(arg instanceof AuthenticationDataSubscription);
                     assertEquals(arg.getCommandData(), clientRole);
                     assertEquals(arg.getSubscription(), subscriptionName);
                     return true;
                 }));
-        verify(authorizationService, times(1)).allowTopicOperationAsync(
-                eq(topicName), eq(TopicOperation.CONSUME),
-                eq(proxyRole), argThat(arg -> {
-                    assertTrue(arg instanceof AuthenticationDataSubscription);
-                    assertEquals(arg.getCommandData(), proxyRole);
-                    assertEquals(arg.getSubscription(), subscriptionName);
-                    return true;
-                }));
     }
 
     @Test
@@ -1761,7 +1749,7 @@ public class ServerCnxTest {
         AuthorizationService authorizationService = 
mock(AuthorizationService.class);
         
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService)
                 .allowTopicOperationAsync(Mockito.any(),
-                        Mockito.any(), Mockito.any(), Mockito.any());
+                        Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.any());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
         svcConfig.setAuthenticationEnabled(true);
         svcConfig.setAuthorizationEnabled(true);
@@ -2548,7 +2536,7 @@ public class ServerCnxTest {
         AuthorizationService authorizationService = 
mock(AuthorizationService.class);
         
doReturn(CompletableFuture.completedFuture(true)).when(authorizationService)
                 .allowTopicOperationAsync(Mockito.any(),
-                        Mockito.any(), Mockito.any(), Mockito.any());
+                        Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.any());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
         svcConfig.setAuthenticationEnabled(true);
         svcConfig.setAuthorizationEnabled(true);
@@ -2571,7 +2559,7 @@ public class ServerCnxTest {
         AuthorizationService authorizationService = 
mock(AuthorizationService.class);
         
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService)
                 .allowTopicOperationAsync(Mockito.any(),
-                        Mockito.any(), Mockito.any(), Mockito.any());
+                        Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.any());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
         svcConfig.setAuthenticationEnabled(true);
         svcConfig.setAuthorizationEnabled(true);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java
new file mode 100644
index 00000000000..624c2522822
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.net.URI;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.websocket.service.ProxyServer;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for WebSocket proxy with JWT token authentication and 
authorization enabled.
+ * This test specifically verifies that the WebSocket dummy principal fix 
allows
+ * WebSocket connections to work properly through a proxy when both 
authentication
+ * and authorization are enabled, using real JWT tokens for different roles.
+ *
+ * The test uses four different JWT tokens:
+ * - ADMIN_TOKEN: Used by admin client and broker's internal client for setup 
operations
+ * - PROXY_TOKEN: Used by WebSocket proxy's internal client to connect to 
broker
+ * - CLIENT_TOKEN: Used by WebSocket clients to authenticate to the proxy
+ * - UNAUTHORIZED_TOKEN: Used to test that unauthorized tokens are properly 
rejected
+ *
+ * Test coverage:
+ * 1. testWebSocketProxyProduceConsumeWithAuthorization: Positive test with 
authorized tokens
+ * 2. testWebSocketProxyWithUnauthorizedToken: Negative test with unauthorized 
tokens
+ */
+@Test(groups = "websocket")
+public class WebSocketProxyAuthIntegrationTest extends ProducerConsumerBase {
+
+    private static final Logger log = 
LoggerFactory.getLogger(WebSocketProxyAuthIntegrationTest.class);
+
+    // JWT token authentication setup with different roles
+    private static final SecretKey SECRET_KEY = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String PROXY_TOKEN = 
AuthTokenUtils.createToken(SECRET_KEY, "websocket_proxy",
+            Optional.empty());
+    private static final String CLIENT_TOKEN = 
AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.empty());
+    private static final String ADMIN_TOKEN = 
AuthTokenUtils.createToken(SECRET_KEY, "admin", Optional.empty());
+    private static final String UNAUTHORIZED_TOKEN = 
AuthTokenUtils.createToken(SECRET_KEY, "unauthorized_user",
+            Optional.empty());
+
+    private ProxyServer proxyServer;
+    private WebSocketService service;
+    private WebSocketClient consumeClient;
+    private WebSocketClient produceClient;
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        // Enable authentication and authorization in broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        // Configure proxy roles for the broker - this is critical for the 
WebSocket proxy fix
+        conf.setProxyRoles(Sets.newHashSet("websocket_proxy"));
+
+        // Set super user roles for admin operations and proxy role
+        conf.setSuperUserRoles(Sets.newHashSet("admin", "websocket_proxy"));
+
+        // Configure broker's internal client to use admin token
+        
conf.setBrokerClientAuthenticationPlugin("org.apache.pulsar.client.impl.auth.AuthenticationToken");
+        conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
+
+        // Enable JWT token authentication provider
+        
conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
+
+        // Configure JWT token secret key for token validation
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", 
AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        conf.setProperties(properties);
+    }
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.internalSetup();
+
+        // Configure admin client with ADMIN_TOKEN for namespace and 
permission setup
+        if (admin != null) {
+            admin.close();
+        }
+        admin = PulsarAdmin.builder()
+            .serviceHttpUrl(brokerUrl.toString())
+            
.authentication("org.apache.pulsar.client.impl.auth.AuthenticationToken",
+                    "token:" + ADMIN_TOKEN)
+            .build();
+
+        // Setup namespace and grant permissions for client role
+        setupNamespacePermissions();
+
+        // Create WebSocket proxy configuration with authentication and 
authorization enabled
+        WebSocketProxyConfiguration proxyConfig = new 
WebSocketProxyConfiguration();
+        proxyConfig.setWebServicePort(Optional.of(0));
+        proxyConfig.setClusterName("test");
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(true);  // Enable authorization at 
proxy level
+        proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+
+        // Configure WebSocket proxy to use JWT token authentication
+        
proxyConfig.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
+
+        // Set up JWT token authentication properties for proxy
+        Properties proxyProperties = new Properties();
+        proxyProperties.setProperty("tokenSecretKey", 
AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+        proxyConfig.setProperties(proxyProperties);
+
+        // Configure proxy's internal client to use PROXY_TOKEN when 
connecting to broker
+        
proxyConfig.setBrokerClientAuthenticationPlugin("org.apache.pulsar.client.impl.auth.AuthenticationToken");
+        proxyConfig.setBrokerClientAuthenticationParameters("token:" + 
PROXY_TOKEN);
+
+        // Set broker service URL to connect to our test broker
+        proxyConfig.setBrokerServiceUrl(pulsar.getBrokerServiceUrl());
+        proxyConfig.setBrokerServiceUrlTls(pulsar.getBrokerServiceUrlTls());
+
+        service = spyWithClassAndConstructorArgs(WebSocketService.class, 
proxyConfig);
+        doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(service)
+                .createConfigMetadataStore(anyString(), anyInt(), 
anyBoolean());
+
+        proxyServer = new ProxyServer(proxyConfig);
+        WebSocketServiceStarter.start(proxyServer, service);
+
+        log.info("WebSocket Proxy Server started on port: {}", 
proxyServer.getListenPortHTTP().get());
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() throws Exception {
+        try {
+            if (consumeClient != null) {
+                consumeClient.stop();
+            }
+            if (produceClient != null) {
+                produceClient.stop();
+            }
+            log.info("WebSocket clients stopped successfully");
+        } catch (Exception e) {
+            log.error("Error stopping WebSocket clients: {}", e.getMessage());
+        }
+
+        try {
+            if (service != null) {
+                service.close();
+            }
+            if (proxyServer != null) {
+                proxyServer.stop();
+            }
+        } catch (Exception e) {
+            log.error("Error stopping proxy server: {}", e.getMessage());
+        }
+
+        super.internalCleanup();
+        log.info("Finished cleaning up test setup");
+    }
+
+    /**
+     * Test WebSocket message produce and consume through proxy with JWT token 
authentication and authorization enabled.
+     * This verifies the fix for WebSocket dummy principal authorization issue 
using real JWT tokens.
+     *
+     * The test validates that:
+     * 1. Admin client uses ADMIN_TOKEN for setup operations (namespace 
creation, permission grants)
+     * 2. WebSocket proxy uses PROXY_TOKEN for its internal client connection 
to broker
+     * 3. WebSocket clients use CLIENT_TOKEN for authentication to the proxy 
via Authorization header
+     * 4. Topic-level permissions are correctly granted to the "client" role
+     * 5. The WebSocket dummy principal fix prevents authorization failures 
when proxy role is configured
+     * 6. The authorization service correctly handles different JWT tokens for 
different roles
+     *    without blocking operations
+     */
+    @Test(timeOut = 30000)
+    public void testWebSocketProxyProduceConsumeWithAuthorization() throws 
Exception {
+        final String namespaceName = "my-property/my-ns";
+        final String topic = namespaceName + "/my-websocket-topic";
+
+        final String consumerUri = "ws://localhost:" + 
proxyServer.getListenPortHTTP().get()
+                + "/ws/v2/consumer/persistent/" + topic + "/my-sub";
+        final String producerUri = "ws://localhost:" + 
proxyServer.getListenPortHTTP().get()
+                + "/ws/v2/producer/persistent/" + topic;
+
+        URI consumeUri = URI.create(consumerUri);
+        URI produceUri = URI.create(producerUri);
+
+        consumeClient = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+        produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try {
+            // Connect consumer with CLIENT_TOKEN in Authorization header
+            consumeClient.start();
+            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+            // Add JWT token authentication for WebSocket client
+            consumeRequest.setHeader("Authorization", "Bearer " + 
CLIENT_TOKEN);
+            Future<Session> consumerFuture = 
consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+            log.info("Connecting consumer to: {} with CLIENT_TOKEN", 
consumeUri);
+
+            // Connect producer with CLIENT_TOKEN in Authorization header
+            produceClient.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            // Add JWT token authentication for WebSocket client
+            produceRequest.setHeader("Authorization", "Bearer " + 
CLIENT_TOKEN);
+            Future<Session> producerFuture = 
produceClient.connect(produceSocket, produceUri, produceRequest);
+            log.info("Connecting producer to: {} with CLIENT_TOKEN", 
produceUri);
+
+            // Verify connections are established
+            Session consumerSession = consumerFuture.get(10, TimeUnit.SECONDS);
+            Session producerSession = producerFuture.get(10, TimeUnit.SECONDS);
+
+            Assert.assertTrue(consumerSession.isOpen(), "Consumer WebSocket 
session should be open");
+            Assert.assertTrue(producerSession.isOpen(), "Producer WebSocket 
session should be open");
+
+            // Wait for messages to be produced and consumed
+            Awaitility.await()
+                    .atMost(20, TimeUnit.SECONDS)
+                    .untilAsserted(() -> {
+                        Assert.assertFalse(produceSocket.getBuffer().isEmpty(),
+                                "Producer should have sent messages");
+                        Assert.assertEquals(produceSocket.getBuffer(), 
consumeSocket.getBuffer(),
+                                "Consumer should receive all produced 
messages");
+                    });
+
+            log.info("Successfully produced and consumed {} messages through 
WebSocket proxy "
+                    + "with JWT token authorization",
+                    produceSocket.getBuffer().size());
+
+            // Verify that we successfully exchanged messages with JWT token 
authentication
+            // This proves that the WebSocket dummy principal fix is working 
correctly:
+            // - Proxy authenticates to broker with PROXY_TOKEN 
(websocket_proxy role)
+            // - Client authenticates to proxy with CLIENT_TOKEN (client role)
+            // - Authorization service correctly handles the WebSocket dummy 
principal scenario
+            Assert.assertTrue(produceSocket.getBuffer().size() >= 3,
+                    "Should have exchanged at least 3 messages with JWT token 
authentication");
+
+            log.info("Test passed: WebSocket proxy produce/consume works 
correctly with JWT token authorization");
+
+        } finally {
+            try {
+                if (consumeClient != null && consumeClient.isStarted()) {
+                    consumeClient.stop();
+                }
+                if (produceClient != null && produceClient.isStarted()) {
+                    produceClient.stop();
+                }
+            } catch (Exception e) {
+                log.warn("Error stopping WebSocket clients in finally block: 
{}", e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Test WebSocket connections with unauthorized token should fail.
+     * This verifies that the authorization system correctly rejects tokens 
without proper permissions.
+     */
+    @Test(timeOut = 30000)
+    public void testWebSocketProxyWithUnauthorizedToken() throws Exception {
+        final String namespaceName = "my-property/my-ns";
+        final String topic = namespaceName + "/my-websocket-topic";
+
+        final String consumerUri = "ws://localhost:" + 
proxyServer.getListenPortHTTP().get()
+                + "/ws/v2/consumer/persistent/" + topic + 
"/my-sub-unauthorized";
+        final String producerUri = "ws://localhost:" + 
proxyServer.getListenPortHTTP().get()
+                + "/ws/v2/producer/persistent/" + topic;
+
+        URI consumeUri = URI.create(consumerUri);
+        URI produceUri = URI.create(producerUri);
+
+        WebSocketClient unauthorizedConsumeClient = null;
+        WebSocketClient unauthorizedProduceClient = null;
+
+        try {
+            // Test unauthorized consumer connection
+            unauthorizedConsumeClient = new WebSocketClient();
+            SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+            unauthorizedConsumeClient.start();
+
+            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+            // Use UNAUTHORIZED_TOKEN which doesn't have permissions
+            consumeRequest.setHeader("Authorization", "Bearer " + 
UNAUTHORIZED_TOKEN);
+            Future<Session> consumerFuture = 
unauthorizedConsumeClient.connect(consumeSocket, consumeUri,
+                    consumeRequest);
+
+            log.info("Attempting to connect consumer with unauthorized token 
to: {}", consumeUri);
+
+            try {
+                Session consumerSession = consumerFuture.get(10, 
TimeUnit.SECONDS);
+                // If we reach here, the connection succeeded when it should 
have failed
+                if (consumerSession.isOpen()) {
+                    Assert.fail("Consumer connection should have been rejected 
due to lack of permissions");
+                }
+            } catch (Exception e) {
+                // Expected: Connection should fail due to authorization
+                log.info("Consumer connection correctly failed with 
unauthorized token: {}", e.getMessage());
+            }
+
+            // Test unauthorized producer connection
+            unauthorizedProduceClient = new WebSocketClient();
+            SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+            unauthorizedProduceClient.start();
+
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            // Use UNAUTHORIZED_TOKEN which doesn't have permissions
+            produceRequest.setHeader("Authorization", "Bearer " + 
UNAUTHORIZED_TOKEN);
+            Future<Session> producerFuture = 
unauthorizedProduceClient.connect(produceSocket, produceUri,
+                    produceRequest);
+
+            log.info("Attempting to connect producer with unauthorized token 
to: {}", produceUri);
+
+            try {
+                Session producerSession = producerFuture.get(10, 
TimeUnit.SECONDS);
+                // If we reach here, the connection succeeded when it should 
have failed
+                if (producerSession.isOpen()) {
+                    Assert.fail("Producer connection should have been rejected 
due to lack of permissions");
+                }
+            } catch (Exception e) {
+                // Expected: Connection should fail due to authorization
+                log.info("Producer connection correctly failed with 
unauthorized token: {}", e.getMessage());
+            }
+
+            log.info("Test passed: Unauthorized tokens are correctly rejected 
by WebSocket proxy");
+
+        } finally {
+            // Clean up connections
+            try {
+                if (unauthorizedConsumeClient != null && 
unauthorizedConsumeClient.isStarted()) {
+                    unauthorizedConsumeClient.stop();
+                }
+                if (unauthorizedProduceClient != null && 
unauthorizedProduceClient.isStarted()) {
+                    unauthorizedProduceClient.stop();
+                }
+            } catch (Exception e) {
+                log.warn("Error stopping unauthorized WebSocket clients in 
finally block: {}", e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Setup namespace and permissions for testing authorization scenarios.
+     * This method creates the necessary tenant, namespace, and grants 
permissions.
+     */
+    private void setupNamespacePermissions() throws Exception {
+        String namespaceName = "my-property/my-ns";
+        try {
+            // Create cluster if not exists
+            admin.clusters().createCluster("test",
+                org.apache.pulsar.common.policies.data.ClusterData.builder()
+                    .serviceUrl(pulsar.getWebServiceAddress())
+                    .serviceUrlTls(pulsar.getWebServiceAddressTls())
+                    .brokerServiceUrl(pulsar.getBrokerServiceUrl())
+                    .brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
+                    .build());
+        } catch (Exception e) {
+            // Cluster might already exist, ignore
+            log.debug("Cluster creation failed (may already exist): {}", 
e.getMessage());
+        }
+
+        try {
+            // Create tenant
+            admin.tenants().createTenant("my-property",
+                org.apache.pulsar.common.policies.data.TenantInfoImpl.builder()
+                    .allowedClusters(Sets.newHashSet("test"))
+                    .build());
+        } catch (Exception e) {
+            // Tenant might already exist, ignore
+            log.debug("Tenant creation failed (may already exist): {}", 
e.getMessage());
+        }
+
+        try {
+            // Create namespace
+            admin.namespaces().createNamespace(namespaceName);
+        } catch (Exception e) {
+            // Namespace might already exist, ignore
+            log.debug("Namespace creation failed (may already exist): {}", 
e.getMessage());
+        }
+
+        // Grant permissions for WebSocket proxy and client
+        try {
+            // Grant permissions for the proxy role
+            admin.namespaces().grantPermissionOnNamespace(namespaceName, 
"client",
+                    Sets.newHashSet(
+                        
org.apache.pulsar.common.policies.data.AuthAction.consume,
+                        
org.apache.pulsar.common.policies.data.AuthAction.produce));
+            log.info("Granted permissions for namespace: {}", namespaceName);
+        } catch (Exception e) {
+            log.warn("Failed to grant permissions (may already exist): {}", 
e.getMessage());
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 09c668f753e..7d851361505 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -167,6 +167,11 @@ public class ClientBuilderImpl implements ClientBuilder {
         return this;
     }
 
+    public ClientBuilderImpl originalPrincipal(String originalPrincipal) {
+        conf.setOriginalPrincipal(originalPrincipal);
+        return this;
+    }
+
     private void setAuthenticationFromPropsIfAvailable(ClientConfigurationData 
clientConfig) {
         String authPluginClass = clientConfig.getAuthPluginClassName();
         String authParams = clientConfig.getAuthParams();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index c737a2e9a78..2b1109e179d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -206,6 +206,7 @@ public class ClientCnx extends PulsarHandler {
     private long lastDisconnectedTimestamp;
 
     protected final String clientVersion;
+    protected final String originalPrincipal;
 
     protected enum State {
         None, SentConnectFrame, Ready, Failed, Connecting
@@ -271,6 +272,7 @@ public class ClientCnx extends PulsarHandler {
         this.idleState = new ClientCnxIdleState(this);
         this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
                 + (conf.getDescription() == null ? "" : ("-" + 
conf.getDescription()));
+        this.originalPrincipal = conf.getOriginalPrincipal();
         this.connectionsOpenedCounter =
                 
instrumentProvider.newCounter("pulsar.client.connection.opened", 
Unit.Connections,
                         "The number of connections opened", null, 
Attributes.empty());
@@ -320,7 +322,7 @@ public class ClientCnx extends PulsarHandler {
         authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
         AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         return Commands.newConnect(authentication.getAuthMethodName(), 
authData, this.protocolVersion,
-                clientVersion, proxyToTargetBrokerAddress, null, null, null);
+                clientVersion, proxyToTargetBrokerAddress, originalPrincipal, 
null, null);
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 72b9cf0c191..e406581e707 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -111,6 +111,12 @@ public class ClientConfigurationData implements 
Serializable, Cloneable {
     @Secret
     private Map<String, String> authParamMap;
 
+    @ApiModelProperty(
+            name = "originalPrincipal",
+            value = "Original principal for proxy authentication scenarios."
+    )
+    private String originalPrincipal;
+
     @ApiModelProperty(
             name = "operationTimeoutMs",
             value = "Client operation timeout (in milliseconds)."
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java
index ab71f2a43e5..7970b395fb3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java
@@ -25,5 +25,7 @@ public class Constants {
 
     public static final String GLOBAL_CLUSTER = "global";
 
+    public static final String WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE = 
"__websocket_dummy_original_principle";
+
     private Constants() {}
 }
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 7bb4df7baa5..d379d01d1be 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.websocket;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static 
org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.Closeable;
 import java.io.IOException;
@@ -44,6 +45,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SizeUnit;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -194,6 +196,9 @@ public class WebSocketService implements Closeable {
                 
.tlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath()) //
                 .ioThreads(config.getWebSocketNumIoThreads()) //
                 
.connectionsPerBroker(config.getWebSocketConnectionsPerBroker());
+        if (clientBuilder instanceof  ClientBuilderImpl) {
+            ((ClientBuilderImpl) 
clientBuilder).originalPrincipal(WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE);
+        }
 
         // Apply all arbitrary configuration. This must be called before 
setting any fields annotated as
         // @Secret on the ClientConfigurationData object because of the way 
they are serialized.

Reply via email to