This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ffd6f21  Add Configuration to set tlsClientAuth (#1297)
ffd6f21 is described below

commit ffd6f211b206788cfab4fd62e8d259bff96ad366
Author: Jai Asher <j...@ccs.neu.edu>
AuthorDate: Tue Mar 27 16:10:07 2018 -0700

    Add Configuration to set tlsClientAuth (#1297)
    
    * Add Configuration to set tlsClientAuth
    
    * Fixed ProxyPublishConsumeTlsTest
    
    * Handled Matteo's PR review comments
    
    * Negative Tests
    
    * Changed the Client Auth to ENUM
    
    * Addressed Matteo's PR review Comments
    
    * Removed unused imports
    
    * Added client Cert to HTTPS
    
    * Split the test case
    
    * Replace tlsReqTrustedClientCertOnConnect with tlsRequireTrustedClientCertOnConnect
---
 conf/broker.conf                                   |  3 +
 conf/discovery.conf                                |  4 ++
 conf/proxy.conf                                    |  4 ++
 conf/websocket.conf                                |  4 ++
 .../apache/pulsar/broker/ServiceConfiguration.java | 10 ++++
 .../broker/service/PulsarChannelInitializer.java   |  3 +-
 .../org/apache/pulsar/broker/web/WebService.java   | 24 ++++----
 .../pulsar/client/api/TlsProducerConsumerBase.java | 38 ++++++++++---
 .../pulsar/client/api/TlsProducerConsumerTest.java | 66 +++++++++++++++++++++-
 .../proxy/ProxyPublishConsumeTlsTest.java          |  6 ++
 pulsar-common/pom.xml                              |  6 ++
 .../apache/pulsar/common/util/SecurityUtility.java | 31 ++++++++--
 .../service/ServiceChannelInitializer.java         |  3 +-
 .../discovery/service/server/ServerManager.java    | 20 +++----
 .../discovery/service/server/ServiceConfig.java    | 11 ++++
 .../pulsar/proxy/server/ProxyConfiguration.java    | 13 ++++-
 .../proxy/server/ServiceChannelInitializer.java    |  3 +-
 .../org/apache/pulsar/proxy/server/WebServer.java  | 18 +++---
 .../pulsar/websocket/service/ProxyServer.java      | 18 +++---
 .../service/WebSocketProxyConfiguration.java       | 12 +++-
 20 files changed, 238 insertions(+), 59 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 59a19d7..cec3c0f 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -207,6 +207,9 @@ tlsTrustCertsFilePath=
 # Accept untrusted TLS certificate from client
 tlsAllowInsecureConnection=false
 
+# Specify whether Client certificates are required for TLS
+# Reject the Connection if the Client Certificate is not trusted.
+tlsRequireTrustedClientCertOnConnect=false
 ### --- Authentication --- ###
 
 # Enable authentication
diff --git a/conf/discovery.conf b/conf/discovery.conf
index 49f499a..87f887f 100644
--- a/conf/discovery.conf
+++ b/conf/discovery.conf
@@ -73,3 +73,7 @@ tlsCertificateFilePath=
 
 # Path for the TLS private key file
 tlsKeyFilePath=
+
+# Specify whether Client certificates are required for TLS
+# Reject the Connection if the Client Certificate is not trusted.
+tlsRequireTrustedClientCertOnConnect=false
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 384cca0..5d0647d 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -85,3 +85,7 @@ tlsKeyFilePath=
 
 # Validates hostname when proxy creates tls connection with broker
 tlsHostnameVerificationEnabled=false
+
+# Specify whether Client certificates are required for TLS
+# Reject the Connection if the Client Certificate is not trusted.
+tlsRequireTrustedClientCertOnConnect=false
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 0ceda62..87accac 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -99,3 +99,7 @@ tlsKeyFilePath=
 
 # Path for the trusted TLS certificate file
 tlsTrustCertsFilePath=
+
+# Specify whether Client certificates are required for TLS
+# Reject the Connection if the Client Certificate is not trusted.
+tlsRequireTrustedClientCertOnConnect=false
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f851f7d..40ac189 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -202,6 +202,9 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     // Specify the tls cipher the broker will use to negotiate during TLS 
Handshake.
     // Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
     private Set<String> tlsCiphers = Sets.newTreeSet();
+    // Specify whether Client certificates are required for TLS
+    // Reject the Connection if the Client Certificate is not trusted.
+    private boolean tlsRequireTrustedClientCertOnConnect = false;
 
     /***** --- Authentication --- ****/
     // Enable authentication
@@ -1497,7 +1500,14 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     public void setTlsCiphers(Set<String> tlsCiphers) {
         this.tlsCiphers = tlsCiphers;
     }
+    
+    public boolean getTlsRequireTrustedClientCertOnConnect() {
+        return tlsRequireTrustedClientCertOnConnect;
+    }
 
+    public void setTlsRequireTrustedClientCertOnConnect(boolean 
tlsRequireTrustedClientCertOnConnect) {
+        this.tlsRequireTrustedClientCertOnConnect = 
tlsRequireTrustedClientCertOnConnect;
+    }
     /**** --- Function ---- ****/
 
     public void setFunctionsWorkerEnabled(boolean enabled) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index f77c6e6..8c16a55 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -53,7 +53,8 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
             SslContext sslCtx = SecurityUtility.createNettySslContextForServer(
                     serviceConfig.isTlsAllowInsecureConnection(), 
serviceConfig.getTlsTrustCertsFilePath(),
                     serviceConfig.getTlsCertificateFilePath(), 
serviceConfig.getTlsKeyFilePath(),
-                    serviceConfig.getTlsCiphers(), 
serviceConfig.getTlsProtocols());
+                    serviceConfig.getTlsCiphers(), 
serviceConfig.getTlsProtocols(),
+                    serviceConfig.getTlsRequireTrustedClientCertOnConnect());
             ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
         }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 02c8b1a..099f1a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -90,24 +90,20 @@ public class WebService implements AutoCloseable {
         connectors.add(connector);
 
         if (pulsar.getConfiguration().isTlsEnabled()) {
-            SslContextFactory sslCtxFactory = new SslContextFactory();
-
             try {
-                sslCtxFactory.setSslContext(
-                        SecurityUtility.createSslContext(
-                            
pulsar.getConfiguration().isTlsAllowInsecureConnection(),
-                            
pulsar.getConfiguration().getTlsTrustCertsFilePath(),
-                            
pulsar.getConfiguration().getTlsCertificateFilePath(),
-                            pulsar.getConfiguration().getTlsKeyFilePath()));
+                SslContextFactory sslCtxFactory = 
SecurityUtility.createSslContextFactory(
+                        
pulsar.getConfiguration().isTlsAllowInsecureConnection(),
+                        pulsar.getConfiguration().getTlsTrustCertsFilePath(),
+                        pulsar.getConfiguration().getTlsCertificateFilePath(),
+                        pulsar.getConfiguration().getTlsKeyFilePath(),
+                        
pulsar.getConfiguration().getTlsRequireTrustedClientCertOnConnect());
+                ServerConnector tlsConnector = new 
PulsarServerConnector(server, 1, 1, sslCtxFactory);
+                
tlsConnector.setPort(pulsar.getConfiguration().getWebServicePortTls());
+                tlsConnector.setHost(pulsar.getBindAddress());
+                connectors.add(tlsConnector);
             } catch (GeneralSecurityException e) {
                 throw new PulsarServerException(e);
             }
-
-            sslCtxFactory.setWantClientAuth(true);
-            ServerConnector tlsConnector = new PulsarServerConnector(server, 
1, 1, sslCtxFactory);
-            
tlsConnector.setPort(pulsar.getConfiguration().getWebServicePortTls());
-            tlsConnector.setHost(pulsar.getBindAddress());
-            connectors.add(tlsConnector);
         }
 
         // Limit number of concurrent HTTP connections to avoid getting out of 
file descriptors
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
index 66b2265..f1e1308 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
@@ -21,11 +21,14 @@ package org.apache.pulsar.client.api;
 import static org.mockito.Mockito.spy;
 
 import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -34,8 +37,12 @@ import org.testng.annotations.BeforeMethod;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import io.netty.handler.ssl.ClientAuth;
+
 public class TlsProducerConsumerBase extends ProducerConsumerBase {
     protected final String TLS_TRUST_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/cacert.pem";
+    protected final String TLS_CLIENT_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/client-cert.pem";
+    protected final String TLS_CLIENT_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/client-key.pem";
     protected final String TLS_SERVER_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-cert.pem";
     protected final String TLS_SERVER_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-key.pem";
     private final String clusterName = "use";
@@ -43,7 +50,6 @@ public class TlsProducerConsumerBase extends 
ProducerConsumerBase {
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
-
         // TLS configuration for Broker
         internalSetUpForBroker();
 
@@ -61,19 +67,37 @@ public class TlsProducerConsumerBase extends 
ProducerConsumerBase {
         conf.setTlsEnabled(true);
         conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
         conf.setClusterName(clusterName);
+        conf.setTlsRequireTrustedClientCertOnConnect(true);
+        Set<String> tlsProtocols = Sets.newConcurrentHashSet();
+        tlsProtocols.add("TLSv1.2");
+        conf.setTlsProtocols(tlsProtocols);
     }
 
-    protected void internalSetUpForClient() throws Exception {
-        String lookupUrl = new URI("pulsar+ssl://localhost:" + 
BROKER_PORT_TLS).toString();
-        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl).tlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH)
-                .enableTls(true).build();
+    protected void internalSetUpForClient(boolean addCertificates, String 
lookupUrl) throws Exception {
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        clientConf.setUseTls(true);
+        clientConf.setTlsAllowInsecureConnection(false);
+        if (addCertificates) {
+            Map<String, String> authParams = new HashMap<>();
+            authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+            authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+            clientConf.setAuthentication(AuthenticationTls.class.getName(), 
authParams);
+        }
+        pulsarClient = PulsarClient.create(lookupUrl, clientConf);
     }
 
     protected void internalSetUpForNamespace() throws Exception {
         ClientConfiguration clientConf = new ClientConfiguration();
         clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
         clientConf.setUseTls(true);
+        clientConf.setTlsAllowInsecureConnection(false);
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        clientConf.setAuthentication(AuthenticationTls.class.getName(), 
authParams);
         admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
         admin.clusters().updateCluster(clusterName, new 
ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" 
+ BROKER_PORT_TLS));
@@ -81,4 +105,4 @@ public class TlsProducerConsumerBase extends 
ProducerConsumerBase {
                 new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), 
Sets.newHashSet("use")));
         admin.namespaces().createNamespace("my-property/use/my-ns");
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
index a0d4bc2..8641ac7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
@@ -42,7 +42,7 @@ public class TlsProducerConsumerTest extends 
TlsProducerConsumerBase {
         final int MESSAGE_SIZE = 16 * 1024 + 1;
         log.info("-- message size --", MESSAGE_SIZE);
 
-        internalSetUpForClient();
+        internalSetUpForClient(true, "pulsar+ssl://localhost:" + 
BROKER_PORT_TLS);
         internalSetUpForNamespace();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
@@ -68,4 +68,68 @@ public class TlsProducerConsumerTest extends 
TlsProducerConsumerBase {
         consumer.close();
         log.info("-- Exiting {} test --", methodName);
     }
+
+    @Test(timeOut = 30000)
+    public void testTlsClientAuthOverBinaryProtocol() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final int MESSAGE_SIZE = 16 * 1024 + 1;
+        log.info("-- message size --", MESSAGE_SIZE);
+        internalSetUpForNamespace();
+
+        // Test 1 - Using TLS on binary protocol without sending certs - 
expect failure
+        internalSetUpForClient(false, "pulsar+ssl://localhost:" + 
BROKER_PORT_TLS);
+        try {
+            ConsumerConfiguration conf = new ConsumerConfiguration();
+            conf.setSubscriptionType(SubscriptionType.Exclusive);
+            Consumer consumer = 
pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1",
+                    "my-subscriber-name", conf);
+            Assert.fail("Server should have failed the TLS handshake since 
client didn't .");
+        } catch (Exception ex) {
+            // OK
+        }
+
+        // Test 2 - Using TLS on binary protocol - sending certs
+        internalSetUpForClient(true, "pulsar+ssl://localhost:" + 
BROKER_PORT_TLS);
+        try {
+            ConsumerConfiguration conf = new ConsumerConfiguration();
+            conf.setSubscriptionType(SubscriptionType.Exclusive);
+            Consumer consumer = 
pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1",
+                    "my-subscriber-name", conf);
+        } catch (Exception ex) {
+            Assert.fail("Should not fail since certs are sent.");
+        }
+    }
+
+    @Test(timeOut = 30000)
+    public void testTlsClientAuthOverHTTPProtocol() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final int MESSAGE_SIZE = 16 * 1024 + 1;
+        log.info("-- message size --", MESSAGE_SIZE);
+        internalSetUpForNamespace();
+
+        // Test 1 - Using TLS on https without sending certs - expect failure
+        internalSetUpForClient(false, "https://localhost:" + 
BROKER_WEBSERVICE_PORT_TLS);
+        try {
+            ConsumerConfiguration conf = new ConsumerConfiguration();
+            conf.setSubscriptionType(SubscriptionType.Exclusive);
+            Consumer consumer = 
pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1",
+                    "my-subscriber-name", conf);
+            Assert.fail("Server should have failed the TLS handshake since 
client didn't .");
+        } catch (Exception ex) {
+            // OK
+        }
+
+        // Test 2 - Using TLS on https - sending certs
+        internalSetUpForClient(true, "https://localhost:" + 
BROKER_WEBSERVICE_PORT_TLS);
+        try {
+            ConsumerConfiguration conf = new ConsumerConfiguration();
+            conf.setSubscriptionType(SubscriptionType.Exclusive);
+            Consumer consumer = 
pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1",
+                    "my-subscriber-name", conf);
+        } catch (Exception ex) {
+            Assert.fail("Should not fail since certs are sent.");
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
index ac79c8a..6d486c2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
@@ -24,12 +24,15 @@ import static org.mockito.Mockito.spy;
 
 import java.net.URI;
 import java.security.GeneralSecurityException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.client.api.TlsProducerConsumerBase;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.websocket.WebSocketService;
 import org.apache.pulsar.websocket.service.ProxyServer;
@@ -71,6 +74,9 @@ public class ProxyPublishConsumeTlsTest extends 
TlsProducerConsumerBase {
         config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
         config.setClusterName("use");
         config.setGlobalZookeeperServers("dummy-zk-servers");
+        config.setBrokerClientAuthenticationParameters("tlsCertFile:" + 
TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+        
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        String lookupUrl = new URI("pulsar://localhost:" + 
BROKER_PORT_TLS).toString();
         service = spy(new WebSocketService(config));
         
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
         proxyServer = new ProxyServer(config);
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 68dd322..2ef8e64 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -86,5 +86,11 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-tcnative-boringssl-static</artifactId>
     </dependency>
+
+       <dependency>
+               <groupId>org.eclipse.jetty</groupId>
+               <artifactId>jetty-server</artifactId>
+       </dependency>
+    
   </dependencies>
 </project>
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
index 26f97bb..0181627 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
@@ -49,6 +49,8 @@ import javax.net.ssl.SSLException;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
 import io.netty.handler.ssl.ClientAuth;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
@@ -93,7 +95,8 @@ public class SecurityUtility {
     }
 
     public static SslContext createNettySslContextForServer(boolean 
allowInsecureConnection, String trustCertsFilePath,
-            String certFilePath, String keyFilePath, Set<String> ciphers, 
Set<String> protocols)
+            String certFilePath, String keyFilePath, Set<String> ciphers, 
Set<String> protocols,
+            boolean requireTrustedClientCertOnConnect)
             throws GeneralSecurityException, SSLException, 
FileNotFoundException, IOException {
         X509Certificate[] certificates = 
loadCertificatesFromPemFile(certFilePath);
         PrivateKey privateKey = loadPrivateKeyFromPemFile(keyFilePath);
@@ -103,7 +106,7 @@ public class SecurityUtility {
         setupProtocols(builder, protocols);
         setupTrustCerts(builder, allowInsecureConnection, trustCertsFilePath);
         setupKeyManager(builder, privateKey, certificates);
-        setupClientAuthentication(builder);
+        setupClientAuthentication(builder, requireTrustedClientCertOnConnect);
         return builder.build();
     }
 
@@ -236,7 +239,27 @@ public class SecurityUtility {
         }
     }
 
-    private static void setupClientAuthentication(SslContextBuilder builder) {
-        builder.clientAuth(ClientAuth.OPTIONAL);
+    private static void setupClientAuthentication(SslContextBuilder builder, 
boolean requireTrustedClientCertOnConnect) {
+        if (requireTrustedClientCertOnConnect) {
+            builder.clientAuth(ClientAuth.REQUIRE);
+        } else {
+            builder.clientAuth(ClientAuth.OPTIONAL);
+        }
+    }
+
+    public static SslContextFactory createSslContextFactory(boolean 
tlsAllowInsecureConnection,
+            String tlsTrustCertsFilePath, String tlsCertificateFilePath, 
String tlsKeyFilePath,
+            boolean tlsRequireTrustedClientCertOnConnect) throws 
GeneralSecurityException {
+        SslContextFactory sslCtxFactory = new SslContextFactory();
+        SSLContext sslCtx = createSslContext(tlsAllowInsecureConnection, 
tlsTrustCertsFilePath, tlsCertificateFilePath,
+                tlsKeyFilePath);
+        sslCtxFactory.setSslContext(sslCtx);
+        if (tlsRequireTrustedClientCertOnConnect) {
+            sslCtxFactory.setNeedClientAuth(true);
+        } else {
+            sslCtxFactory.setWantClientAuth(true);
+        }
+        sslCtxFactory.setTrustAll(true);
+        return sslCtxFactory;
     }
 }
diff --git 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
index 2cfe128..3f230d4 100644
--- 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
+++ 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
@@ -52,7 +52,8 @@ public class ServiceChannelInitializer extends 
ChannelInitializer<SocketChannel>
             SslContext sslCtx = SecurityUtility.createNettySslContextForServer(
                     serviceConfig.isTlsAllowInsecureConnection(), 
serviceConfig.getTlsTrustCertsFilePath(),
                     serviceConfig.getTlsCertificateFilePath(), 
serviceConfig.getTlsKeyFilePath(),
-                    serviceConfig.getTlsCiphers(), 
serviceConfig.getTlsProtocols());
+                    serviceConfig.getTlsCiphers(), 
serviceConfig.getTlsProtocols(),
+                    serviceConfig.getTlsRequireTrustedClientCertOnConnect());
             ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
         }
         ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
diff --git 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
index d8be507..f0d911f 100644
--- 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
+++ 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
@@ -72,19 +72,19 @@ public class ServerManager {
         connectors.add(connector);
 
         if (config.isTlsEnabled()) {
-            SslContextFactory sslCtxFactory = new SslContextFactory();
             try {
-                SSLContext sslCtx = 
SecurityUtility.createSslContext(config.isTlsAllowInsecureConnection(), 
config.getTlsTrustCertsFilePath(), config.getTlsCertificateFilePath(),
-                        config.getTlsKeyFilePath());
-                sslCtxFactory.setSslContext(sslCtx);
+                SslContextFactory sslCtxFactory = 
SecurityUtility.createSslContextFactory(
+                        config.isTlsAllowInsecureConnection(),
+                        config.getTlsTrustCertsFilePath(),
+                        config.getTlsCertificateFilePath(),
+                        config.getTlsKeyFilePath(), 
+                        config.getTlsRequireTrustedClientCertOnConnect());
+                ServerConnector tlsConnector = new ServerConnector(server, 1, 
1, sslCtxFactory);
+                tlsConnector.setPort(config.getWebServicePortTls());
+                connectors.add(tlsConnector);
             } catch (GeneralSecurityException e) {
                 throw new RestException(e);
-            }
-
-            sslCtxFactory.setWantClientAuth(true);
-            ServerConnector tlsConnector = new ServerConnector(server, 1, 1, 
sslCtxFactory);
-            tlsConnector.setPort(config.getWebServicePortTls());
-            connectors.add(tlsConnector);
+            }            
         }
 
         // Limit number of concurrent HTTP connections to avoid getting out of 
file descriptors
diff --git 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
index 8cf56d1..c1d59ee 100644
--- 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
+++ 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
@@ -88,6 +88,9 @@ public class ServiceConfig implements PulsarConfiguration {
     // Specify the tls cipher the broker will use to negotiate during TLS 
Handshake.
     // Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
     private Set<String> tlsCiphers = Sets.newTreeSet();
+    // Specify whether Client certificates are required for TLS
+    // Reject the Connection if the Client Certificate is not trusted.
+    private boolean tlsRequireTrustedClientCertOnConnect = false;
     
     private Properties properties = new Properties();
 
@@ -266,4 +269,12 @@ public class ServiceConfig implements PulsarConfiguration {
     public void setTlsCiphers(Set<String> tlsCiphers) {
         this.tlsCiphers = tlsCiphers;
     }
+    
+    public boolean getTlsRequireTrustedClientCertOnConnect() {
+        return tlsRequireTrustedClientCertOnConnect;
+    }
+
+    public void setTlsRequireTrustedClientCertOnConnect(boolean 
tlsRequireTrustedClientCertOnConnect) {
+        this.tlsRequireTrustedClientCertOnConnect = 
tlsRequireTrustedClientCertOnConnect;
+    }
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 69329ef..a8d3855 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -98,7 +98,10 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     // Specify the tls cipher the broker will use to negotiate during TLS 
Handshake.
     // Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
     private Set<String> tlsCiphers = Sets.newTreeSet();
-
+    // Specify whether Client certificates are required for TLS
+    // Reject the Connection if the Client Certificate is not trusted.
+    private boolean tlsRequireTrustedClientCertOnConnect = false;
+    
     private Properties properties = new Properties();
 
     public boolean forwardAuthorizationCredentials() {
@@ -332,4 +335,12 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     public void setTlsCiphers(Set<String> tlsCiphers) {
         this.tlsCiphers = tlsCiphers;
     }
+    
+    public boolean getTlsRequireTrustedClientCertOnConnect() {
+        return tlsRequireTrustedClientCertOnConnect;
+    }
+
+    public void setTlsRequireTrustedClientCertOnConnect(boolean 
tlsRequireTrustedClientCertOnConnect) {
+        this.tlsRequireTrustedClientCertOnConnect = 
tlsRequireTrustedClientCertOnConnect;
+    }
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 19abe83..b0055e1 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -49,7 +49,8 @@ public class ServiceChannelInitializer extends 
ChannelInitializer<SocketChannel>
         if (enableTLS) {
             SslContext sslCtx = 
SecurityUtility.createNettySslContextForServer(true /* to allow 
InsecureConnection */,
                     serviceConfig.getTlsTrustCertsFilePath(), 
serviceConfig.getTlsCertificateFilePath(),
-                    serviceConfig.getTlsKeyFilePath(), 
serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols());
+                    serviceConfig.getTlsKeyFilePath(), 
serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
+                    serviceConfig.getTlsRequireTrustedClientCertOnConnect());
             ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
         }
 
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index edc7188..5a2bdda 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -73,19 +73,19 @@ public class WebServer {
         connectors.add(connector);
 
         if (config.isTlsEnabledInProxy()) {
-            SslContextFactory sslCtxFactory = new SslContextFactory();
             try {
-                SSLContext sslCtx = SecurityUtility.createSslContext(false, null, config.getTlsCertificateFilePath(),
-                        config.getTlsKeyFilePath());
-                sslCtxFactory.setSslContext(sslCtx);
+                SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory(
+                        config.isTlsAllowInsecureConnection(),
+                        config.getTlsTrustCertsFilePath(),
+                        config.getTlsCertificateFilePath(),
+                        config.getTlsKeyFilePath(), 
+                        config.getTlsRequireTrustedClientCertOnConnect());
+                ServerConnector tlsConnector = new ServerConnector(server, 1, 1, sslCtxFactory);
+                tlsConnector.setPort(config.getWebServicePortTls());
+                connectors.add(tlsConnector);
             } catch (GeneralSecurityException e) {
                 throw new RuntimeException(e);
             }
-
-            sslCtxFactory.setWantClientAuth(false);
-            ServerConnector tlsConnector = new ServerConnector(server, 1, 1, sslCtxFactory);
-            tlsConnector.setPort(config.getWebServicePortTls());
-            connectors.add(tlsConnector);
         }
 
         // Limit number of concurrent HTTP connections to avoid getting out of file descriptors
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index 77b24c1..2829fdc 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -79,20 +79,20 @@ public class ProxyServer {
 
         // TLS enabled connector
         if (config.isTlsEnabled()) {
-            SslContextFactory sslCtxFactory = new SslContextFactory(true);
             try {
-                SSLContext sslCtx = SecurityUtility.createSslContext(false, config.getTlsTrustCertsFilePath(), config.getTlsCertificateFilePath(),
-                        config.getTlsKeyFilePath());
-                sslCtxFactory.setSslContext(sslCtx);
-
+                SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory(
+                        config.isTlsAllowInsecureConnection(),
+                        config.getTlsTrustCertsFilePath(),
+                        config.getTlsCertificateFilePath(),
+                        config.getTlsKeyFilePath(),
+                        config.getTlsRequireTrustedClientCertOnConnect());
+                ServerConnector tlsConnector = new ServerConnector(server, -1, -1, sslCtxFactory);
+                tlsConnector.setPort(config.getWebServicePortTls());
+                connectors.add(tlsConnector);
             } catch (GeneralSecurityException e) {
                 throw new PulsarServerException(e);
             }
 
-            sslCtxFactory.setWantClientAuth(true);
-            ServerConnector tlsConnector = new ServerConnector(server, -1, -1, sslCtxFactory);
-            tlsConnector.setPort(config.getWebServicePortTls());
-            connectors.add(tlsConnector);
         }
 
         // Limit number of concurrent HTTP connections to avoid getting out of
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index c3040df..0126d49 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -105,7 +105,10 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
     private String tlsTrustCertsFilePath = "";
     // Accept untrusted TLS certificate from client
     private boolean tlsAllowInsecureConnection = false;
-
+    // Specify whether Client certificates are required for TLS
+    // Reject the Connection if the Client Certificate is not trusted.
+    private boolean tlsRequireTrustedClientCertOnConnect = false;
+    
     private Properties properties = new Properties();
 
     public String getClusterName() {
@@ -340,4 +343,11 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
         this.properties = properties;
     }
 
+    public boolean getTlsRequireTrustedClientCertOnConnect() {
+        return tlsRequireTrustedClientCertOnConnect;
+    }
+
+    public void setTlsRequireTrustedClientCertOnConnect(boolean tlsRequireTrustedClientCertOnConnect) {
+        this.tlsRequireTrustedClientCertOnConnect = tlsRequireTrustedClientCertOnConnect;
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to