This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 55d41bef71408d634954e596767b8760395f9294
Author: Michael Marshall <[email protected]>
AuthorDate: Wed Apr 20 22:25:17 2022 -0500

    Add KeyStore support in WebSocket, Function Worker HTTPS Servers  (#15084)
    
    * Add KeyStore support in WebSocket, Function Worker HTTPS Servers
    
    * Avoid leaking worker config password
    
    * Fix checkstyle
    
    * Replace broker with appropriate text in annotations
    
    * Update python script for new configs
    
    Co-authored-by: Lari Hotari <[email protected]>
    
    We support configuring KeyStores for the broker and the proxy, but not the 
WebSocket or the Function Worker. By adding this support, users are able to 
provide KeyStores of type PKCS12 or JKS, which adds flexibility. Further, these 
KeyStores simplify support for additional algorithms because we rely on the TLS 
provider to load the KeyStore instead of loading keys ourselves.
    
    * Add `KeyStoreSSLContext`s to the function worker server
    * Add `KeyStoreSSLContext`s to the web socket server
    * Add configurations to the function worker, the web socket, and the proxy 
configuration files to simplify configuration
    * Rely on `toString`, not `ObjectMapper`, when converting the 
`WorkerConfig` to a string so that we don't log the KeyStore password. (Add a 
test to verify this logic. Note that we don't want the `ObjectMapper` to ignore 
the field because we use mappers when converting configuration classes.)
    
    I manually verified that this change works in a minikube cluster. The 
underlying method named `KeyStoreSSLContext#createSslContextFactory` is already 
used and tested, so I don't believe we need additional testing on that 
component.
    
    This change adds a new way to configure TLS in the WebSocket and Function 
Worker HTTPS Servers. As such, it adds new configuration. This configuration is 
named in the same way that the broker and proxy configuration is named, so it 
is consistent.
    
    I've documented the new configuration in the appropriate configuration 
files.
    
    (cherry picked from commit a4103960a4eb7803af6fabd9fe54a3137f82aff0)
---
 conf/broker.conf                                   |  2 +
 conf/functions_worker.yml                          | 40 +++++++++++
 conf/proxy.conf                                    | 27 ++++++++
 conf/websocket.conf                                | 37 ++++++++++
 docker/pulsar/scripts/gen-yml-from-env.py          |  4 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  2 +-
 .../pulsar/functions/worker/WorkerConfig.java      | 78 ++++++++++++++++++++--
 .../worker/WorkerApiV2ResourceConfigTest.java      | 18 ++++-
 .../functions/worker/PulsarWorkerService.java      | 11 +--
 .../pulsar/functions/worker/rest/WorkerServer.java | 33 +++++++--
 .../pulsar/websocket/service/ProxyServer.java      | 35 +++++++---
 .../service/WebSocketProxyConfiguration.java       | 66 ++++++++++++++++--
 12 files changed, 316 insertions(+), 37 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index a315ef6dde1..affdaaf8dcc 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -590,6 +590,8 @@ tlsRequireTrustedClientCertOnConnect=false
 tlsProvider=
 
 ### --- KeyStore TLS config variables --- ###
+## Note that some of the above TLS configs also apply to the KeyStore TLS 
configuration.
+
 # Enable TLS with KeyStore type configuration in broker.
 tlsEnabledWithKeyStore=false
 
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 574a0eb3db6..efdb6b61653 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -301,6 +301,46 @@ tlsAllowInsecureConnection: false
 tlsEnableHostnameVerification: false
 # Tls cert refresh duration in seconds (set 0 to check on every new connection)
 tlsCertRefreshCheckDurationSec: 300
+# Whether client certificates are required for TLS. Connections are rejected 
if the client
+# certificate isn't trusted.
+tlsRequireTrustedClientCertOnConnect: false
+
+### --- KeyStore TLS config variables --- ###
+## Note that some of the above TLS configs also apply to the KeyStore TLS 
configuration.
+
+# TLS Provider for KeyStore type
+tlsProvider:
+
+# Enable TLS with KeyStore type configuration in function worker.
+tlsEnabledWithKeyStore: false
+
+# TLS KeyStore type configuration in function worker: JKS, PKCS12
+tlsKeyStoreType: JKS
+
+# TLS KeyStore path in function worker
+tlsKeyStore:
+
+# TLS KeyStore password for function worker
+tlsKeyStorePassword:
+
+# TLS TrustStore type configuration in function worker: JKS, PKCS12
+tlsTrustStoreType: JKS
+
+# TLS TrustStore path in function worker
+tlsTrustStore:
+
+# TLS TrustStore password in function worker, default value is empty password
+tlsTrustStorePassword:
+
+# Specify the tls protocols the function worker's web service will use to 
negotiate during TLS handshake
+# (a comma-separated list of protocol names).
+# Examples:- [TLSv1.3, TLSv1.2]
+webServiceTlsProtocols:
+
+# Specify the tls cipher the function worker will use to negotiate during TLS 
Handshake
+# (a comma-separated list of ciphers).
+# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
+webServiceTlsCiphers:
 
 ########################
 # State Management
diff --git a/conf/proxy.conf b/conf/proxy.conf
index a119095218a..6033a07b851 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -68,6 +68,33 @@ webServicePort=8080
 # Port to use to server HTTPS request
 webServicePortTls=
 
+### --- KeyStore TLS config variables --- ###
+## Note that some of the above TLS configs also apply to the KeyStore TLS 
configuration.
+
+# TLS Provider for KeyStore type
+tlsProvider=
+
+# Enable TLS with KeyStore type configuration in proxy.
+tlsEnabledWithKeyStore=false
+
+# TLS KeyStore type configuration in proxy: JKS, PKCS12
+tlsKeyStoreType=JKS
+
+# TLS KeyStore path in proxy
+tlsKeyStore=
+
+# TLS KeyStore password for proxy
+tlsKeyStorePassword=
+
+# TLS TrustStore type configuration in proxy: JKS, PKCS12
+tlsTrustStoreType=JKS
+
+# TLS TrustStore path in proxy
+tlsTrustStore=
+
+# TLS TrustStore password in proxy, default value is empty password
+tlsTrustStorePassword=
+
 # Path for the file used to determine the rotation status for the proxy 
instance when responding
 # to service discovery health checks
 statusFilePath=
diff --git a/conf/websocket.conf b/conf/websocket.conf
index e6d374de13d..85d8bc4564d 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -119,6 +119,43 @@ tlsRequireTrustedClientCertOnConnect=false
 # Tls cert refresh duration in seconds (set 0 to check on every new 
connection) 
 tlsCertRefreshCheckDurationSec=300
 
+### --- KeyStore TLS config variables --- ###
+## Note that some of the above TLS configs also apply to the KeyStore TLS 
configuration.
+
+# TLS Provider for KeyStore type
+tlsProvider=
+
+# Enable TLS with KeyStore type configuration in WebSocket.
+tlsEnabledWithKeyStore=false
+
+# TLS KeyStore type configuration in WebSocket: JKS, PKCS12
+tlsKeyStoreType=JKS
+
+# TLS KeyStore path in WebSocket
+tlsKeyStore=
+
+# TLS KeyStore password for WebSocket
+tlsKeyStorePassword=
+
+# TLS TrustStore type configuration in WebSocket: JKS, PKCS12
+tlsTrustStoreType=JKS
+
+# TLS TrustStore path in WebSocket
+tlsTrustStore=
+
+# TLS TrustStore password in WebSocket, default value is empty password
+tlsTrustStorePassword=
+
+# Specify the tls protocols the proxy's web service will use to negotiate 
during TLS handshake
+# (a comma-separated list of protocol names).
+# Examples:- [TLSv1.3, TLSv1.2]
+webServiceTlsProtocols=
+
+# Specify the tls cipher the proxy will use to negotiate during TLS Handshake
+# (a comma-separated list of ciphers).
+# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
+webServiceTlsCiphers=
+
 ### --- Deprecated config variables --- ###
 
 # Deprecated. Use configurationStoreServers
diff --git a/docker/pulsar/scripts/gen-yml-from-env.py 
b/docker/pulsar/scripts/gen-yml-from-env.py
index 8aee68b5282..779341d1269 100755
--- a/docker/pulsar/scripts/gen-yml-from-env.py
+++ b/docker/pulsar/scripts/gen-yml-from-env.py
@@ -47,7 +47,9 @@ SET_KEYS = [
     'proxyRoles',
     'schemaRegistryCompatibilityCheckers',
     'brokerClientTlsCiphers',
-    'brokerClientTlsProtocols'
+    'brokerClientTlsProtocols',
+    'webServiceTlsCiphers',
+    'webServiceTlsProtocols',
 ]
 
 PF_ENV_PREFIX = 'PF_'
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f9ccad6a3a3..abd0bffe7eb 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2192,7 +2192,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
-            doc = "TLS Provider for Specify the SSL provider for the broker 
service: \n"
+            doc = "Specify the TLS provider for the broker service: \n"
                     + "When using TLS authentication with CACert, the valid 
value is either OPENSSL or JDK.\n"
                     + "When using TLS authentication with KeyStore, available 
values can be SunJSSE, Conscrypt and etc."
     )
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 64cbc095f40..37304afdb8a 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -36,18 +36,18 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-
+import java.util.TreeSet;
 import lombok.AccessLevel;
+import lombok.Data;
 import lombok.Getter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.common.configuration.Category;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 import org.apache.pulsar.common.functions.Resources;
-
-import lombok.Data;
-import lombok.experimental.Accessors;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
 import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
@@ -75,6 +75,8 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
     @Category
     private static final String CATEGORY_SECURITY = "Common Security Settings 
(applied for both worker and client)";
     @Category
+    private static final String CATEGORY_KEYSTORE_TLS = "KeyStoreTLS";
+    @Category
     private static final String CATEGORY_WORKER_SECURITY = "Worker Security 
Settings";
     @Category
     private static final String CATEGORY_CLIENT_SECURITY = "Security settings 
for clients talking to brokers";
@@ -368,6 +370,74 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
             doc = "Tls cert refresh duration in seconds (set 0 to check on 
every new connection)"
         )
         private long tlsCertRefreshCheckDurationSec = 300;
+
+    /**** --- KeyStore TLS config variables. --- ****/
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "Enable TLS with KeyStore type configuration in function 
worker"
+    )
+    private boolean tlsEnabledWithKeyStore = false;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "Specify the TLS provider for the function worker service: 
\n"
+                    + "When using TLS authentication with CACert, the valid 
value is either OPENSSL or JDK.\n"
+                    + "When using TLS authentication with KeyStore, available 
values can be SunJSSE, Conscrypt and etc."
+    )
+    private String tlsProvider = null;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS KeyStore type configuration in function worker: JKS, 
PKCS12"
+    )
+    private String tlsKeyStoreType = "JKS";
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS KeyStore path in function worker"
+    )
+    private String tlsKeyStore = null;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS KeyStore password for function worker"
+    )
+    @ToString.Exclude
+    private String tlsKeyStorePassword = null;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore type configuration in function worker: JKS, 
PKCS12"
+    )
+    private String tlsTrustStoreType = "JKS";
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore path in function worker"
+    )
+    private String tlsTrustStore = null;
+
+    @FieldContext(
+            category = CATEGORY_KEYSTORE_TLS,
+            doc = "TLS TrustStore password for function worker, null means 
empty password."
+    )
+    @ToString.Exclude
+    private String tlsTrustStorePassword = null;
+
+    @FieldContext(
+            category = CATEGORY_WORKER_SECURITY,
+            doc = "Specify the tls protocols the proxy's web service will use 
to negotiate during TLS Handshake.\n\n"
+                    + "Example:- [TLSv1.3, TLSv1.2]"
+    )
+    private Set<String> webServiceTlsProtocols = new TreeSet<>();
+
+    @FieldContext(
+            category = CATEGORY_WORKER_SECURITY,
+            doc = "Specify the tls cipher the proxy's web service will use to 
negotiate during TLS Handshake.\n\n"
+                    + "Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]"
+    )
+    private Set<String> webServiceTlsCiphers = new TreeSet<>();
+
     @FieldContext(
         category = CATEGORY_WORKER_SECURITY,
         doc = "Enforce authentication"
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
index 6cfaea9e242..bf45dd958c8 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
@@ -25,10 +25,11 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
 import java.net.URL;
+import java.util.Locale;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
 import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
-import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.testng.annotations.Test;
 
 /**
@@ -121,4 +122,19 @@ public class WorkerApiV2ResourceConfigTest {
 
         assertTrue(newK8SWc.isFunctionInstanceResourceChangeInLockStep());
     }
+
+    @Test
+    public void testPasswordsNotLeakedOnToString() throws Exception {
+        URL yamlUrl = 
getClass().getClassLoader().getResource("test_worker_config.yml");
+        WorkerConfig wc = WorkerConfig.load(yamlUrl.toURI().getPath());
+        
assertFalse(wc.toString().toLowerCase(Locale.ROOT).contains("password"), 
"Stringified config must not contain password");
+    }
+
+    @Test
+    public void testPasswordsPresentOnObjectMapping() throws Exception {
+        URL yamlUrl = 
getClass().getClassLoader().getResource("test_worker_config.yml");
+        WorkerConfig wc = WorkerConfig.load(yamlUrl.toURI().getPath());
+        assertTrue((new 
ObjectMapper().writeValueAsString(wc)).toLowerCase(Locale.ROOT).contains("password"),
+                "ObjectMapper output must include passwords for proper 
serialization");
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 8eccb9d2751..e21949a4ebd 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -20,9 +20,6 @@ package org.apache.pulsar.functions.worker;
 
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -407,13 +404,7 @@ public class PulsarWorkerService implements WorkerService {
 
         workerStatsManager.startupTimeStart();
         log.info("/** Starting worker id={} **/", workerConfig.getWorkerId());
-
-        try {
-            log.info("Worker Configs: {}", new 
ObjectMapper().writerWithDefaultPrettyPrinter()
-                    .writeValueAsString(workerConfig));
-        } catch (JsonProcessingException e) {
-            log.warn("Failed to print worker configs with error {}", 
e.getMessage(), e);
-        }
+        log.info("Worker Configs: {}", workerConfig);
 
         try {
             DistributedLogConfiguration dlogConf = 
WorkerUtils.getDlogConf(workerConfig);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index ee130639eb2..7c9a1798b92 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.broker.web.RateLimitingFilter;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
@@ -139,13 +140,31 @@ public class WorkerServer {
 
         if (this.workerConfig.getTlsEnabled()) {
             try {
-                SslContextFactory sslCtxFactory = 
SecurityUtility.createSslContextFactory(
-                        this.workerConfig.isTlsAllowInsecureConnection(), 
this.workerConfig.getTlsTrustCertsFilePath(),
-                        this.workerConfig.getTlsCertificateFilePath(), 
this.workerConfig.getTlsKeyFilePath(),
-                        
this.workerConfig.isTlsRequireTrustedClientCertOnConnect(),
-                        true,
-                        this.workerConfig.getTlsCertRefreshCheckDurationSec());
-                httpsConnector = new ServerConnector(server, 1, 1, 
sslCtxFactory);
+                SslContextFactory sslCtxFactory;
+                if (workerConfig.isTlsEnabledWithKeyStore()) {
+                    sslCtxFactory = KeyStoreSSLContext.createSslContextFactory(
+                            workerConfig.getTlsProvider(),
+                            workerConfig.getTlsKeyStoreType(),
+                            workerConfig.getTlsKeyStore(),
+                            workerConfig.getTlsKeyStorePassword(),
+                            workerConfig.isTlsAllowInsecureConnection(),
+                            workerConfig.getTlsTrustStoreType(),
+                            workerConfig.getTlsTrustStore(),
+                            workerConfig.getTlsTrustStorePassword(),
+                            
workerConfig.isTlsRequireTrustedClientCertOnConnect(),
+                            workerConfig.getTlsCertRefreshCheckDurationSec()
+                    );
+                } else {
+                    sslCtxFactory = SecurityUtility.createSslContextFactory(
+                            workerConfig.isTlsAllowInsecureConnection(),
+                            workerConfig.getTlsTrustCertsFilePath(),
+                            workerConfig.getTlsCertificateFilePath(),
+                            workerConfig.getTlsKeyFilePath(),
+                            
workerConfig.isTlsRequireTrustedClientCertOnConnect(),
+                            true,
+                            workerConfig.getTlsCertRefreshCheckDurationSec());
+                }
+                httpsConnector = new ServerConnector(server, sslCtxFactory);
                 httpsConnector.setPort(this.workerConfig.getWorkerPortTls());
                 connectors.add(httpsConnector);
             } catch (Exception e) {
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index c616db0d4db..23414cb4532 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.web.JsonMapperProvider;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
@@ -77,15 +78,31 @@ public class ProxyServer {
         // TLS enabled connector
         if (config.getWebServicePortTls().isPresent()) {
             try {
-                SslContextFactory sslCtxFactory = 
SecurityUtility.createSslContextFactory(
-                        config.isTlsAllowInsecureConnection(),
-                        config.getTlsTrustCertsFilePath(),
-                        config.getTlsCertificateFilePath(),
-                        config.getTlsKeyFilePath(),
-                        config.isTlsRequireTrustedClientCertOnConnect(),
-                        true,
-                        config.getTlsCertRefreshCheckDurationSec());
-                connectorTls = new ServerConnector(server, -1, -1, 
sslCtxFactory);
+                SslContextFactory sslCtxFactory;
+                if (config.isTlsEnabledWithKeyStore()) {
+                    sslCtxFactory = KeyStoreSSLContext.createSslContextFactory(
+                            config.getTlsProvider(),
+                            config.getTlsKeyStoreType(),
+                            config.getTlsKeyStore(),
+                            config.getTlsKeyStorePassword(),
+                            config.isTlsAllowInsecureConnection(),
+                            config.getTlsTrustStoreType(),
+                            config.getTlsTrustStore(),
+                            config.getTlsTrustStorePassword(),
+                            config.isTlsRequireTrustedClientCertOnConnect(),
+                            config.getTlsCertRefreshCheckDurationSec()
+                    );
+                } else {
+                    sslCtxFactory = SecurityUtility.createSslContextFactory(
+                            config.isTlsAllowInsecureConnection(),
+                            config.getTlsTrustCertsFilePath(),
+                            config.getTlsCertificateFilePath(),
+                            config.getTlsKeyFilePath(),
+                            config.isTlsRequireTrustedClientCertOnConnect(),
+                            true,
+                            config.getTlsCertRefreshCheckDurationSec());
+                }
+                connectorTls = new ServerConnector(server, sslCtxFactory);
                 connectorTls.setPort(config.getWebServicePortTls().get());
                 connectors.add(connectorTls);
             } catch (Exception e) {
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 6ce27e6e500..e62fecf9fab 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -21,16 +21,16 @@ package org.apache.pulsar.websocket.service;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
-
+import java.util.TreeSet;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 
 import com.google.common.collect.Sets;
 
-import lombok.Getter;
-import lombok.Setter;
-
 @Getter
 @Setter
 public class WebSocketProxyConfiguration implements PulsarConfiguration {
@@ -131,5 +131,63 @@ public class WebSocketProxyConfiguration implements 
PulsarConfiguration {
     // Tls cert refresh duration in seconds (set 0 to check on every new 
connection) 
     private long tlsCertRefreshCheckDurationSec = 300;
 
+    /**** --- KeyStore TLS config variables. --- ****/
+    @FieldContext(
+            doc = "Enable TLS with KeyStore type configuration for WebSocket"
+    )
+    private boolean tlsEnabledWithKeyStore = false;
+
+    @FieldContext(
+            doc = "Specify the TLS provider for the WebSocket service: \n"
+                    + "When using TLS authentication with CACert, the valid 
value is either OPENSSL or JDK.\n"
+                    + "When using TLS authentication with KeyStore, available 
values can be SunJSSE, Conscrypt and etc."
+    )
+    private String tlsProvider = null;
+
+    @FieldContext(
+            doc = "TLS KeyStore type configuration in WebSocket: JKS, PKCS12"
+    )
+    private String tlsKeyStoreType = "JKS";
+
+    @FieldContext(
+            doc = "TLS KeyStore path in WebSocket"
+    )
+    private String tlsKeyStore = null;
+
+    @FieldContext(
+            doc = "TLS KeyStore password for WebSocket"
+    )
+    @ToString.Exclude
+    private String tlsKeyStorePassword = null;
+
+    @FieldContext(
+            doc = "TLS TrustStore type configuration in WebSocket: JKS, PKCS12"
+    )
+    private String tlsTrustStoreType = "JKS";
+
+    @FieldContext(
+            doc = "TLS TrustStore path in WebSocket"
+    )
+    private String tlsTrustStore = null;
+
+    @FieldContext(
+            doc = "TLS TrustStore password for WebSocket, null means empty 
password."
+    )
+    @ToString.Exclude
+    private String tlsTrustStorePassword = null;
+
+    @FieldContext(
+            doc = "Specify the tls protocols the proxy's web service will use 
to negotiate during TLS Handshake.\n\n"
+                    + "Example:- [TLSv1.3, TLSv1.2]"
+    )
+    private Set<String> webServiceTlsProtocols = new TreeSet<>();
+
+    @FieldContext(
+            doc = "Specify the tls cipher the proxy's web service will use to 
negotiate during TLS Handshake.\n\n"
+                    + "Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]"
+    )
+    private Set<String> webServiceTlsCiphers = new TreeSet<>();
+
+    @FieldContext(doc = "Key-value properties. Types are all String")
     private Properties properties = new Properties();
 }

Reply via email to