This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a26240bd0ce [fix][proxy] Fix JKS TLS transport (#19485)
a26240bd0ce is described below

commit a26240bd0ce2cc1f77a7612e05cc142785766717
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Feb 24 15:07:53 2023 +0800

    [fix][proxy] Fix JKS TLS transport (#19485)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../NettySSLContextAutoRefreshBuilder.java         |  18 +--
 .../pulsar/proxy/server/DirectProxyHandler.java    |   6 +-
 .../server/ProxyKeyStoreTlsTransportTest.java      | 131 +++++++++++++++++++++
 3 files changed, 145 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java
index 7b47eb9b8bb..6d0cfb108bd 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java
@@ -47,7 +47,6 @@ public class NettySSLContextAutoRefreshBuilder extends 
SslContextAutoRefreshBuil
     protected String tlsKeyStorePassword;
     protected FileModifiedTimeUpdater tlsKeyStore;
 
-    protected AuthenticationDataProvider authData;
     protected final boolean isServer;
 
     // for server
@@ -101,8 +100,14 @@ public class NettySSLContextAutoRefreshBuilder extends 
SslContextAutoRefreshBuil
         this.tlsAllowInsecureConnection = allowInsecureConnection;
         this.tlsProvider = sslProviderString;
 
-        this.authData = authData;
-
+        if (authData != null) {
+            KeyStoreParams authParams = authData.getTlsKeyStoreParams();
+            if (authParams != null) {
+                keyStoreTypeString = authParams.getKeyStoreType();
+                keyStore = authParams.getKeyStorePath();
+                keyStorePassword = authParams.getKeyStorePassword();
+            }
+        }
         this.tlsKeyStoreType = keyStoreTypeString;
         this.tlsKeyStore = new FileModifiedTimeUpdater(keyStore);
         this.tlsKeyStorePassword = keyStorePassword;
@@ -126,11 +131,10 @@ public class NettySSLContextAutoRefreshBuilder extends 
SslContextAutoRefreshBuil
                     tlsTrustStoreType, tlsTrustStore.getFileName(), 
tlsTrustStorePassword,
                     tlsRequireTrustedClientCertOnConnect, tlsCiphers, 
tlsProtocols);
         } else {
-            KeyStoreParams authParams = authData.getTlsKeyStoreParams();
             this.keyStoreSSLContext = 
KeyStoreSSLContext.createClientKeyStoreSslContext(tlsProvider,
-                    authParams != null ? authParams.getKeyStoreType() : 
tlsKeyStoreType,
-                    authParams != null ? authParams.getKeyStorePath() : 
tlsKeyStore.getFileName(),
-                    authParams != null ? authParams.getKeyStorePassword() : 
tlsKeyStorePassword,
+                    tlsKeyStoreType,
+                    tlsKeyStore.getFileName(),
+                    tlsKeyStorePassword,
                     tlsAllowInsecureConnection,
                     tlsTrustStoreType, tlsTrustStore.getFileName(), 
tlsTrustStorePassword,
                     tlsCiphers, tlsProtocols);
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 1e9fd676573..23c7faa2d4b 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -127,9 +127,9 @@ public class DirectProxyHandler {
                         config.getBrokerClientTlsTrustStoreType(),
                         config.getBrokerClientTlsTrustStore(),
                         config.getBrokerClientTlsTrustStorePassword(),
-                        null,
-                        null,
-                        null,
+                        config.getBrokerClientTlsKeyStoreType(),
+                        config.getBrokerClientTlsKeyStore(),
+                        config.getBrokerClientTlsKeyStorePassword(),
                         config.getBrokerClientTlsCiphers(),
                         config.getBrokerClientTlsProtocols(),
                         config.getTlsCertRefreshCheckDurationSec(),
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
new file mode 100644
index 00000000000..5c4e40ed65a
--- /dev/null
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.doReturn;
+import java.util.Optional;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "proxy")
+public class ProxyKeyStoreTlsTransportTest extends MockedPulsarServiceBaseTest 
{
+    private ProxyService proxyService;
+    private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+
+        // broker with JKS
+        conf.setWebServicePortTls(Optional.of(0));
+        conf.setBrokerServicePortTls(Optional.of(0));
+        conf.setTlsEnabledWithKeyStore(true);
+        conf.setTlsKeyStoreType(KEYSTORE_TYPE);
+        conf.setTlsKeyStore(BROKER_KEYSTORE_FILE_PATH);
+        conf.setTlsKeyStorePassword(BROKER_KEYSTORE_PW);
+        conf.setTlsTrustStoreType(KEYSTORE_TYPE);
+        conf.setTlsTrustStore(CLIENT_TRUSTSTORE_FILE_PATH);
+        conf.setTlsTrustStorePassword(CLIENT_TRUSTSTORE_PW);
+        conf.setTlsRequireTrustedClientCertOnConnect(true);
+
+        internalSetup();
+
+        // proxy with JKS
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setServicePortTls(Optional.of(0));
+        proxyConfig.setWebServicePort(Optional.of(0));
+        proxyConfig.setWebServicePortTls(Optional.of(0));
+        proxyConfig.setTlsEnabledWithBroker(true);
+        proxyConfig.setTlsEnabledWithKeyStore(true);
+
+        proxyConfig.setTlsKeyStoreType(KEYSTORE_TYPE);
+        proxyConfig.setTlsKeyStore(BROKER_KEYSTORE_FILE_PATH);
+        proxyConfig.setTlsKeyStorePassword(BROKER_KEYSTORE_PW);
+        proxyConfig.setTlsTrustStoreType(KEYSTORE_TYPE);
+        proxyConfig.setTlsTrustStore(CLIENT_TRUSTSTORE_FILE_PATH);
+        proxyConfig.setTlsTrustStorePassword(CLIENT_TRUSTSTORE_PW);
+
+        proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
+        proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+
+        proxyConfig.setTlsRequireTrustedClientCertOnConnect(false);
+
+        proxyConfig.setBrokerClientTlsEnabledWithKeyStore(true);
+        proxyConfig.setBrokerClientTlsKeyStore(CLIENT_KEYSTORE_FILE_PATH);
+        proxyConfig.setBrokerClientTlsKeyStorePassword(CLIENT_KEYSTORE_PW);
+        proxyConfig.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH);
+        proxyConfig.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                                                    new AuthenticationService(
+                                                            
PulsarConfigurationLoader.convertFrom(proxyConfig))));
+        doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
+        doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+
+        proxyService.start();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        internalCleanup();
+
+        proxyService.close();
+    }
+
+    protected PulsarClient newClient() throws Exception {
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .serviceUrl(proxyService.getServiceUrlTls())
+                .useKeyStoreTls(true)
+                .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+                .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+                .tlsKeyStorePath(CLIENT_KEYSTORE_FILE_PATH)
+                .tlsKeyStorePassword(CLIENT_KEYSTORE_PW)
+                .allowTlsInsecureConnection(false);
+        return clientBuilder.build();
+    }
+
+    @Test
+    public void testProducer() throws Exception {
+        @Cleanup
+        PulsarClient client = newClient();
+        @Cleanup
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+                .topic("persistent://sample/test/local/topic" + 
System.currentTimeMillis())
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test".getBytes());
+        }
+    }
+}

Reply via email to