This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4b69b30a705 [fix][client][branch-3.0] Fix compatibility between
kerberos and tls (#23801)
4b69b30a705 is described below
commit 4b69b30a705a060b1d370c1551da34cee9e61d93
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Jan 3 10:33:56 2025 +0800
[fix][client][branch-3.0] Fix compatibility between kerberos and tls
(#23801)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../pulsar/client/api/TlsProducerConsumerTest.java | 69 ++++++++++
.../admin/internal/http/AsyncHttpConnector.java | 3 +-
.../apache/pulsar/client/api/Authentication.java | 1 +
.../org/apache/pulsar/client/cli/CmdConsume.java | 2 +-
.../org/apache/pulsar/client/cli/CmdProduce.java | 2 +-
.../java/org/apache/pulsar/client/cli/CmdRead.java | 2 +-
.../org/apache/pulsar/client/impl/HttpClient.java | 3 +-
.../client/impl/PulsarChannelInitializer.java | 146 +++++++++++----------
.../pulsar/proxy/server/AdminProxyHandler.java | 32 +++--
.../pulsar/proxy/server/DirectProxyHandler.java | 103 +++++++--------
.../proxy/server/ProxyServiceTlsStarterTest.java | 2 +
.../proxy/socket/client/PerformanceClient.java | 2 +-
12 files changed, 227 insertions(+), 140 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
index 879289eb65d..85e6dd6292f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
@@ -18,18 +18,24 @@
*/
package org.apache.pulsar.client.api;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import lombok.Cleanup;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -293,4 +299,67 @@ public class TlsProducerConsumerTest extends
TlsProducerConsumerBase {
@Cleanup
Producer<byte[]> ignored =
client.newProducer().topic(topicName).create();
}
+
+ @Test
+ public void testTlsWithFakeAuthentication() throws Exception {
+ Authentication authentication = spy(new Authentication() {
+ @Override
+ public String getAuthMethodName() {
+ return "fake";
+ }
+
+ @Override
+ public void configure(Map<String, String> authParams) {
+
+ }
+
+ @Override
+ public void start() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public AuthenticationDataProvider getAuthData(String
brokerHostName) {
+ return mock(AuthenticationDataProvider.class);
+ }
+ });
+
+ @Cleanup
+ PulsarAdmin pulsarAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsar().getWebServiceAddressTls())
+ .tlsTrustCertsFilePath(CA_CERT_FILE_PATH)
+ .allowTlsInsecureConnection(false)
+ .enableTlsHostnameVerification(false)
+ .tlsKeyFilePath(getTlsFileForClient("admin.key-pk8"))
+ .tlsCertificateFilePath(getTlsFileForClient("admin.cert"))
+ .authentication(authentication)
+ .build();
+ pulsarAdmin.tenants().getTenants();
+ verify(authentication, never()).getAuthData();
+
+ @Cleanup
+ PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(getPulsar().getBrokerServiceUrlTls())
+ .tlsTrustCertsFilePath(CA_CERT_FILE_PATH)
+ .allowTlsInsecureConnection(false)
+ .enableTlsHostnameVerification(false)
+ .tlsKeyFilePath(getTlsFileForClient("admin.key-pk8"))
+ .tlsCertificateFilePath(getTlsFileForClient("admin.cert"))
+ .authentication(authentication).build();
+ verify(authentication, never()).getAuthData();
+
+ final String topicName = "persistent://my-property/my-ns/my-topic-1";
+ internalSetUpForNamespace();
+ @Cleanup
+ Consumer<byte[]> ignoredConsumer =
+
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name").subscribe();
+ verify(authentication, never()).getAuthData();
+ @Cleanup
+ Producer<byte[]> ignoredProducer =
pulsarClient.newProducer().topic(topicName).create();
+ verify(authentication, never()).getAuthData();
+ }
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 60e1b434424..edcde2f27ce 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -186,7 +186,8 @@ public class AsyncHttpConnector implements Connector,
AsyncHttpRequestExecutor {
DefaultAsyncHttpClientConfig.Builder confBuilder)
throws GeneralSecurityException, IOException {
// Set client key and certificate if available
- AuthenticationDataProvider authData =
conf.getAuthentication().getAuthData();
+ AuthenticationDataProvider authData =
conf.getAuthentication().getAuthData(serviceNameResolver
+ .resolveHostUri().getHost());
SslEngineFactory sslEngineFactory = null;
if (conf.isUseKeyStoreTls()) {
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
index 9bf1b24cbdb..48d9e3e2307 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
@@ -48,6 +48,7 @@ public interface Authentication extends Closeable,
Serializable {
* @throws PulsarClientException
* any other error
*/
+ @Deprecated
default AuthenticationDataProvider getAuthData() throws
PulsarClientException {
throw new UnsupportedAuthenticationException("Method not
implemented!");
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 0c65604cbe6..1375d61f104 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -254,7 +254,7 @@ public class CmdConsume extends AbstractCmdConsume {
try {
if (authentication != null) {
authentication.start();
- AuthenticationDataProvider authData =
authentication.getAuthData();
+ AuthenticationDataProvider authData =
authentication.getAuthData(consumerUri.getHost());
if (authData.hasDataForHttp()) {
for (Map.Entry<String, String> kv :
authData.getHttpHeaders()) {
consumeRequest.setHeader(kv.getKey(), kv.getValue());
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index 04d557d2a93..d9e7ef7c7d5 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -401,7 +401,7 @@ public class CmdProduce {
try {
if (authentication != null) {
authentication.start();
- AuthenticationDataProvider authData =
authentication.getAuthData();
+ AuthenticationDataProvider authData =
authentication.getAuthData(produceUri.getHost());
if (authData.hasDataForHttp()) {
for (Map.Entry<String, String> kv :
authData.getHttpHeaders()) {
produceRequest.setHeader(kv.getKey(), kv.getValue());
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
index 4ad8a5293f6..53d849aa272 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
@@ -243,7 +243,7 @@ public class CmdRead extends AbstractCmdConsume {
try {
if (authentication != null) {
authentication.start();
- AuthenticationDataProvider authData =
authentication.getAuthData();
+ AuthenticationDataProvider authData =
authentication.getAuthData(readerUri.getHost());
if (authData.hasDataForHttp()) {
for (Map.Entry<String, String> kv :
authData.getHttpHeaders()) {
readRequest.setHeader(kv.getKey(), kv.getValue());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index ea45fe8981e..d732a6a36d9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -93,7 +93,8 @@ public class HttpClient implements Closeable {
if
("https".equals(serviceNameResolver.getServiceUri().getServiceName())) {
try {
// Set client key and certificate if available
- AuthenticationDataProvider authData =
authentication.getAuthData();
+ AuthenticationDataProvider authData =
+
authentication.getAuthData(serviceNameResolver.resolveHostUri().getHost());
if (conf.isUseKeyStoreTls()) {
SSLContext sslCtx = null;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index dff423d19fb..6385ab78ccd 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -29,8 +29,10 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import java.net.InetSocketAddress;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Getter;
@@ -59,9 +61,9 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
private final InetSocketAddress socks5ProxyAddress;
private final String socks5ProxyUsername;
private final String socks5ProxyPassword;
-
- private final Supplier<SslContext> sslContextSupplier;
- private NettySSLContextAutoRefreshBuilder
nettySSLContextAutoRefreshBuilder;
+ private final ClientConfigurationData conf;
+ private Map<String, Supplier<SslContext>> sslContextSupplierMap;
+ private Map<String, NettySSLContextAutoRefreshBuilder>
nettySSLContextAutoRefreshBuilderMap;
private static final long TLS_CERTIFICATE_CACHE_MILLIS =
TimeUnit.MINUTES.toMillis(1);
@@ -76,15 +78,34 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
this.socks5ProxyPassword = conf.getSocks5ProxyPassword();
this.tlsEnabledWithKeyStore = conf.isUseKeyStoreTls();
+ this.conf = conf.clone();
+ this.sslContextSupplierMap = new ConcurrentHashMap<>();
+ this.nettySSLContextAutoRefreshBuilderMap = new ConcurrentHashMap<>();
+ }
- if (tlsEnabled) {
- if (tlsEnabledWithKeyStore) {
- AuthenticationDataProvider authData1 =
conf.getAuthentication().getAuthData();
- if (StringUtils.isBlank(conf.getTlsTrustStorePath())) {
- throw new PulsarClientException("Failed to create TLS
context, the tlsTrustStorePath"
- + " need to be configured if useKeyStoreTls
enabled");
- }
- nettySSLContextAutoRefreshBuilder = new
NettySSLContextAutoRefreshBuilder(
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast("consolidation", new
FlushConsolidationHandler(1024, true));
+
+ // Setup channel except for the SsHandler for TLS enabled connections
+ ch.pipeline().addLast("ByteBufPairEncoder",
ByteBufPair.getEncoder(tlsEnabled));
+
+ ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+ Commands.DEFAULT_MAX_MESSAGE_SIZE +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+ ChannelHandler clientCnx = clientCnxSupplier.get();
+ ch.pipeline().addLast("handler", clientCnx);
+ }
+
+ private NettySSLContextAutoRefreshBuilder
getNettySSLContextAutoRefreshBuilder(String host)
+ throws PulsarClientException {
+ if (tlsEnabledWithKeyStore) {
+ AuthenticationDataProvider authData1 =
conf.getAuthentication().getAuthData(host);
+ if (StringUtils.isBlank(conf.getTlsTrustStorePath())) {
+ throw new PulsarClientException("Failed to create TLS context,
the tlsTrustStorePath"
+ + " need to be configured if useKeyStoreTls enabled");
+ }
+ return nettySSLContextAutoRefreshBuilderMap.computeIfAbsent(host,
+ key -> new NettySSLContextAutoRefreshBuilder(
conf.getSslProvider(),
conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustStoreType(),
@@ -96,64 +117,52 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
conf.getTlsCiphers(),
conf.getTlsProtocols(),
TLS_CERTIFICATE_CACHE_MILLIS,
- authData1);
- }
-
- sslContextSupplier = new ObjectCache<SslContext>(() -> {
- try {
- SslProvider sslProvider = null;
- if (conf.getSslProvider() != null) {
- sslProvider =
SslProvider.valueOf(conf.getSslProvider());
- }
-
- // Set client certificate if available
- AuthenticationDataProvider authData =
conf.getAuthentication().getAuthData();
- if (authData.hasDataForTls()) {
- return authData.getTlsTrustStoreStream() == null
- ?
SecurityUtility.createNettySslContextForClient(
- sslProvider,
- conf.isTlsAllowInsecureConnection(),
- conf.getTlsTrustCertsFilePath(),
- authData.getTlsCertificates(),
- authData.getTlsPrivateKey(),
- conf.getTlsCiphers(),
- conf.getTlsProtocols())
- :
SecurityUtility.createNettySslContextForClient(sslProvider,
- conf.isTlsAllowInsecureConnection(),
- authData.getTlsTrustStoreStream(),
- authData.getTlsCertificates(),
authData.getTlsPrivateKey(),
- conf.getTlsCiphers(),
- conf.getTlsProtocols());
- } else {
- return SecurityUtility.createNettySslContextForClient(
- sslProvider,
- conf.isTlsAllowInsecureConnection(),
- conf.getTlsTrustCertsFilePath(),
- conf.getTlsCertificateFilePath(),
- conf.getTlsKeyFilePath(),
- conf.getTlsCiphers(),
- conf.getTlsProtocols());
- }
- } catch (Exception e) {
- throw new RuntimeException("Failed to create TLS context",
e);
- }
- }, TLS_CERTIFICATE_CACHE_MILLIS, TimeUnit.MILLISECONDS);
- } else {
- sslContextSupplier = null;
+ authData1));
}
+ throw new PulsarClientException(
+ "Failed to create TLS context, the tlsEnabledWithKeyStore need
to be true");
}
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast("consolidation", new
FlushConsolidationHandler(1024, true));
-
- // Setup channel except for the SsHandler for TLS enabled connections
- ch.pipeline().addLast("ByteBufPairEncoder",
ByteBufPair.getEncoder(tlsEnabled));
+ private Supplier<SslContext> getSslContextSupplier(String host) {
+ return sslContextSupplierMap.computeIfAbsent(host, key -> new
ObjectCache<>(() -> {
+ try {
+ SslProvider sslProvider = null;
+ if (conf.getSslProvider() != null) {
+ sslProvider = SslProvider.valueOf(conf.getSslProvider());
+ }
- ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
- Commands.DEFAULT_MAX_MESSAGE_SIZE +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
- ChannelHandler clientCnx = clientCnxSupplier.get();
- ch.pipeline().addLast("handler", clientCnx);
+ // Set client certificate if available
+ AuthenticationDataProvider authData =
conf.getAuthentication().getAuthData(host);
+ if (authData.hasDataForTls()) {
+ return authData.getTlsTrustStoreStream() == null
+ ? SecurityUtility.createNettySslContextForClient(
+ sslProvider,
+ conf.isTlsAllowInsecureConnection(),
+ conf.getTlsTrustCertsFilePath(),
+ authData.getTlsCertificates(),
+ authData.getTlsPrivateKey(),
+ conf.getTlsCiphers(),
+ conf.getTlsProtocols())
+ :
SecurityUtility.createNettySslContextForClient(sslProvider,
+ conf.isTlsAllowInsecureConnection(),
+ authData.getTlsTrustStoreStream(),
+ authData.getTlsCertificates(),
authData.getTlsPrivateKey(),
+ conf.getTlsCiphers(),
+ conf.getTlsProtocols());
+ } else {
+ return SecurityUtility.createNettySslContextForClient(
+ sslProvider,
+ conf.isTlsAllowInsecureConnection(),
+ conf.getTlsTrustCertsFilePath(),
+ conf.getTlsCertificateFilePath(),
+ conf.getTlsKeyFilePath(),
+ conf.getTlsCiphers(),
+ conf.getTlsProtocols());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create TLS context", e);
+ }
+ }, TLS_CERTIFICATE_CACHE_MILLIS, TimeUnit.MILLISECONDS));
}
/**
@@ -175,9 +184,10 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
ch.eventLoop().execute(() -> {
try {
SslHandler handler = tlsEnabledWithKeyStore
- ? new
SslHandler(nettySSLContextAutoRefreshBuilder.get()
- .createSSLEngine(sniHost.getHostString(),
sniHost.getPort()))
- : sslContextSupplier.get().newHandler(ch.alloc(),
sniHost.getHostString(), sniHost.getPort());
+ ? new
SslHandler(getNettySSLContextAutoRefreshBuilder(sniHost.getHostName()).get()
+ .createSSLEngine(sniHost.getHostString(),
sniHost.getPort()))
+ : getSslContextSupplier(sniHost.getHostName()).get()
+ .newHandler(ch.alloc(), sniHost.getHostString(),
sniHost.getPort());
if (tlsHostnameVerificationEnabled) {
SecurityUtility.configureSSLHandler(handler);
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 0108b770249..eaed42a532a 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -36,6 +36,7 @@ import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -263,7 +264,8 @@ class AdminProxyHandler extends ProxyServlet {
.loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath());
SSLContext sslCtx;
- AuthenticationDataProvider authData =
proxyClientAuthentication.getAuthData();
+ AuthenticationDataProvider authData =
+
proxyClientAuthentication.getAuthData(URI.create(getWebServiceUrl()).getHost());
if (config.isBrokerClientTlsEnabledWithKeyStore()) {
KeyStoreParams params = authData.hasDataForTls() ?
authData.getTlsKeyStoreParams() : null;
sslCtx = KeyStoreSSLContext.createClientSslContext(
@@ -314,6 +316,19 @@ class AdminProxyHandler extends ProxyServlet {
return new JettyHttpClient();
}
+ private String getWebServiceUrl() throws PulsarServerException {
+ if (isBlank(brokerWebServiceUrl)) {
+ ServiceLookupData availableBroker = discoveryProvider.nextBroker();
+ if (config.isTlsEnabledWithBroker()) {
+ return availableBroker.getWebServiceUrlTls();
+ } else {
+ return availableBroker.getWebServiceUrl();
+ }
+ } else {
+ return brokerWebServiceUrl;
+ }
+ }
+
@Override
protected String rewriteTarget(HttpServletRequest request) {
StringBuilder url = new StringBuilder();
@@ -329,17 +344,10 @@ class AdminProxyHandler extends ProxyServlet {
if (isFunctionsRestRequest && !isBlank(functionWorkerWebServiceUrl)) {
url.append(functionWorkerWebServiceUrl);
- } else if (isBlank(brokerWebServiceUrl)) {
+ } else {
try {
- ServiceLookupData availableBroker =
discoveryProvider.nextBroker();
-
- if (config.isTlsEnabledWithBroker()) {
- url.append(availableBroker.getWebServiceUrlTls());
- } else {
- url.append(availableBroker.getWebServiceUrl());
- }
-
- if (LOG.isDebugEnabled()) {
+ url.append(getWebServiceUrl());
+ if (LOG.isDebugEnabled() && isBlank(brokerWebServiceUrl)) {
LOG.debug("[{}:{}] Selected active broker is {}",
request.getRemoteAddr(), request.getRemotePort(),
url);
}
@@ -348,8 +356,6 @@ class AdminProxyHandler extends ProxyServlet {
request.getRemotePort(), e.getMessage(), e);
return null;
}
- } else {
- url.append(brokerWebServiceUrl);
}
if (url.lastIndexOf("/") == url.length() - 1) {
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 4678db82c6e..ac3acf2d647 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -47,6 +47,8 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
@@ -90,8 +92,8 @@ public class DirectProxyHandler {
private final boolean tlsHostnameVerificationEnabled;
private final boolean tlsEnabledWithKeyStore;
final boolean tlsEnabledWithBroker;
- private final SslContextAutoRefreshBuilder<SslContext>
clientSslCtxRefresher;
- private final NettySSLContextAutoRefreshBuilder
clientSSLContextAutoRefreshBuilder;
+ private final Map<String, SslContextAutoRefreshBuilder<SslContext>>
clientSslCtxRefresherMap;
+ private final Map<String, NettySSLContextAutoRefreshBuilder>
clientSSLContextAutoRefreshBuilderMap;
public DirectProxyHandler(ProxyService service, ProxyConnection
proxyConnection) {
this.service = service;
@@ -106,54 +108,8 @@ public class DirectProxyHandler {
this.tlsHostnameVerificationEnabled =
service.getConfiguration().isTlsHostnameVerificationEnabled();
this.tlsEnabledWithKeyStore =
service.getConfiguration().isTlsEnabledWithKeyStore();
this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
- ProxyConfiguration config = service.getConfiguration();
-
- if (tlsEnabledWithBroker) {
- AuthenticationDataProvider authData = null;
-
- if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) {
- try {
- authData = authentication.getAuthData();
- } catch (PulsarClientException e) {
- throw new RuntimeException(e);
- }
- }
-
- if (tlsEnabledWithKeyStore) {
- clientSSLContextAutoRefreshBuilder = new
NettySSLContextAutoRefreshBuilder(
- config.getBrokerClientSslProvider(),
- config.isTlsAllowInsecureConnection(),
- config.getBrokerClientTlsTrustStoreType(),
- config.getBrokerClientTlsTrustStore(),
- config.getBrokerClientTlsTrustStorePassword(),
- config.getBrokerClientTlsKeyStoreType(),
- config.getBrokerClientTlsKeyStore(),
- config.getBrokerClientTlsKeyStorePassword(),
- config.getBrokerClientTlsCiphers(),
- config.getBrokerClientTlsProtocols(),
- config.getTlsCertRefreshCheckDurationSec(),
- authData);
- clientSslCtxRefresher = null;
- } else {
- SslProvider sslProvider = null;
- if (config.getBrokerClientSslProvider() != null) {
- sslProvider =
SslProvider.valueOf(config.getBrokerClientSslProvider());
- }
- clientSslCtxRefresher = new NettyClientSslContextRefresher(
- sslProvider,
- config.isTlsAllowInsecureConnection(),
- config.getBrokerClientTrustCertsFilePath(),
- authData,
- config.getBrokerClientTlsCiphers(),
- config.getBrokerClientTlsProtocols(),
- config.getTlsCertRefreshCheckDurationSec()
- );
- clientSSLContextAutoRefreshBuilder = null;
- }
- } else {
- clientSSLContextAutoRefreshBuilder = null;
- clientSslCtxRefresher = null;
- }
+ this.clientSslCtxRefresherMap = new ConcurrentHashMap<>();
+ this.clientSSLContextAutoRefreshBuilderMap = new ConcurrentHashMap<>();
}
public void connect(String brokerHostAndPort, InetSocketAddress
targetBrokerAddress, int protocolVersion) {
@@ -191,11 +147,52 @@ public class DirectProxyHandler {
ch.pipeline().addLast("consolidation", new
FlushConsolidationHandler(1024,
true));
if (tlsEnabledWithBroker) {
+ AuthenticationDataProvider authData;
+ if
(!isEmpty(config.getBrokerClientAuthenticationPlugin())) {
+ try {
+ authData = authentication.getAuthData(remoteHost);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ authData = null;
+ }
String host = targetBrokerAddress.getHostString();
int port = targetBrokerAddress.getPort();
- SslHandler handler = tlsEnabledWithKeyStore
- ? new
SslHandler(clientSSLContextAutoRefreshBuilder.get().createSSLEngine(host, port))
- :
clientSslCtxRefresher.get().newHandler(ch.alloc(), host, port);
+ SslHandler handler;
+ if (tlsEnabledWithKeyStore) {
+ handler = new
SslHandler(clientSSLContextAutoRefreshBuilderMap.computeIfAbsent(remoteHost,
+ key -> new NettySSLContextAutoRefreshBuilder(
+ config.getBrokerClientSslProvider(),
+ config.isTlsAllowInsecureConnection(),
+
config.getBrokerClientTlsTrustStoreType(),
+ config.getBrokerClientTlsTrustStore(),
+
config.getBrokerClientTlsTrustStorePassword(),
+
config.getBrokerClientTlsKeyStoreType(),
+ config.getBrokerClientTlsKeyStore(),
+
config.getBrokerClientTlsKeyStorePassword(),
+ config.getBrokerClientTlsCiphers(),
+ config.getBrokerClientTlsProtocols(),
+
config.getTlsCertRefreshCheckDurationSec(),
+ authData)).get().createSSLEngine(host,
port));
+ } else {
+ SslProvider sslProvider;
+ if (config.getBrokerClientSslProvider() != null) {
+ sslProvider =
SslProvider.valueOf(config.getBrokerClientSslProvider());
+ } else {
+ sslProvider = null;
+ }
+ handler =
clientSslCtxRefresherMap.computeIfAbsent(remoteHost,
+ key -> new NettyClientSslContextRefresher(
+ sslProvider,
+ config.isTlsAllowInsecureConnection(),
+
config.getBrokerClientTrustCertsFilePath(),
+ authData,
+ config.getBrokerClientTlsCiphers(),
+ config.getBrokerClientTlsProtocols(),
+
config.getTlsCertRefreshCheckDurationSec()
+ )).get().newHandler(ch.alloc(), host, port);
+ }
if (tlsHostnameVerificationEnabled) {
SecurityUtility.configureSSLHandler(handler);
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index a4ebe25b428..8246e974f6f 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -59,6 +59,7 @@ public class ProxyServiceTlsStarterTest extends
MockedPulsarServiceBaseTest {
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
+
serviceStarter.getConfig().setBrokerWebServiceURLTLS(pulsar.getWebServiceAddressTls());
serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
serviceStarter.getConfig().setServicePort(Optional.empty());
serviceStarter.getConfig().setServicePortTls(Optional.of(0));
@@ -76,6 +77,7 @@ public class ProxyServiceTlsStarterTest extends
MockedPulsarServiceBaseTest {
protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setBrokerServicePortTls(Optional.of(0));
+ this.conf.setWebServicePortTls(Optional.of(0));
this.conf.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
this.conf.setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
}
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 596eb8d2c28..0b5db8d78fb 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -263,7 +263,7 @@ public class PerformanceClient {
Authentication auth =
AuthenticationFactory.create(arguments.authPluginClassName,
arguments.authParams);
auth.start();
- AuthenticationDataProvider authData = auth.getAuthData();
+ AuthenticationDataProvider authData =
auth.getAuthData(produceUri.getHost());
if (authData.hasDataForHttp()) {
for (Map.Entry<String, String> kv :
authData.getHttpHeaders()) {
produceRequest.setHeader(kv.getKey(),
kv.getValue());