This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4b69b30a705 [fix][client][branch-3.0] Fix compatibility between 
kerberos and tls (#23801)
4b69b30a705 is described below

commit 4b69b30a705a060b1d370c1551da34cee9e61d93
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Jan 3 10:33:56 2025 +0800

    [fix][client][branch-3.0] Fix compatibility between kerberos and tls 
(#23801)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../pulsar/client/api/TlsProducerConsumerTest.java |  69 ++++++++++
 .../admin/internal/http/AsyncHttpConnector.java    |   3 +-
 .../apache/pulsar/client/api/Authentication.java   |   1 +
 .../org/apache/pulsar/client/cli/CmdConsume.java   |   2 +-
 .../org/apache/pulsar/client/cli/CmdProduce.java   |   2 +-
 .../java/org/apache/pulsar/client/cli/CmdRead.java |   2 +-
 .../org/apache/pulsar/client/impl/HttpClient.java  |   3 +-
 .../client/impl/PulsarChannelInitializer.java      | 146 +++++++++++----------
 .../pulsar/proxy/server/AdminProxyHandler.java     |  32 +++--
 .../pulsar/proxy/server/DirectProxyHandler.java    | 103 +++++++--------
 .../proxy/server/ProxyServiceTlsStarterTest.java   |   2 +
 .../proxy/socket/client/PerformanceClient.java     |   2 +-
 12 files changed, 227 insertions(+), 140 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
index 879289eb65d..85e6dd6292f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
@@ -18,18 +18,24 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import lombok.Cleanup;
 import org.apache.commons.compress.utils.IOUtils;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -293,4 +299,67 @@ public class TlsProducerConsumerTest extends 
TlsProducerConsumerBase {
         @Cleanup
         Producer<byte[]> ignored = 
client.newProducer().topic(topicName).create();
     }
+
+    @Test
+    public void testTlsWithFakeAuthentication() throws Exception {
+        Authentication authentication = spy(new Authentication() {
+            @Override
+            public String getAuthMethodName() {
+                return "fake";
+            }
+
+            @Override
+            public void configure(Map<String, String> authParams) {
+
+            }
+
+            @Override
+            public void start() {
+
+            }
+
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public AuthenticationDataProvider getAuthData(String 
brokerHostName) {
+                return mock(AuthenticationDataProvider.class);
+            }
+        });
+
+        @Cleanup
+        PulsarAdmin pulsarAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsar().getWebServiceAddressTls())
+                .tlsTrustCertsFilePath(CA_CERT_FILE_PATH)
+                .allowTlsInsecureConnection(false)
+                .enableTlsHostnameVerification(false)
+                .tlsKeyFilePath(getTlsFileForClient("admin.key-pk8"))
+                .tlsCertificateFilePath(getTlsFileForClient("admin.cert"))
+                .authentication(authentication)
+                .build();
+        pulsarAdmin.tenants().getTenants();
+        verify(authentication, never()).getAuthData();
+
+        @Cleanup
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(getPulsar().getBrokerServiceUrlTls())
+                .tlsTrustCertsFilePath(CA_CERT_FILE_PATH)
+                .allowTlsInsecureConnection(false)
+                .enableTlsHostnameVerification(false)
+                .tlsKeyFilePath(getTlsFileForClient("admin.key-pk8"))
+                .tlsCertificateFilePath(getTlsFileForClient("admin.cert"))
+                .authentication(authentication).build();
+        verify(authentication, never()).getAuthData();
+
+        final String topicName = "persistent://my-property/my-ns/my-topic-1";
+        internalSetUpForNamespace();
+        @Cleanup
+        Consumer<byte[]> ignoredConsumer =
+                
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name").subscribe();
+        verify(authentication, never()).getAuthData();
+        @Cleanup
+        Producer<byte[]> ignoredProducer = 
pulsarClient.newProducer().topic(topicName).create();
+        verify(authentication, never()).getAuthData();
+    }
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 60e1b434424..edcde2f27ce 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -186,7 +186,8 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
                                                           
DefaultAsyncHttpClientConfig.Builder confBuilder)
             throws GeneralSecurityException, IOException {
         // Set client key and certificate if available
-        AuthenticationDataProvider authData = 
conf.getAuthentication().getAuthData();
+        AuthenticationDataProvider authData = 
conf.getAuthentication().getAuthData(serviceNameResolver
+                .resolveHostUri().getHost());
 
         SslEngineFactory sslEngineFactory = null;
         if (conf.isUseKeyStoreTls()) {
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
index 9bf1b24cbdb..48d9e3e2307 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
@@ -48,6 +48,7 @@ public interface Authentication extends Closeable, 
Serializable {
      * @throws PulsarClientException
      *             any other error
      */
+    @Deprecated
     default AuthenticationDataProvider getAuthData() throws 
PulsarClientException {
         throw new UnsupportedAuthenticationException("Method not 
implemented!");
     }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 0c65604cbe6..1375d61f104 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -254,7 +254,7 @@ public class CmdConsume extends AbstractCmdConsume {
         try {
             if (authentication != null) {
                 authentication.start();
-                AuthenticationDataProvider authData = 
authentication.getAuthData();
+                AuthenticationDataProvider authData = 
authentication.getAuthData(consumerUri.getHost());
                 if (authData.hasDataForHttp()) {
                     for (Map.Entry<String, String> kv : 
authData.getHttpHeaders()) {
                         consumeRequest.setHeader(kv.getKey(), kv.getValue());
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index 04d557d2a93..d9e7ef7c7d5 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -401,7 +401,7 @@ public class CmdProduce {
         try {
             if (authentication != null) {
                 authentication.start();
-                AuthenticationDataProvider authData = 
authentication.getAuthData();
+                AuthenticationDataProvider authData = 
authentication.getAuthData(produceUri.getHost());
                 if (authData.hasDataForHttp()) {
                     for (Map.Entry<String, String> kv : 
authData.getHttpHeaders()) {
                         produceRequest.setHeader(kv.getKey(), kv.getValue());
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
index 4ad8a5293f6..53d849aa272 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
@@ -243,7 +243,7 @@ public class CmdRead extends AbstractCmdConsume {
         try {
             if (authentication != null) {
                 authentication.start();
-                AuthenticationDataProvider authData = 
authentication.getAuthData();
+                AuthenticationDataProvider authData = 
authentication.getAuthData(readerUri.getHost());
                 if (authData.hasDataForHttp()) {
                     for (Map.Entry<String, String> kv : 
authData.getHttpHeaders()) {
                         readRequest.setHeader(kv.getKey(), kv.getValue());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index ea45fe8981e..d732a6a36d9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -93,7 +93,8 @@ public class HttpClient implements Closeable {
         if 
("https".equals(serviceNameResolver.getServiceUri().getServiceName())) {
             try {
                 // Set client key and certificate if available
-                AuthenticationDataProvider authData = 
authentication.getAuthData();
+                AuthenticationDataProvider authData =
+                        
authentication.getAuthData(serviceNameResolver.resolveHostUri().getHost());
 
                 if (conf.isUseKeyStoreTls()) {
                     SSLContext sslCtx = null;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index dff423d19fb..6385ab78ccd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -29,8 +29,10 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.ssl.SslProvider;
 import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import lombok.Getter;
@@ -59,9 +61,9 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
     private final InetSocketAddress socks5ProxyAddress;
     private final String socks5ProxyUsername;
     private final String socks5ProxyPassword;
-
-    private final Supplier<SslContext> sslContextSupplier;
-    private NettySSLContextAutoRefreshBuilder 
nettySSLContextAutoRefreshBuilder;
+    private final ClientConfigurationData conf;
+    private Map<String, Supplier<SslContext>> sslContextSupplierMap;
+    private Map<String, NettySSLContextAutoRefreshBuilder> 
nettySSLContextAutoRefreshBuilderMap;
 
     private static final long TLS_CERTIFICATE_CACHE_MILLIS = 
TimeUnit.MINUTES.toMillis(1);
 
@@ -76,15 +78,34 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
         this.socks5ProxyPassword = conf.getSocks5ProxyPassword();
 
         this.tlsEnabledWithKeyStore = conf.isUseKeyStoreTls();
+        this.conf = conf.clone();
+        this.sslContextSupplierMap = new ConcurrentHashMap<>();
+        this.nettySSLContextAutoRefreshBuilderMap = new ConcurrentHashMap<>();
+    }
 
-        if (tlsEnabled) {
-            if (tlsEnabledWithKeyStore) {
-                AuthenticationDataProvider authData1 = 
conf.getAuthentication().getAuthData();
-                if (StringUtils.isBlank(conf.getTlsTrustStorePath())) {
-                    throw new PulsarClientException("Failed to create TLS 
context, the tlsTrustStorePath"
-                            + " need to be configured if useKeyStoreTls 
enabled");
-                }
-                nettySSLContextAutoRefreshBuilder = new 
NettySSLContextAutoRefreshBuilder(
+    @Override
+    public void initChannel(SocketChannel ch) throws Exception {
+        ch.pipeline().addLast("consolidation", new 
FlushConsolidationHandler(1024, true));
+
+        // Setup channel except for the SsHandler for TLS enabled connections
+        ch.pipeline().addLast("ByteBufPairEncoder", 
ByteBufPair.getEncoder(tlsEnabled));
+
+        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+                Commands.DEFAULT_MAX_MESSAGE_SIZE + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+        ChannelHandler clientCnx = clientCnxSupplier.get();
+        ch.pipeline().addLast("handler", clientCnx);
+    }
+
+    private NettySSLContextAutoRefreshBuilder 
getNettySSLContextAutoRefreshBuilder(String host)
+            throws PulsarClientException {
+        if (tlsEnabledWithKeyStore) {
+            AuthenticationDataProvider authData1 = 
conf.getAuthentication().getAuthData(host);
+            if (StringUtils.isBlank(conf.getTlsTrustStorePath())) {
+                throw new PulsarClientException("Failed to create TLS context, 
the tlsTrustStorePath"
+                        + " need to be configured if useKeyStoreTls enabled");
+            }
+            return nettySSLContextAutoRefreshBuilderMap.computeIfAbsent(host,
+                    key -> new NettySSLContextAutoRefreshBuilder(
                             conf.getSslProvider(),
                             conf.isTlsAllowInsecureConnection(),
                             conf.getTlsTrustStoreType(),
@@ -96,64 +117,52 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
                             conf.getTlsCiphers(),
                             conf.getTlsProtocols(),
                             TLS_CERTIFICATE_CACHE_MILLIS,
-                            authData1);
-            }
-
-            sslContextSupplier = new ObjectCache<SslContext>(() -> {
-                try {
-                    SslProvider sslProvider = null;
-                    if (conf.getSslProvider() != null) {
-                        sslProvider = 
SslProvider.valueOf(conf.getSslProvider());
-                    }
-
-                    // Set client certificate if available
-                    AuthenticationDataProvider authData = 
conf.getAuthentication().getAuthData();
-                    if (authData.hasDataForTls()) {
-                        return authData.getTlsTrustStoreStream() == null
-                                ? 
SecurityUtility.createNettySslContextForClient(
-                                sslProvider,
-                                conf.isTlsAllowInsecureConnection(),
-                                conf.getTlsTrustCertsFilePath(),
-                                authData.getTlsCertificates(),
-                                authData.getTlsPrivateKey(),
-                                conf.getTlsCiphers(),
-                                conf.getTlsProtocols())
-                                : 
SecurityUtility.createNettySslContextForClient(sslProvider,
-                                conf.isTlsAllowInsecureConnection(),
-                                authData.getTlsTrustStoreStream(),
-                                authData.getTlsCertificates(), 
authData.getTlsPrivateKey(),
-                                conf.getTlsCiphers(),
-                                conf.getTlsProtocols());
-                    } else {
-                        return SecurityUtility.createNettySslContextForClient(
-                                sslProvider,
-                                conf.isTlsAllowInsecureConnection(),
-                                conf.getTlsTrustCertsFilePath(),
-                                conf.getTlsCertificateFilePath(),
-                                conf.getTlsKeyFilePath(),
-                                conf.getTlsCiphers(),
-                                conf.getTlsProtocols());
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException("Failed to create TLS context", 
e);
-                }
-            }, TLS_CERTIFICATE_CACHE_MILLIS, TimeUnit.MILLISECONDS);
-        } else {
-            sslContextSupplier = null;
+                            authData1));
         }
+        throw new PulsarClientException(
+                "Failed to create TLS context, the tlsEnabledWithKeyStore need 
to be true");
     }
 
-    @Override
-    public void initChannel(SocketChannel ch) throws Exception {
-        ch.pipeline().addLast("consolidation", new 
FlushConsolidationHandler(1024, true));
-
-        // Setup channel except for the SsHandler for TLS enabled connections
-        ch.pipeline().addLast("ByteBufPairEncoder", 
ByteBufPair.getEncoder(tlsEnabled));
+    private Supplier<SslContext> getSslContextSupplier(String host) {
+        return sslContextSupplierMap.computeIfAbsent(host, key -> new 
ObjectCache<>(() -> {
+            try {
+                SslProvider sslProvider = null;
+                if (conf.getSslProvider() != null) {
+                    sslProvider = SslProvider.valueOf(conf.getSslProvider());
+                }
 
-        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
-                Commands.DEFAULT_MAX_MESSAGE_SIZE + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
-        ChannelHandler clientCnx = clientCnxSupplier.get();
-        ch.pipeline().addLast("handler", clientCnx);
+                // Set client certificate if available
+                AuthenticationDataProvider authData = 
conf.getAuthentication().getAuthData(host);
+                if (authData.hasDataForTls()) {
+                    return authData.getTlsTrustStoreStream() == null
+                            ? SecurityUtility.createNettySslContextForClient(
+                            sslProvider,
+                            conf.isTlsAllowInsecureConnection(),
+                            conf.getTlsTrustCertsFilePath(),
+                            authData.getTlsCertificates(),
+                            authData.getTlsPrivateKey(),
+                            conf.getTlsCiphers(),
+                            conf.getTlsProtocols())
+                            : 
SecurityUtility.createNettySslContextForClient(sslProvider,
+                            conf.isTlsAllowInsecureConnection(),
+                            authData.getTlsTrustStoreStream(),
+                            authData.getTlsCertificates(), 
authData.getTlsPrivateKey(),
+                            conf.getTlsCiphers(),
+                            conf.getTlsProtocols());
+                } else {
+                    return SecurityUtility.createNettySslContextForClient(
+                            sslProvider,
+                            conf.isTlsAllowInsecureConnection(),
+                            conf.getTlsTrustCertsFilePath(),
+                            conf.getTlsCertificateFilePath(),
+                            conf.getTlsKeyFilePath(),
+                            conf.getTlsCiphers(),
+                            conf.getTlsProtocols());
+                }
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to create TLS context", e);
+            }
+        }, TLS_CERTIFICATE_CACHE_MILLIS, TimeUnit.MILLISECONDS));
     }
 
    /**
@@ -175,9 +184,10 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
         ch.eventLoop().execute(() -> {
             try {
                 SslHandler handler = tlsEnabledWithKeyStore
-                        ? new 
SslHandler(nettySSLContextAutoRefreshBuilder.get()
-                                .createSSLEngine(sniHost.getHostString(), 
sniHost.getPort()))
-                        : sslContextSupplier.get().newHandler(ch.alloc(), 
sniHost.getHostString(), sniHost.getPort());
+                        ? new 
SslHandler(getNettySSLContextAutoRefreshBuilder(sniHost.getHostName()).get()
+                        .createSSLEngine(sniHost.getHostString(), 
sniHost.getPort()))
+                        : getSslContextSupplier(sniHost.getHostName()).get()
+                        .newHandler(ch.alloc(), sniHost.getHostString(), 
sniHost.getPort());
 
                 if (tlsHostnameVerificationEnabled) {
                     SecurityUtility.configureSSLHandler(handler);
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 0108b770249..eaed42a532a 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -36,6 +36,7 @@ import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -263,7 +264,8 @@ class AdminProxyHandler extends ProxyServlet {
                         
.loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath());
 
                     SSLContext sslCtx;
-                    AuthenticationDataProvider authData = 
proxyClientAuthentication.getAuthData();
+                    AuthenticationDataProvider authData =
+                            
proxyClientAuthentication.getAuthData(URI.create(getWebServiceUrl()).getHost());
                     if (config.isBrokerClientTlsEnabledWithKeyStore()) {
                         KeyStoreParams params = authData.hasDataForTls() ? 
authData.getTlsKeyStoreParams() : null;
                         sslCtx = KeyStoreSSLContext.createClientSslContext(
@@ -314,6 +316,19 @@ class AdminProxyHandler extends ProxyServlet {
         return new JettyHttpClient();
     }
 
+    private String getWebServiceUrl() throws PulsarServerException {
+        if (isBlank(brokerWebServiceUrl)) {
+            ServiceLookupData availableBroker = discoveryProvider.nextBroker();
+            if (config.isTlsEnabledWithBroker()) {
+                return availableBroker.getWebServiceUrlTls();
+            } else {
+                return availableBroker.getWebServiceUrl();
+            }
+        } else {
+            return brokerWebServiceUrl;
+        }
+    }
+
     @Override
     protected String rewriteTarget(HttpServletRequest request) {
         StringBuilder url = new StringBuilder();
@@ -329,17 +344,10 @@ class AdminProxyHandler extends ProxyServlet {
 
         if (isFunctionsRestRequest && !isBlank(functionWorkerWebServiceUrl)) {
             url.append(functionWorkerWebServiceUrl);
-        } else if (isBlank(brokerWebServiceUrl)) {
+        } else {
             try {
-                ServiceLookupData availableBroker = 
discoveryProvider.nextBroker();
-
-                if (config.isTlsEnabledWithBroker()) {
-                    url.append(availableBroker.getWebServiceUrlTls());
-                } else {
-                    url.append(availableBroker.getWebServiceUrl());
-                }
-
-                if (LOG.isDebugEnabled()) {
+                url.append(getWebServiceUrl());
+                if (LOG.isDebugEnabled() && isBlank(brokerWebServiceUrl)) {
                     LOG.debug("[{}:{}] Selected active broker is {}", 
request.getRemoteAddr(), request.getRemotePort(),
                             url);
                 }
@@ -348,8 +356,6 @@ class AdminProxyHandler extends ProxyServlet {
                         request.getRemotePort(), e.getMessage(), e);
                 return null;
             }
-        } else {
-            url.append(brokerWebServiceUrl);
         }
 
         if (url.lastIndexOf("/") == url.length() - 1) {
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 4678db82c6e..ac3acf2d647 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -47,6 +47,8 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.CharsetUtil;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import org.apache.pulsar.PulsarVersion;
@@ -90,8 +92,8 @@ public class DirectProxyHandler {
     private final boolean tlsHostnameVerificationEnabled;
     private final boolean tlsEnabledWithKeyStore;
     final boolean tlsEnabledWithBroker;
-    private final SslContextAutoRefreshBuilder<SslContext> 
clientSslCtxRefresher;
-    private final NettySSLContextAutoRefreshBuilder 
clientSSLContextAutoRefreshBuilder;
+    private final Map<String, SslContextAutoRefreshBuilder<SslContext>> 
clientSslCtxRefresherMap;
+    private final Map<String, NettySSLContextAutoRefreshBuilder> 
clientSSLContextAutoRefreshBuilderMap;
 
     public DirectProxyHandler(ProxyService service, ProxyConnection 
proxyConnection) {
         this.service = service;
@@ -106,54 +108,8 @@ public class DirectProxyHandler {
         this.tlsHostnameVerificationEnabled = 
service.getConfiguration().isTlsHostnameVerificationEnabled();
         this.tlsEnabledWithKeyStore = 
service.getConfiguration().isTlsEnabledWithKeyStore();
         this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
-        ProxyConfiguration config = service.getConfiguration();
-
-        if (tlsEnabledWithBroker) {
-            AuthenticationDataProvider authData = null;
-
-            if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) {
-                try {
-                    authData = authentication.getAuthData();
-                } catch (PulsarClientException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-
-            if (tlsEnabledWithKeyStore) {
-                clientSSLContextAutoRefreshBuilder = new 
NettySSLContextAutoRefreshBuilder(
-                        config.getBrokerClientSslProvider(),
-                        config.isTlsAllowInsecureConnection(),
-                        config.getBrokerClientTlsTrustStoreType(),
-                        config.getBrokerClientTlsTrustStore(),
-                        config.getBrokerClientTlsTrustStorePassword(),
-                        config.getBrokerClientTlsKeyStoreType(),
-                        config.getBrokerClientTlsKeyStore(),
-                        config.getBrokerClientTlsKeyStorePassword(),
-                        config.getBrokerClientTlsCiphers(),
-                        config.getBrokerClientTlsProtocols(),
-                        config.getTlsCertRefreshCheckDurationSec(),
-                        authData);
-                clientSslCtxRefresher = null;
-            } else {
-                SslProvider sslProvider = null;
-                if (config.getBrokerClientSslProvider() != null) {
-                    sslProvider = 
SslProvider.valueOf(config.getBrokerClientSslProvider());
-                }
-                clientSslCtxRefresher = new NettyClientSslContextRefresher(
-                        sslProvider,
-                        config.isTlsAllowInsecureConnection(),
-                        config.getBrokerClientTrustCertsFilePath(),
-                        authData,
-                        config.getBrokerClientTlsCiphers(),
-                        config.getBrokerClientTlsProtocols(),
-                        config.getTlsCertRefreshCheckDurationSec()
-                );
-                clientSSLContextAutoRefreshBuilder = null;
-            }
-        } else {
-            clientSSLContextAutoRefreshBuilder = null;
-            clientSslCtxRefresher = null;
-        }
+        this.clientSslCtxRefresherMap = new ConcurrentHashMap<>();
+        this.clientSSLContextAutoRefreshBuilderMap = new ConcurrentHashMap<>();
     }
 
     public void connect(String brokerHostAndPort, InetSocketAddress 
targetBrokerAddress, int protocolVersion) {
@@ -191,11 +147,52 @@ public class DirectProxyHandler {
                 ch.pipeline().addLast("consolidation", new 
FlushConsolidationHandler(1024,
                         true));
                 if (tlsEnabledWithBroker) {
+                    AuthenticationDataProvider authData;
+                    if 
(!isEmpty(config.getBrokerClientAuthenticationPlugin())) {
+                        try {
+                            authData = authentication.getAuthData(remoteHost);
+                        } catch (PulsarClientException e) {
+                            throw new RuntimeException(e);
+                        }
+                    } else {
+                        authData = null;
+                    }
                     String host = targetBrokerAddress.getHostString();
                     int port = targetBrokerAddress.getPort();
-                    SslHandler handler = tlsEnabledWithKeyStore
-                            ? new 
SslHandler(clientSSLContextAutoRefreshBuilder.get().createSSLEngine(host, port))
-                            : 
clientSslCtxRefresher.get().newHandler(ch.alloc(), host, port);
+                    SslHandler handler;
+                    if (tlsEnabledWithKeyStore) {
+                        handler = new 
SslHandler(clientSSLContextAutoRefreshBuilderMap.computeIfAbsent(remoteHost,
+                                key -> new NettySSLContextAutoRefreshBuilder(
+                                        config.getBrokerClientSslProvider(),
+                                        config.isTlsAllowInsecureConnection(),
+                                        
config.getBrokerClientTlsTrustStoreType(),
+                                        config.getBrokerClientTlsTrustStore(),
+                                        
config.getBrokerClientTlsTrustStorePassword(),
+                                        
config.getBrokerClientTlsKeyStoreType(),
+                                        config.getBrokerClientTlsKeyStore(),
+                                        
config.getBrokerClientTlsKeyStorePassword(),
+                                        config.getBrokerClientTlsCiphers(),
+                                        config.getBrokerClientTlsProtocols(),
+                                        
config.getTlsCertRefreshCheckDurationSec(),
+                                        authData)).get().createSSLEngine(host, 
port));
+                    } else {
+                        SslProvider sslProvider;
+                        if (config.getBrokerClientSslProvider() != null) {
+                            sslProvider = 
SslProvider.valueOf(config.getBrokerClientSslProvider());
+                        } else {
+                            sslProvider = null;
+                        }
+                        handler = 
clientSslCtxRefresherMap.computeIfAbsent(remoteHost,
+                                key -> new NettyClientSslContextRefresher(
+                                        sslProvider,
+                                        config.isTlsAllowInsecureConnection(),
+                                        
config.getBrokerClientTrustCertsFilePath(),
+                                        authData,
+                                        config.getBrokerClientTlsCiphers(),
+                                        config.getBrokerClientTlsProtocols(),
+                                        
config.getTlsCertRefreshCheckDurationSec()
+                                )).get().newHandler(ch.alloc(), host, port);
+                    }
                     if (tlsHostnameVerificationEnabled) {
                         SecurityUtility.configureSSLHandler(handler);
                     }
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index a4ebe25b428..8246e974f6f 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -59,6 +59,7 @@ public class ProxyServiceTlsStarterTest extends 
MockedPulsarServiceBaseTest {
         
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
         
serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
         
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
+        
serviceStarter.getConfig().setBrokerWebServiceURLTLS(pulsar.getWebServiceAddressTls());
         
serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
         serviceStarter.getConfig().setServicePort(Optional.empty());
         serviceStarter.getConfig().setServicePortTls(Optional.of(0));
@@ -76,6 +77,7 @@ public class ProxyServiceTlsStarterTest extends 
MockedPulsarServiceBaseTest {
     protected void doInitConf() throws Exception {
         super.doInitConf();
         this.conf.setBrokerServicePortTls(Optional.of(0));
+        this.conf.setWebServicePortTls(Optional.of(0));
         this.conf.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
         this.conf.setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
     }
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 596eb8d2c28..0b5db8d78fb 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -263,7 +263,7 @@ public class PerformanceClient {
                     Authentication auth = 
AuthenticationFactory.create(arguments.authPluginClassName,
                             arguments.authParams);
                     auth.start();
-                    AuthenticationDataProvider authData = auth.getAuthData();
+                    AuthenticationDataProvider authData = 
auth.getAuthData(produceUri.getHost());
                     if (authData.hasDataForHttp()) {
                         for (Map.Entry<String, String> kv : 
authData.getHttpHeaders()) {
                             produceRequest.setHeader(kv.getKey(), 
kv.getValue());


Reply via email to