This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e916457f905 [fix][ws] Fix WebSocket authorization issue due to originalPrincipal must be provided (#24533) e916457f905 is described below commit e916457f9055cf3e292d98ffcb677518a27699f6 Author: Penghui Li <peng...@apache.org> AuthorDate: Mon Jul 21 23:15:14 2025 -0700 [fix][ws] Fix WebSocket authorization issue due to originalPrincipal must be provided (#24533) --- .../AuthenticationDataSubscription.java | 22 +- .../broker/authorization/AuthorizationService.java | 29 +- .../broker/admin/impl/PersistentTopicsBase.java | 2 +- .../pulsar/broker/lookup/TopicLookupBase.java | 31 +- .../apache/pulsar/broker/service/ServerCnx.java | 27 +- .../pulsar/broker/web/PulsarWebResource.java | 4 +- .../pulsar/broker/service/ServerCnxTest.java | 26 +- .../proxy/WebSocketProxyAuthIntegrationTest.java | 425 +++++++++++++++++++++ .../pulsar/client/impl/ClientBuilderImpl.java | 5 + .../org/apache/pulsar/client/impl/ClientCnx.java | 4 +- .../client/impl/conf/ClientConfigurationData.java | 6 + .../org/apache/pulsar/common/naming/Constants.java | 2 + .../apache/pulsar/websocket/WebSocketService.java | 5 + 13 files changed, 497 insertions(+), 91 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java index 96b85989b6f..eaaf8c20d8e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataSubscription.java @@ -34,32 +34,32 @@ public class AuthenticationDataSubscription implements AuthenticationDataSource @Override public boolean hasDataFromCommand() { - return authData.hasDataFromCommand(); + return hasAuthData() && authData.hasDataFromCommand(); } @Override public 
String getCommandData() { - return authData.getCommandData(); + return hasAuthData() ? authData.getCommandData() : null; } @Override public boolean hasDataFromPeer() { - return authData.hasDataFromPeer(); + return hasAuthData() && authData.hasDataFromPeer(); } @Override public SocketAddress getPeerAddress() { - return authData.getPeerAddress(); + return hasAuthData() ? authData.getPeerAddress() : null; } @Override public boolean hasDataFromTls() { - return authData.hasDataFromTls(); + return hasAuthData() && authData.hasDataFromTls(); } @Override public Certificate[] getTlsCertificates() { - return authData.getTlsCertificates(); + return hasAuthData() ? authData.getTlsCertificates() : null; } @Override @@ -74,20 +74,24 @@ public class AuthenticationDataSubscription implements AuthenticationDataSource @Override public boolean hasDataFromHttp() { - return authData.hasDataFromHttp(); + return hasAuthData() && authData.hasDataFromHttp(); } @Override public String getHttpAuthType() { - return authData.getHttpAuthType(); + return hasAuthData() ? authData.getHttpAuthType() : null; } @Override public String getHttpHeader(String name) { - return authData.getHttpHeader(name); + return hasAuthData() ? 
authData.getHttpHeader(name) : null; } public AuthenticationDataSource getAuthData() { return authData; } + + private boolean hasAuthData() { + return authData != null; + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 40573d99d60..7b7c35e820e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -35,6 +35,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationParameters; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.GrantTopicPermissionOptions; import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions; +import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; @@ -87,7 +88,7 @@ public class AuthorizationService { if (!isValidOriginalPrincipal(authParams)) { return CompletableFuture.completedFuture(false); } - if (isProxyRole(authParams.getClientRole())) { + if (isProxyRole(authParams.getClientRole()) && !isWebsocketPrinciple(authParams.getOriginalPrincipal())) { CompletableFuture<Boolean> isRoleAuthorizedFuture = isSuperUser(authParams.getClientRole(), authParams.getClientAuthenticationDataSource()); // The current paradigm is to pass the client auth data when we don't have access to the original auth data. 
@@ -348,7 +349,7 @@ public class AuthorizationService { if (!isValidOriginalPrincipal(authParams)) { return CompletableFuture.completedFuture(false); } - if (isProxyRole(authParams.getClientRole())) { + if (isProxyRole(authParams.getClientRole()) && !isWebsocketPrinciple(authParams.getOriginalPrincipal())) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowFunctionOpsAsync(namespaceName, authParams.getClientRole(), authParams.getClientAuthenticationDataSource()); // The current paradigm is to pass the client auth data when we don't have access to the original auth data. @@ -376,7 +377,7 @@ public class AuthorizationService { if (!isValidOriginalPrincipal(authParams)) { return CompletableFuture.completedFuture(false); } - if (isProxyRole(authParams.getClientRole())) { + if (isProxyRole(authParams.getClientRole()) && !isWebsocketPrinciple(authParams.getOriginalPrincipal())) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowSourceOpsAsync(namespaceName, authParams.getClientRole(), authParams.getClientAuthenticationDataSource()); // The current paradigm is to pass the client auth data when we don't have access to the original auth data. @@ -404,7 +405,7 @@ public class AuthorizationService { if (!isValidOriginalPrincipal(authParams)) { return CompletableFuture.completedFuture(false); } - if (isProxyRole(authParams.getClientRole())) { + if (isProxyRole(authParams.getClientRole()) && !isWebsocketPrinciple(authParams.getOriginalPrincipal())) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowSinkOpsAsync(namespaceName, authParams.getClientRole(), authParams.getClientAuthenticationDataSource()); // The current paradigm is to pass the client auth data when we don't have access to the original auth data. 
@@ -504,6 +505,10 @@ public class AuthorizationService { return role != null && conf.getProxyRoles().contains(role); } + public boolean isWebsocketPrinciple(String role) { + return Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE.equals(role); + } + /** * Grant authorization-action permission on a tenant to the given client. * @@ -534,7 +539,7 @@ public class AuthorizationService { if (!isValidOriginalPrincipal(role, originalRole, authData)) { return CompletableFuture.completedFuture(false); } - if (isProxyRole(role)) { + if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTenantOperationAsync( tenantName, operation, role, authData); CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTenantOperationAsync( @@ -556,7 +561,7 @@ public class AuthorizationService { return CompletableFuture.completedFuture(false); } - if (isProxyRole(role)) { + if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { final var isRoleAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData); final var isOriginalAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId, @@ -577,7 +582,7 @@ public class AuthorizationService { return CompletableFuture.completedFuture(false); } - if (isProxyRole(role)) { + if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { final var isRoleAuthorizedFuture = provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData); final var isOriginalAuthorizedFuture = provider.allowClusterOperationAsync(clusterName, @@ -599,7 +604,7 @@ public class AuthorizationService { return CompletableFuture.completedFuture(false); } - if (isProxyRole(role)) { + if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { final var isRoleAuthorizedFuture = provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData); final var isOriginalAuthorizedFuture = 
provider.allowClusterPolicyOperationAsync(clusterName, originalRole, @@ -662,7 +667,7 @@ public class AuthorizationService { if (!isValidOriginalPrincipal(role, originalRole, authData)) { return CompletableFuture.completedFuture(false); } - if (isProxyRole(role)) { + if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespaceOperationAsync( namespaceName, operation, role, authData); CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowNamespaceOperationAsync( @@ -706,7 +711,7 @@ public class AuthorizationService { if (!isValidOriginalPrincipal(role, originalRole, authData)) { return CompletableFuture.completedFuture(false); } - if (isProxyRole(role)) { + if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync( namespaceName, policy, operation, role, authData); CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowNamespacePolicyOperationAsync( @@ -769,7 +774,7 @@ public class AuthorizationService { if (!isValidOriginalPrincipal(role, originalRole, authData)) { return CompletableFuture.completedFuture(false); } - if (isProxyRole(role)) { + if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicPolicyOperationAsync( topicName, policy, operation, role, authData); CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTopicPolicyOperationAsync( @@ -865,7 +870,7 @@ public class AuthorizationService { if (!isValidOriginalPrincipal(role, originalRole, authData)) { return CompletableFuture.completedFuture(false); } - if (isProxyRole(role)) { + if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicOperationAsync( topicName, operation, role, authData); CompletableFuture<Boolean> isOriginalAuthorizedFuture = allowTopicOperationAsync( diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 604af932cca..644658caab4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4349,7 +4349,7 @@ public class PersistentTopicsBase extends AdminResource { AuthenticationDataSource authenticationData, TopicName topicName) { CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>(); CompletableFuture<Void> authorizationFuture = new CompletableFuture<>(); - checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData) + checkAuthorizationAsync(pulsar, topicName, clientAppId, originalPrincipal, authenticationData) .thenRun(() -> authorizationFuture.complete(null)) .exceptionally(e -> { Throwable throwable = FutureUtil.unwrapCompletionException(e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 92f045ad90d..a80c8a4eccc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -24,7 +24,6 @@ import io.netty.buffer.ByteBuf; import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; -import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -159,32 +158,6 @@ public class TopicLookupBase extends PulsarWebResource { } } - /** - * Lookup broker-service address for a given namespace-bundle which contains given topic. - * - * a. Returns broker-address if namespace-bundle is already owned by any broker - * b. 
If current-broker receives lookup-request and if it's not a leader then current broker redirects request - * to leader by returning leader-service address. - * c. If current-broker is leader then it finds out least-loaded broker to - * own namespace bundle and redirects request - * by returning least-loaded broker. - * d. If current-broker receives request to own the namespace-bundle then - * it owns a bundle and returns success(connect) - * response to client. - * - * @param pulsarService - * @param topicName - * @param authoritative - * @param clientAppId - * @param requestId - * @return - */ - public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName, - boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId) { - return lookupTopicAsync(pulsarService, topicName, authoritative, clientAppId, - authenticationData, requestId, null, Collections.emptyMap()); - } - /** * * Lookup broker-service address for a given namespace-bundle which contains given topic. @@ -209,6 +182,7 @@ public class TopicLookupBase extends PulsarWebResource { */ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName, boolean authoritative, String clientAppId, + String originalPrinciple, AuthenticationDataSource authenticationData, long requestId, final String advertisedListenerName, Map<String, String> properties) { @@ -231,7 +205,8 @@ public class TopicLookupBase extends PulsarWebResource { requestId, false)); } else { // (2) authorize client - checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> { + checkAuthorizationAsync(pulsarService, topicName, clientAppId, originalPrinciple, + authenticationData).thenRun(() -> { // (3) validate global namespace // It is necessary for system topic operations because system topics are used to store metadata // and other vital information. 
Even after namespace starting deletion, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 3aa365323ec..b0774513057 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -477,27 +477,16 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { if (!service.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - CompletableFuture<Boolean> isProxyAuthorizedFuture; - if (originalPrincipal != null) { - isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync( - topicName, operation, originalPrincipal, - originalAuthDataSource != null ? originalAuthDataSource : authDataSource); - } else { - isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); - } - CompletableFuture<Boolean> isAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync( - topicName, operation, authRole, authDataSource); - return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> { - if (!isProxyAuthorized) { - log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}", - originalPrincipal, operation, topicName); - } + CompletableFuture<Boolean> result = service.getAuthorizationService().allowTopicOperationAsync( + topicName, operation, originalPrincipal, authRole, + originalAuthDataSource != null ? 
originalAuthDataSource : authDataSource); + result.thenAccept(isAuthorized -> { if (!isAuthorized) { - log.warn("Role {} is not authorized to perform operation {} on topic {}", - authRole, operation, topicName); + log.warn("Role {} and OriginalRole {} is not authorized to perform operation {} on topic {}", + authRole, originalPrincipal, operation, topicName); } - return isProxyAuthorized && isAuthorized; }); + return result; } private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String subscriptionName, @@ -561,7 +550,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { properties = Collections.emptyMap(); } lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative, - getPrincipal(), getAuthenticationData(), + authRole, originalPrincipal, getAuthenticationData(), requestId, advertisedListenerName, properties).handle((lookupResponse, ex) -> { if (ex == null) { writeAndFlush(lookupResponse); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 5820d96eccc..7e770704c1a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -987,14 +987,14 @@ public abstract class PulsarWebResource { } protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName, - String role, AuthenticationDataSource authenticationData) { + String role, String originalPrinciple, AuthenticationDataSource authenticationData) { if (!pulsarService.getConfiguration().isAuthorizationEnabled()) { // No enforcing of authorization policies return CompletableFuture.completedFuture(null); } // get zk policy manager return pulsarService.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName, - TopicOperation.LOOKUP, null, 
role, authenticationData).thenAccept(allow -> { + TopicOperation.LOOKUP, originalPrinciple, role, authenticationData).thenAccept(allow -> { if (!allow) { log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role); throw new RestException(Status.UNAUTHORIZED, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index dadc6a001a6..bba8f286755 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -947,10 +947,8 @@ public class ServerCnxTest { assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(), ServerError.AuthorizationError); assertEquals(((CommandLookupTopicResponse) lookupResponse).getRequestId(), 1); verify(authorizationService, times(1)) - .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, proxyRole, serverCnx.getAuthData()); - verify(authorizationService, times(1)) - .allowTopicOperationAsync(topicName, - TopicOperation.LOOKUP, clientRole, serverCnx.getOriginalAuthData()); + .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, clientRole, proxyRole, + serverCnx.getOriginalAuthData()); // producer ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2, "test-producer", new HashMap<>(), false); @@ -960,10 +958,8 @@ public class ServerCnxTest { assertEquals(((CommandError) producerResponse).getError(), ServerError.AuthorizationError); assertEquals(((CommandError) producerResponse).getRequestId(), 2); verify(authorizationService, times(1)) - .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, clientRole, + .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, clientRole, proxyRole, serverCnx.getOriginalAuthData()); - verify(authorizationService, times(1)) - .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, proxyRole, 
serverCnx.getAuthData()); // consumer String subscriptionName = "test-subscribe"; @@ -976,20 +972,12 @@ public class ServerCnxTest { assertEquals(((CommandError) subscribeResponse).getRequestId(), 3); verify(authorizationService, times(1)).allowTopicOperationAsync( eq(topicName), eq(TopicOperation.CONSUME), - eq(clientRole), argThat(arg -> { + eq(clientRole), eq(proxyRole), argThat(arg -> { assertTrue(arg instanceof AuthenticationDataSubscription); assertEquals(arg.getCommandData(), clientRole); assertEquals(arg.getSubscription(), subscriptionName); return true; })); - verify(authorizationService, times(1)).allowTopicOperationAsync( - eq(topicName), eq(TopicOperation.CONSUME), - eq(proxyRole), argThat(arg -> { - assertTrue(arg instanceof AuthenticationDataSubscription); - assertEquals(arg.getCommandData(), proxyRole); - assertEquals(arg.getSubscription(), subscriptionName); - return true; - })); } @Test @@ -1761,7 +1749,7 @@ public class ServerCnxTest { AuthorizationService authorizationService = mock(AuthorizationService.class); doReturn(CompletableFuture.completedFuture(false)).when(authorizationService) .allowTopicOperationAsync(Mockito.any(), - Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); doReturn(authorizationService).when(brokerService).getAuthorizationService(); svcConfig.setAuthenticationEnabled(true); svcConfig.setAuthorizationEnabled(true); @@ -2548,7 +2536,7 @@ public class ServerCnxTest { AuthorizationService authorizationService = mock(AuthorizationService.class); doReturn(CompletableFuture.completedFuture(true)).when(authorizationService) .allowTopicOperationAsync(Mockito.any(), - Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); doReturn(authorizationService).when(brokerService).getAuthorizationService(); svcConfig.setAuthenticationEnabled(true); svcConfig.setAuthorizationEnabled(true); @@ -2571,7 +2559,7 @@ public class 
ServerCnxTest { AuthorizationService authorizationService = mock(AuthorizationService.class); doReturn(CompletableFuture.completedFuture(false)).when(authorizationService) .allowTopicOperationAsync(Mockito.any(), - Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); doReturn(authorizationService).when(brokerService).getAuthorizationService(); svcConfig.setAuthenticationEnabled(true); svcConfig.setAuthorizationEnabled(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java new file mode 100644 index 00000000000..624c2522822 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.websocket.proxy; + +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import com.google.common.collect.Sets; +import io.jsonwebtoken.SignatureAlgorithm; +import java.net.URI; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import javax.crypto.SecretKey; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.apache.pulsar.websocket.WebSocketService; +import org.apache.pulsar.websocket.service.ProxyServer; +import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration; +import org.apache.pulsar.websocket.service.WebSocketServiceStarter; +import org.awaitility.Awaitility; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Integration test for WebSocket proxy with JWT token authentication and authorization enabled. + * This test specifically verifies that the WebSocket dummy principal fix allows + * WebSocket connections to work properly through a proxy when both authentication + * and authorization are enabled, using real JWT tokens for different roles. 
+ * + * The test uses four different JWT tokens: + * - ADMIN_TOKEN: Used by admin client and broker's internal client for setup operations + * - PROXY_TOKEN: Used by WebSocket proxy's internal client to connect to broker + * - CLIENT_TOKEN: Used by WebSocket clients to authenticate to the proxy + * - UNAUTHORIZED_TOKEN: Used to test that unauthorized tokens are properly rejected + * + * Test coverage: + * 1. testWebSocketProxyProduceConsumeWithAuthorization: Positive test with authorized tokens + * 2. testWebSocketProxyWithUnauthorizedToken: Negative test with unauthorized tokens + */ +@Test(groups = "websocket") +public class WebSocketProxyAuthIntegrationTest extends ProducerConsumerBase { + + private static final Logger log = LoggerFactory.getLogger(WebSocketProxyAuthIntegrationTest.class); + + // JWT token authentication setup with different roles + private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + private static final String PROXY_TOKEN = AuthTokenUtils.createToken(SECRET_KEY, "websocket_proxy", + Optional.empty()); + private static final String CLIENT_TOKEN = AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.empty()); + private static final String ADMIN_TOKEN = AuthTokenUtils.createToken(SECRET_KEY, "admin", Optional.empty()); + private static final String UNAUTHORIZED_TOKEN = AuthTokenUtils.createToken(SECRET_KEY, "unauthorized_user", + Optional.empty()); + + private ProxyServer proxyServer; + private WebSocketService service; + private WebSocketClient consumeClient; + private WebSocketClient produceClient; + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + // Enable authentication and authorization in broker + conf.setAuthenticationEnabled(true); + conf.setAuthorizationEnabled(true); + + // Configure proxy roles for the broker - this is critical for the WebSocket proxy fix + conf.setProxyRoles(Sets.newHashSet("websocket_proxy")); + + // Set super user roles for 
admin operations and proxy role + conf.setSuperUserRoles(Sets.newHashSet("admin", "websocket_proxy")); + + // Configure broker's internal client to use admin token + conf.setBrokerClientAuthenticationPlugin("org.apache.pulsar.client.impl.auth.AuthenticationToken"); + conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN); + + // Enable JWT token authentication provider + conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName())); + + // Configure JWT token secret key for token validation + Properties properties = new Properties(); + properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); + conf.setProperties(properties); + } + + @BeforeMethod + public void setup() throws Exception { + super.internalSetup(); + + // Configure admin client with ADMIN_TOKEN for namespace and permission setup + if (admin != null) { + admin.close(); + } + admin = PulsarAdmin.builder() + .serviceHttpUrl(brokerUrl.toString()) + .authentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", + "token:" + ADMIN_TOKEN) + .build(); + + // Setup namespace and grant permissions for client role + setupNamespacePermissions(); + + // Create WebSocket proxy configuration with authentication and authorization enabled + WebSocketProxyConfiguration proxyConfig = new WebSocketProxyConfiguration(); + proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setClusterName("test"); + proxyConfig.setAuthenticationEnabled(true); + proxyConfig.setAuthorizationEnabled(true); // Enable authorization at proxy level + proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + + // Configure WebSocket proxy to use JWT token authentication + proxyConfig.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName())); + + // Set up JWT token authentication properties for proxy + Properties proxyProperties = new Properties(); + proxyProperties.setProperty("tokenSecretKey", 
AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); + proxyConfig.setProperties(proxyProperties); + + // Configure proxy's internal client to use PROXY_TOKEN when connecting to broker + proxyConfig.setBrokerClientAuthenticationPlugin("org.apache.pulsar.client.impl.auth.AuthenticationToken"); + proxyConfig.setBrokerClientAuthenticationParameters("token:" + PROXY_TOKEN); + + // Set broker service URL to connect to our test broker + proxyConfig.setBrokerServiceUrl(pulsar.getBrokerServiceUrl()); + proxyConfig.setBrokerServiceUrlTls(pulsar.getBrokerServiceUrlTls()); + + service = spyWithClassAndConstructorArgs(WebSocketService.class, proxyConfig); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service) + .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); + + proxyServer = new ProxyServer(proxyConfig); + WebSocketServiceStarter.start(proxyServer, service); + + log.info("WebSocket Proxy Server started on port: {}", proxyServer.getListenPortHTTP().get()); + } + + @AfterMethod(alwaysRun = true) + public void cleanup() throws Exception { + try { + if (consumeClient != null) { + consumeClient.stop(); + } + if (produceClient != null) { + produceClient.stop(); + } + log.info("WebSocket clients stopped successfully"); + } catch (Exception e) { + log.error("Error stopping WebSocket clients: {}", e.getMessage()); + } + + try { + if (service != null) { + service.close(); + } + if (proxyServer != null) { + proxyServer.stop(); + } + } catch (Exception e) { + log.error("Error stopping proxy server: {}", e.getMessage()); + } + + super.internalCleanup(); + log.info("Finished cleaning up test setup"); + } + + /** + * Test WebSocket message produce and consume through proxy with JWT token authentication and authorization enabled. + * This verifies the fix for WebSocket dummy principal authorization issue using real JWT tokens. + * + * The test validates that: + * 1. 
Admin client uses ADMIN_TOKEN for setup operations (namespace creation, permission grants) + * 2. WebSocket proxy uses PROXY_TOKEN for its internal client connection to broker + * 3. WebSocket clients use CLIENT_TOKEN for authentication to the proxy via Authorization header + * 4. Topic-level permissions are correctly granted to the "client" role + * 5. The WebSocket dummy principal fix prevents authorization failures when proxy role is configured + * 6. The authorization service correctly handles different JWT tokens for different roles + * without blocking operations + */ + @Test(timeOut = 30000) + public void testWebSocketProxyProduceConsumeWithAuthorization() throws Exception { + final String namespaceName = "my-property/my-ns"; + final String topic = namespaceName + "/my-websocket-topic"; + + final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + + "/ws/v2/consumer/persistent/" + topic + "/my-sub"; + final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + + "/ws/v2/producer/persistent/" + topic; + + URI consumeUri = URI.create(consumerUri); + URI produceUri = URI.create(producerUri); + + consumeClient = new WebSocketClient(); + SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); + produceClient = new WebSocketClient(); + SimpleProducerSocket produceSocket = new SimpleProducerSocket(); + + try { + // Connect consumer with CLIENT_TOKEN in Authorization header + consumeClient.start(); + ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); + // Add JWT token authentication for WebSocket client + consumeRequest.setHeader("Authorization", "Bearer " + CLIENT_TOKEN); + Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); + log.info("Connecting consumer to: {} with CLIENT_TOKEN", consumeUri); + + // Connect producer with CLIENT_TOKEN in Authorization header + produceClient.start(); + ClientUpgradeRequest produceRequest = new 
ClientUpgradeRequest(); + // Add JWT token authentication for WebSocket client + produceRequest.setHeader("Authorization", "Bearer " + CLIENT_TOKEN); + Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); + log.info("Connecting producer to: {} with CLIENT_TOKEN", produceUri); + + // Verify connections are established + Session consumerSession = consumerFuture.get(10, TimeUnit.SECONDS); + Session producerSession = producerFuture.get(10, TimeUnit.SECONDS); + + Assert.assertTrue(consumerSession.isOpen(), "Consumer WebSocket session should be open"); + Assert.assertTrue(producerSession.isOpen(), "Producer WebSocket session should be open"); + + // Wait for messages to be produced and consumed + Awaitility.await() + .atMost(20, TimeUnit.SECONDS) + .untilAsserted(() -> { + Assert.assertFalse(produceSocket.getBuffer().isEmpty(), + "Producer should have sent messages"); + Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer(), + "Consumer should receive all produced messages"); + }); + + log.info("Successfully produced and consumed {} messages through WebSocket proxy " + + "with JWT token authorization", + produceSocket.getBuffer().size()); + + // Verify that we successfully exchanged messages with JWT token authentication + // This proves that the WebSocket dummy principal fix is working correctly: + // - Proxy authenticates to broker with PROXY_TOKEN (websocket_proxy role) + // - Client authenticates to proxy with CLIENT_TOKEN (client role) + // - Authorization service correctly handles the WebSocket dummy principal scenario + Assert.assertTrue(produceSocket.getBuffer().size() >= 3, + "Should have exchanged at least 3 messages with JWT token authentication"); + + log.info("Test passed: WebSocket proxy produce/consume works correctly with JWT token authorization"); + + } finally { + try { + if (consumeClient != null && consumeClient.isStarted()) { + consumeClient.stop(); + } + if (produceClient != null && 
produceClient.isStarted()) { + produceClient.stop(); + } + } catch (Exception e) { + log.warn("Error stopping WebSocket clients in finally block: {}", e.getMessage()); + } + } + } + + /** + * Test WebSocket connections with unauthorized token should fail. + * This verifies that the authorization system correctly rejects tokens without proper permissions. + */ + @Test(timeOut = 30000) + public void testWebSocketProxyWithUnauthorizedToken() throws Exception { + final String namespaceName = "my-property/my-ns"; + final String topic = namespaceName + "/my-websocket-topic"; + + final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + + "/ws/v2/consumer/persistent/" + topic + "/my-sub-unauthorized"; + final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + + "/ws/v2/producer/persistent/" + topic; + + URI consumeUri = URI.create(consumerUri); + URI produceUri = URI.create(producerUri); + + WebSocketClient unauthorizedConsumeClient = null; + WebSocketClient unauthorizedProduceClient = null; + + try { + // Test unauthorized consumer connection + unauthorizedConsumeClient = new WebSocketClient(); + SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); + unauthorizedConsumeClient.start(); + + ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); + // Use UNAUTHORIZED_TOKEN which doesn't have permissions + consumeRequest.setHeader("Authorization", "Bearer " + UNAUTHORIZED_TOKEN); + Future<Session> consumerFuture = unauthorizedConsumeClient.connect(consumeSocket, consumeUri, + consumeRequest); + + log.info("Attempting to connect consumer with unauthorized token to: {}", consumeUri); + + try { + Session consumerSession = consumerFuture.get(10, TimeUnit.SECONDS); + // If we reach here, the connection succeeded when it should have failed + if (consumerSession.isOpen()) { + Assert.fail("Consumer connection should have been rejected due to lack of permissions"); + } + } catch (Exception e) { + // 
Expected: Connection should fail due to authorization + log.info("Consumer connection correctly failed with unauthorized token: {}", e.getMessage()); + } + + // Test unauthorized producer connection + unauthorizedProduceClient = new WebSocketClient(); + SimpleProducerSocket produceSocket = new SimpleProducerSocket(); + unauthorizedProduceClient.start(); + + ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); + // Use UNAUTHORIZED_TOKEN which doesn't have permissions + produceRequest.setHeader("Authorization", "Bearer " + UNAUTHORIZED_TOKEN); + Future<Session> producerFuture = unauthorizedProduceClient.connect(produceSocket, produceUri, + produceRequest); + + log.info("Attempting to connect producer with unauthorized token to: {}", produceUri); + + try { + Session producerSession = producerFuture.get(10, TimeUnit.SECONDS); + // If we reach here, the connection succeeded when it should have failed + if (producerSession.isOpen()) { + Assert.fail("Producer connection should have been rejected due to lack of permissions"); + } + } catch (Exception e) { + // Expected: Connection should fail due to authorization + log.info("Producer connection correctly failed with unauthorized token: {}", e.getMessage()); + } + + log.info("Test passed: Unauthorized tokens are correctly rejected by WebSocket proxy"); + + } finally { + // Clean up connections + try { + if (unauthorizedConsumeClient != null && unauthorizedConsumeClient.isStarted()) { + unauthorizedConsumeClient.stop(); + } + if (unauthorizedProduceClient != null && unauthorizedProduceClient.isStarted()) { + unauthorizedProduceClient.stop(); + } + } catch (Exception e) { + log.warn("Error stopping unauthorized WebSocket clients in finally block: {}", e.getMessage()); + } + } + } + + /** + * Setup namespace and permissions for testing authorization scenarios. + * This method creates the necessary tenant, namespace, and grants permissions. 
+ */ + private void setupNamespacePermissions() throws Exception { + String namespaceName = "my-property/my-ns"; + try { + // Create cluster if it does not exist + admin.clusters().createCluster("test", + org.apache.pulsar.common.policies.data.ClusterData.builder() + .serviceUrl(pulsar.getWebServiceAddress()) + .serviceUrlTls(pulsar.getWebServiceAddressTls()) + .brokerServiceUrl(pulsar.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls()) + .build()); + } catch (Exception e) { + // Cluster might already exist, ignore + log.debug("Cluster creation failed (may already exist): {}", e.getMessage()); + } + + try { + // Create tenant + admin.tenants().createTenant("my-property", + org.apache.pulsar.common.policies.data.TenantInfoImpl.builder() + .allowedClusters(Sets.newHashSet("test")) + .build()); + } catch (Exception e) { + // Tenant might already exist, ignore + log.debug("Tenant creation failed (may already exist): {}", e.getMessage()); + } + + try { + // Create namespace + admin.namespaces().createNamespace(namespaceName); + } catch (Exception e) { + // Namespace might already exist, ignore + log.debug("Namespace creation failed (may already exist): {}", e.getMessage()); + } + + // Grant namespace permissions to the client role + try { + // Grant consume and produce to the "client" role + admin.namespaces().grantPermissionOnNamespace(namespaceName, "client", + Sets.newHashSet( + org.apache.pulsar.common.policies.data.AuthAction.consume, + org.apache.pulsar.common.policies.data.AuthAction.produce)); + log.info("Granted permissions for namespace: {}", namespaceName); + } catch (Exception e) { + log.warn("Failed to grant permissions (may already exist): {}", e.getMessage()); + } + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 09c668f753e..7d851361505 100644 --- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -167,6 +167,11 @@ public class ClientBuilderImpl implements ClientBuilder { return this; } + public ClientBuilderImpl originalPrincipal(String originalPrincipal) { + conf.setOriginalPrincipal(originalPrincipal); + return this; + } + private void setAuthenticationFromPropsIfAvailable(ClientConfigurationData clientConfig) { String authPluginClass = clientConfig.getAuthPluginClassName(); String authParams = clientConfig.getAuthParams(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index c737a2e9a78..2b1109e179d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -206,6 +206,7 @@ public class ClientCnx extends PulsarHandler { private long lastDisconnectedTimestamp; protected final String clientVersion; + protected final String originalPrincipal; protected enum State { None, SentConnectFrame, Ready, Failed, Connecting @@ -271,6 +272,7 @@ public class ClientCnx extends PulsarHandler { this.idleState = new ClientCnxIdleState(this); this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion() + (conf.getDescription() == null ? 
"" : ("-" + conf.getDescription())); + this.originalPrincipal = conf.getOriginalPrincipal(); this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connection.opened", Unit.Connections, "The number of connections opened", null, Attributes.empty()); @@ -320,7 +322,7 @@ public class ClientCnx extends PulsarHandler { authenticationDataProvider = authentication.getAuthData(remoteHostName); AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion, - clientVersion, proxyToTargetBrokerAddress, null, null, null); + clientVersion, proxyToTargetBrokerAddress, originalPrincipal, null, null); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index 72b9cf0c191..e406581e707 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -111,6 +111,12 @@ public class ClientConfigurationData implements Serializable, Cloneable { @Secret private Map<String, String> authParamMap; + @ApiModelProperty( + name = "originalPrincipal", + value = "Original principal for proxy authentication scenarios." + ) + private String originalPrincipal; + @ApiModelProperty( name = "operationTimeoutMs", value = "Client operation timeout (in milliseconds)." 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java index ab71f2a43e5..7970b395fb3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java @@ -25,5 +25,7 @@ public class Constants { public static final String GLOBAL_CLUSTER = "global"; + public static final String WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE = "__websocket_dummy_original_principle"; + private Constants() {} } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 7bb4df7baa5..d379d01d1be 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.websocket; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.Closeable; import java.io.IOException; @@ -44,6 +45,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SizeUnit; +import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.ClusterData; @@ -194,6 +196,9 @@ public class WebSocketService implements Closeable { .tlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath()) // .ioThreads(config.getWebSocketNumIoThreads()) // 
.connectionsPerBroker(config.getWebSocketConnectionsPerBroker()); + if (clientBuilder instanceof ClientBuilderImpl) { + ((ClientBuilderImpl) clientBuilder).originalPrincipal(WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE); + } // Apply all arbitrary configuration. This must be called before setting any fields annotated as // @Secret on the ClientConfigurationData object because of the way they are serialized.