This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new c3c55a7e848 [fix][ws] Fix WebSocket proxy originalPrincipal for HTTP admin API calls (#24613) c3c55a7e848 is described below commit c3c55a7e84845f2749e781b485f385f7e6be4983 Author: Penghui Li <peng...@apache.org> AuthorDate: Fri Aug 8 10:31:42 2025 -0700 [fix][ws] Fix WebSocket proxy originalPrincipal for HTTP admin API calls (#24613) Co-authored-by: Claude <nore...@anthropic.com> --- ...IntegrationTest.java => ProxyRoleAuthTest.java} | 27 ++++++++------- .../proxy/ProxyRoleAuthWebServiceURLTest.java | 38 ++++++++++++++++++++++ .../org/apache/pulsar/client/impl/HttpClient.java | 8 +++++ 3 files changed, 62 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthTest.java similarity index 98% rename from pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthTest.java index 624c2522822..595991c40aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthTest.java @@ -68,9 +68,9 @@ import org.testng.annotations.Test; * 2. testWebSocketProxyWithUnauthorizedToken: Negative test with unauthorized tokens */ @Test(groups = "websocket") -public class WebSocketProxyAuthIntegrationTest extends ProducerConsumerBase { +public class ProxyRoleAuthTest extends ProducerConsumerBase { - private static final Logger log = LoggerFactory.getLogger(WebSocketProxyAuthIntegrationTest.class); + private static final Logger log = LoggerFactory.getLogger(ProxyRoleAuthTest.class); // JWT token authentication setup with different roles private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); @@ -129,6 +129,19 @@ public class WebSocketProxyAuthIntegrationTest extends ProducerConsumerBase { // Setup namespace and grant permissions for client role setupNamespacePermissions(); + WebSocketProxyConfiguration proxyConfig = getProxyConfig(); + + service = spyWithClassAndConstructorArgs(WebSocketService.class, proxyConfig); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service) + .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); + + proxyServer = new ProxyServer(proxyConfig); + WebSocketServiceStarter.start(proxyServer, service); + + log.info("WebSocket Proxy Server started on port: {}", proxyServer.getListenPortHTTP().get()); + } + + protected WebSocketProxyConfiguration getProxyConfig() { // Create WebSocket proxy configuration with authentication and authorization enabled WebSocketProxyConfiguration proxyConfig = new WebSocketProxyConfiguration(); proxyConfig.setWebServicePort(Optional.of(0)); @@ -152,15 +165,7 @@ public class WebSocketProxyAuthIntegrationTest extends ProducerConsumerBase { // Set broker service URL to connect to our test broker proxyConfig.setBrokerServiceUrl(pulsar.getBrokerServiceUrl()); proxyConfig.setBrokerServiceUrlTls(pulsar.getBrokerServiceUrlTls()); - - service = spyWithClassAndConstructorArgs(WebSocketService.class, proxyConfig); - doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(service) - .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); - - proxyServer = new ProxyServer(proxyConfig); - WebSocketServiceStarter.start(proxyServer, service); - - log.info("WebSocket Proxy Server started on port: {}", proxyServer.getListenPortHTTP().get()); + return proxyConfig; } @AfterMethod(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthWebServiceURLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthWebServiceURLTest.java new file mode 100644 index 00000000000..f726178aabc --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthWebServiceURLTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket.proxy; + +import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration; + +/** + * Same test with ProxyRoleAuthTest but using REST API as the internal client. + */ +public class ProxyRoleAuthWebServiceURLTest extends ProxyRoleAuthTest { + + @Override + protected WebSocketProxyConfiguration getProxyConfig() { + // Create WebSocket proxy configuration with authentication and authorization enabled + WebSocketProxyConfiguration proxyConfig = super.getProxyConfig(); + proxyConfig.setServiceUrl(pulsar.getWebServiceAddress()); + proxyConfig.setServiceUrlTls(pulsar.getWebServiceAddressTls()); + proxyConfig.setBrokerServiceUrl(null); + proxyConfig.setBrokerServiceUrlTls(null); + return proxyConfig; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java index a86e820af6b..f9312be39aa 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java @@ -59,17 +59,20 @@ import org.asynchttpclient.channel.DefaultKeepAliveStrategy; @Slf4j public class HttpClient implements Closeable { + private static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal"; protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30; protected final AsyncHttpClient httpClient; protected final ServiceNameResolver serviceNameResolver; protected final Authentication authentication; + protected final ClientConfigurationData clientConf; protected ScheduledExecutorService executorService; protected PulsarSslFactory sslFactory; protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { this.authentication = conf.getAuthentication(); + this.clientConf = conf; this.serviceNameResolver = new PulsarServiceNameResolver(conf.getServiceUrlQuarantineInitDurationMs(), conf.getServiceUrlQuarantineMaxDurationMs()); this.serviceNameResolver.updateServiceUrl(conf.getServiceUrl()); @@ -194,6 +197,11 @@ public class HttpClient implements Closeable { } } + // Add X-Original-Principal header if originalPrincipal is configured (for proxy scenarios) + if (clientConf.getOriginalPrincipal() != null) { + builder.addHeader(ORIGINAL_PRINCIPAL_HEADER, clientConf.getOriginalPrincipal()); + } + builder.execute().toCompletableFuture().whenComplete((response2, t) -> { if (t != null) { serviceNameResolver.markHostAvailability(