This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c3c55a7e848 [fix][ws] Fix WebSocket proxy originalPrincipal for HTTP 
admin API calls (#24613)
c3c55a7e848 is described below

commit c3c55a7e84845f2749e781b485f385f7e6be4983
Author: Penghui Li <peng...@apache.org>
AuthorDate: Fri Aug 8 10:31:42 2025 -0700

    [fix][ws] Fix WebSocket proxy originalPrincipal for HTTP admin API calls 
(#24613)
    
    Co-authored-by: Claude <nore...@anthropic.com>
---
 ...IntegrationTest.java => ProxyRoleAuthTest.java} | 27 ++++++++-------
 .../proxy/ProxyRoleAuthWebServiceURLTest.java      | 38 ++++++++++++++++++++++
 .../org/apache/pulsar/client/impl/HttpClient.java  |  8 +++++
 3 files changed, 62 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthTest.java
similarity index 98%
rename from 
pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java
rename to 
pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthTest.java
index 624c2522822..595991c40aa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthTest.java
@@ -68,9 +68,9 @@ import org.testng.annotations.Test;
  * 2. testWebSocketProxyWithUnauthorizedToken: Negative test with unauthorized 
tokens
  */
 @Test(groups = "websocket")
-public class WebSocketProxyAuthIntegrationTest extends ProducerConsumerBase {
+public class ProxyRoleAuthTest extends ProducerConsumerBase {
 
-    private static final Logger log = 
LoggerFactory.getLogger(WebSocketProxyAuthIntegrationTest.class);
+    private static final Logger log = 
LoggerFactory.getLogger(ProxyRoleAuthTest.class);
 
     // JWT token authentication setup with different roles
     private static final SecretKey SECRET_KEY = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
@@ -129,6 +129,19 @@ public class WebSocketProxyAuthIntegrationTest extends 
ProducerConsumerBase {
         // Setup namespace and grant permissions for client role
         setupNamespacePermissions();
 
+        WebSocketProxyConfiguration proxyConfig = getProxyConfig();
+
+        service = spyWithClassAndConstructorArgs(WebSocketService.class, 
proxyConfig);
+        doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(service)
+                .createConfigMetadataStore(anyString(), anyInt(), 
anyBoolean());
+
+        proxyServer = new ProxyServer(proxyConfig);
+        WebSocketServiceStarter.start(proxyServer, service);
+
+        log.info("WebSocket Proxy Server started on port: {}", 
proxyServer.getListenPortHTTP().get());
+    }
+
+    protected WebSocketProxyConfiguration getProxyConfig() {
         // Create WebSocket proxy configuration with authentication and 
authorization enabled
         WebSocketProxyConfiguration proxyConfig = new 
WebSocketProxyConfiguration();
         proxyConfig.setWebServicePort(Optional.of(0));
@@ -152,15 +165,7 @@ public class WebSocketProxyAuthIntegrationTest extends 
ProducerConsumerBase {
         // Set broker service URL to connect to our test broker
         proxyConfig.setBrokerServiceUrl(pulsar.getBrokerServiceUrl());
         proxyConfig.setBrokerServiceUrlTls(pulsar.getBrokerServiceUrlTls());
-
-        service = spyWithClassAndConstructorArgs(WebSocketService.class, 
proxyConfig);
-        doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(service)
-                .createConfigMetadataStore(anyString(), anyInt(), 
anyBoolean());
-
-        proxyServer = new ProxyServer(proxyConfig);
-        WebSocketServiceStarter.start(proxyServer, service);
-
-        log.info("WebSocket Proxy Server started on port: {}", 
proxyServer.getListenPortHTTP().get());
+        return  proxyConfig;
     }
 
     @AfterMethod(alwaysRun = true)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthWebServiceURLTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthWebServiceURLTest.java
new file mode 100644
index 00000000000..f726178aabc
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthWebServiceURLTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+
+/**
+ * Same test as ProxyRoleAuthTest, but using the REST API as the internal client.
+ */
+public class ProxyRoleAuthWebServiceURLTest extends ProxyRoleAuthTest {
+
+    @Override
+    protected WebSocketProxyConfiguration getProxyConfig() {
+        // Reuse the base proxy configuration, but use the broker web service 
addresses instead of the broker service URLs
+        WebSocketProxyConfiguration proxyConfig = super.getProxyConfig();
+        proxyConfig.setServiceUrl(pulsar.getWebServiceAddress());
+        proxyConfig.setServiceUrlTls(pulsar.getWebServiceAddressTls());
+        proxyConfig.setBrokerServiceUrl(null);
+        proxyConfig.setBrokerServiceUrlTls(null);
+        return  proxyConfig;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index a86e820af6b..f9312be39aa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -59,17 +59,20 @@ import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
 @Slf4j
 public class HttpClient implements Closeable {
 
+    private static final String ORIGINAL_PRINCIPAL_HEADER = 
"X-Original-Principal";
     protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
     protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
 
     protected final AsyncHttpClient httpClient;
     protected final ServiceNameResolver serviceNameResolver;
     protected final Authentication authentication;
+    protected final ClientConfigurationData clientConf;
     protected ScheduledExecutorService executorService;
     protected PulsarSslFactory sslFactory;
 
     protected HttpClient(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) throws PulsarClientException {
         this.authentication = conf.getAuthentication();
+        this.clientConf = conf;
         this.serviceNameResolver = new 
PulsarServiceNameResolver(conf.getServiceUrlQuarantineInitDurationMs(),
                 conf.getServiceUrlQuarantineMaxDurationMs());
         this.serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
@@ -194,6 +197,11 @@ public class HttpClient implements Closeable {
                     }
                 }
 
+                // Add X-Original-Principal header if originalPrincipal is 
configured (for proxy scenarios)
+                if (clientConf.getOriginalPrincipal() != null) {
+                    builder.addHeader(ORIGINAL_PRINCIPAL_HEADER, 
clientConf.getOriginalPrincipal());
+                }
+
                 
builder.execute().toCompletableFuture().whenComplete((response2, t) -> {
                     if (t != null) {
                         serviceNameResolver.markHostAvailability(

Reply via email to