rdhabalia closed pull request #1208: Add hostname-verification at client tls 
connection
URL: https://github.com/apache/incubator-pulsar/pull/1208
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/all/src/assemble/LICENSE.bin.txt b/all/src/assemble/LICENSE.bin.txt
index 41f9000d6..a7e70defb 100644
--- a/all/src/assemble/LICENSE.bin.txt
+++ b/all/src/assemble/LICENSE.bin.txt
@@ -332,6 +332,8 @@ The Apache Software License, Version 2.0
  * Jetty - org.eclipse.jetty-*.jar
  * SnakeYaml -- org.yaml-snakeyaml-*.jar
  * RocksDB - org.rocksdb.*.jar
+ * HttpClient - org.apache.httpcomponents.httpclient.jar
+ * CommonsLogging - commons-logging-*.jar
 
 BSD 3-clause "New" or "Revised" License
  * EA Agent Loader -- com.ea.agentloader-*.jar -- 
licenses/LICENSE-EA-Agent-Loader.txt
diff --git a/pom.xml b/pom.xml
index 27ff691fb..7320661d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -138,6 +138,18 @@ flexible messaging model and an intuitive client 
API.</description>
         </exclusions>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.httpcomponents</groupId>
+        <artifactId>httpclient</artifactId>
+        <version>4.5.5</version>
+        <exclusions>
+          <exclusion>
+            <groupId>*</groupId>
+            <artifactId>*</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      
       <dependency>
         <groupId>org.testng</groupId>
         <artifactId>testng</artifactId>
@@ -760,6 +772,7 @@ flexible messaging model and an intuitive client 
API.</description>
             <exclude>**/*.crt</exclude>
             <exclude>**/*.key</exclude>
             <exclude>**/*.csr</exclude>
+            <exclude>**/*.pem</exclude>
             <exclude>**/*.json</exclude>
             <exclude>**/*.htpasswd</exclude>
             <exclude>src/test/resources/athenz.conf.test</exclude>
diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml
index fd3ff68de..bda3037eb 100644
--- a/pulsar-broker-shaded/pom.xml
+++ b/pulsar-broker-shaded/pom.xml
@@ -104,6 +104,8 @@
                   <include>org.aspectj:*</include>
                   <include>com.ea.agentloader:*</include>
                   <include>com.wordnik:swagger-annotations</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>commons-logging:commons-logging</include>
                 </includes>
               </artifactSet>
               <filters>
@@ -298,6 +300,10 @@
                   <pattern>com.wordnik</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.com.worknik</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.apache.http</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+                </relocation>
               </relocations>
             </configuration>
           </execution>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index cd0415aab..31387696a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -19,8 +19,11 @@
 package org.apache.pulsar.broker.service;
 
 import java.io.File;
+import java.security.cert.X509Certificate;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.impl.auth.AuthenticationDataTls;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.PulsarDecoder;
 
@@ -68,6 +71,17 @@ protected void initChannel(SocketChannel ch) throws 
Exception {
                     builder.trustManager(trustCertCollection);
                 }
             }
+            
+            ServiceConfiguration config = 
brokerService.pulsar().getConfiguration();
+            String certFilePath = config.getTlsCertificateFilePath();
+            String keyFilePath = config.getTlsKeyFilePath();
+            if (StringUtils.isNotBlank(certFilePath) && 
StringUtils.isNotBlank(keyFilePath)) {
+                AuthenticationDataTls authTlsData = new 
AuthenticationDataTls(certFilePath, keyFilePath);
+                builder.keyManager(authTlsData.getTlsPrivateKey(),
+                        (X509Certificate[]) authTlsData.getTlsCertificates());
+            }
+            
+            
             SslContext sslCtx = 
builder.clientAuth(ClientAuth.OPTIONAL).build();
             ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
new file mode 100644
index 000000000..5ccfc142b
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.Mockito.spy;
+
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.http.conn.util.PublicSuffixMatcher;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class AuthenticationTlsHostnameVerificationTest extends 
ProducerConsumerBase {
+    private static final Logger log = 
LoggerFactory.getLogger(AuthenticationTlsHostnameVerificationTest.class);
+
+    // Man in middle certificate which tries to act as a broker by sending its 
own valid certificate
+    private final String TLS_MIM_TRUST_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/hn-verification/cacert.pem";
+    private final String TLS_MIM_SERVER_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/hn-verification/broker-cert.pem";
+    private final String TLS_MIM_SERVER_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/hn-verification/broker-key.pem";
+
+    private final String TLS_TRUST_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/cacert.pem";
+    private final String TLS_SERVER_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-cert.pem";
+    private final String TLS_SERVER_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-key.pem";
+
+    private final String TLS_CLIENT_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/client-cert.pem";
+    private final String TLS_CLIENT_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/client-key.pem";
+
+    private final String BASIC_CONF_FILE_PATH = 
"./src/test/resources/authentication/basic/.htpasswd";
+
+    private final static String brokerHostName = "localhost";
+    private boolean hostnameVerificationEnabled = true;
+
+    protected void setup() throws Exception {
+        if (methodName.equals("testAnonymousSyncProducerAndConsumer")) {
+            conf.setAnonymousUserRole("anonymousUser");
+        }
+
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        conf.setTlsEnabled(true);
+        conf.setTlsAllowInsecureConnection(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("localhost");
+        superUserRoles.add("superUser");
+        superUserRoles.add("superUser2");
+        conf.setSuperUserRoles(superUserRoles);
+
+        
conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        conf.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + 
"tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        providers.add(AuthenticationProviderBasic.class.getName());
+        System.setProperty("pulsar.auth.basic.conf", BASIC_CONF_FILE_PATH);
+        conf.setAuthenticationProviders(providers);
+
+        conf.setClusterName("use");
+
+        super.init();
+
+        setupClient();
+    }
+
+    protected void setupClient() throws Exception {
+
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+
+        org.apache.pulsar.client.api.ClientConfiguration clientConf = new 
org.apache.pulsar.client.api.ClientConfiguration();
+        clientConf.setStatsInterval(0, TimeUnit.SECONDS);
+        clientConf.setTlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH);
+        clientConf.setTlsAllowInsecureConnection(true);
+        clientConf.setAuthentication(authTls);
+        clientConf.setUseTls(true);
+        
clientConf.setTlsHostnameVerificationEnable(hostnameVerificationEnabled);
+
+        admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
+        String lookupUrl;
+        lookupUrl = new URI("pulsar+ssl://" + brokerHostName + ":" + 
BROKER_PORT_TLS).toString();
+        pulsarClient = PulsarClient.create(lookupUrl, clientConf);
+
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), 
Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        if (!methodName.equals("testDefaultHostVerifier")) {
+            super.internalCleanup();
+        }
+    }
+
+    @DataProvider(name = "hostnameVerification")
+    public Object[][] codecProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
+    /**
+     * It verifies that client performs host-verification in order to create 
producer/consumer.
+     * 
+     * <pre>
+     * 1. Client tries to connect to broker with hostname="localhost"
+     * 2. Broker sends x509 certificates with CN = "pulsar"
+     * 3. Client verifies the host-name and closes the connection and fails 
consumer creation
+     * </pre>
+     * 
+     * @throws Exception
+     */
+    @Test(dataProvider = "hostnameVerification")
+    public void testTlsSyncProducerAndConsumerWithInvalidBrokerHost(boolean 
hostnameVerificationEnabled)
+            throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        this.hostnameVerificationEnabled = hostnameVerificationEnabled;
+        // setup broker cert which has CN = "pulsar" different than broker's 
hostname="localhost"
+        conf.setTlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH);
+        conf.setTlsCertificateFilePath(TLS_MIM_SERVER_CERT_FILE_PATH);
+        conf.setTlsKeyFilePath(TLS_MIM_SERVER_KEY_FILE_PATH);
+        conf.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + 
"tlsKeyFile:" + TLS_MIM_SERVER_KEY_FILE_PATH);
+
+        setup();
+
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        try {
+            Consumer consumer = 
pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic",
+                    "my-subscriber-name", conf);
+            if (hostnameVerificationEnabled) {
+                Assert.fail("Connection should be failed due to 
hostnameVerification enabled");
+            }
+        } catch (PulsarClientException e) {
+            if (!hostnameVerificationEnabled) {
+                Assert.fail("Consumer should be created because 
hostnameverification is disabled");
+            }
+        }
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * It verifies that client performs host-verification in order to create 
producer/consumer.
+     * 
+     * <pre>
+     * 1. Client tries to connect to broker with hostname="localhost"
+     * 2. Broker sends x509 certificates with CN = "localhost"
+     * 3. Client verifies the host-name and continues
+     * </pre>
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testTlsSyncProducerAndConsumerCorrectBrokerHost() throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        // setup broker cert which has CN = "localhost"
+        conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+
+        setup();
+
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer = 
pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic", 
"my-subscriber-name",
+                conf);
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+
+        Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic", 
producerConf);
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * This test verifies {@link DefaultHostnameVerifier} behavior and gives 
fair idea about host matching result
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testDefaultHostVerifier() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        Method matchIdentityStrict = 
DefaultHostnameVerifier.class.getDeclaredMethod("matchIdentityStrict",
+                String.class, String.class, PublicSuffixMatcher.class);
+        matchIdentityStrict.setAccessible(true);
+        Assert.assertTrue((boolean) matchIdentityStrict.invoke(null, "pulsar", 
"pulsar", null));
+        Assert.assertFalse((boolean) matchIdentityStrict.invoke(null, 
"pulsar.com", "pulsar", null));
+        Assert.assertTrue((boolean) matchIdentityStrict.invoke(null, 
"pulsar-broker1.com", "pulsar*.com", null));
+        // unmatched remainder: "1-broker." should not contain "."
+        Assert.assertFalse((boolean) matchIdentityStrict.invoke(null, 
"pulsar-broker1.com", "pulsar*com", null));
+        Assert.assertFalse((boolean) matchIdentityStrict.invoke(null, 
"pulsar.com", "*", null));
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-cert.pem
 
b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-cert.pem
new file mode 100644
index 000000000..ac9d51be7
--- /dev/null
+++ 
b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-cert.pem
@@ -0,0 +1,82 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            d8:99:d5:ce:27:f5:be:50
+    Signature Algorithm: sha256WithRSAEncryption
+        Issuer: C=AU, ST=Some-State, O=Internet Widgits Pty Ltd, CN=testCA
+        Validity
+            Not Before: Feb  9 01:11:41 2018 GMT
+            Not After : Feb  9 01:11:41 2019 GMT
+        Subject: C=AU, ST=Some-State, O=Internet Widgits Pty Ltd, 
CN=pulsar*.apache.com
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+                Public-Key: (2048 bit)
+                Modulus:
+                    00:e8:bb:b6:87:37:6b:68:44:c9:d6:01:ba:a5:93:
+                    e4:5f:b1:0e:64:23:a9:7b:bd:c1:a6:a8:b8:b9:2c:
+                    c9:73:57:5a:41:89:db:01:64:30:06:dc:5b:4e:01:
+                    d3:02:73:86:d1:f9:c2:a2:5f:8c:c1:4c:00:bc:b1:
+                    bd:67:18:f6:88:ee:b6:72:be:37:18:2f:5d:c2:a1:
+                    30:20:02:38:2b:5e:a9:50:f2:c4:f7:23:74:ef:ad:
+                    4e:b1:25:f7:49:5e:8d:98:cd:2d:71:88:2c:73:df:
+                    eb:5c:2e:f0:5e:e6:15:1e:82:1e:94:33:15:f5:7b:
+                    65:9e:b2:78:89:7a:7f:b7:c1:6a:a3:a9:34:3c:96:
+                    32:2a:26:1d:67:d1:0a:80:1f:7c:95:34:c6:fb:ea:
+                    11:1c:53:86:81:04:bb:90:45:2b:4f:99:9c:72:f5:
+                    ec:86:4b:2f:7e:c3:65:6c:ac:e0:74:5f:35:4e:ee:
+                    3f:d0:82:2b:20:bb:80:65:3f:fe:78:96:42:19:35:
+                    e1:46:bd:d9:4e:b7:b8:95:5f:25:6b:a6:f2:e3:87:
+                    13:d3:29:11:c5:a2:84:bb:12:81:ea:15:60:2f:16:
+                    7e:f9:86:bc:e3:93:ed:d7:ec:5a:34:ae:4c:cd:00:
+                    40:dc:c6:e7:f6:19:ed:63:7f:8f:d0:dd:c5:11:9d:
+                    95:2d
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Basic Constraints: 
+                CA:FALSE
+            Netscape Comment: 
+                OpenSSL Generated Certificate
+            X509v3 Subject Key Identifier: 
+                06:DC:92:77:64:D3:21:AB:08:F6:E4:0C:9A:47:3F:3A:8B:CB:E8:D8
+            X509v3 Authority Key Identifier: 
+                
keyid:62:6F:F8:A2:85:3C:5C:7E:94:CC:3E:89:D6:AC:4F:65:F2:2E:02:39
+
+    Signature Algorithm: sha256WithRSAEncryption
+         70:0b:e4:07:45:98:d3:17:02:2f:44:ec:aa:41:2e:39:57:5e:
+         8a:e0:21:77:59:39:1d:66:c2:10:ea:ae:73:8a:50:94:5e:ad:
+         05:56:aa:8a:2f:87:44:09:cb:50:2c:5a:44:d1:99:fe:ee:5c:
+         82:fb:db:d4:5c:bd:56:dd:e6:37:87:0a:64:2c:85:19:dc:2d:
+         d1:22:00:91:53:5d:4c:f2:1c:4f:61:84:8e:77:e1:cc:9e:f8:
+         16:bb:15:b0:5a:f4:12:c7:b6:3b:28:cf:e3:95:9a:a8:68:ad:
+         02:7e:88:34:88:cd:31:d9:cd:17:8a:ef:5d:d5:40:c7:37:ca:
+         d0:38:35:46:d0:7d:f9:b6:85:f5:ef:9d:f3:05:9c:38:3f:67:
+         df:97:94:a8:81:5d:e3:70:ff:96:28:58:13:37:8a:3f:2a:b9:
+         6a:2a:c6:aa:89:16:91:9a:e7:9c:f3:72:36:74:de:46:7f:4f:
+         26:56:6e:05:47:99:ee:38:26:13:77:16:f5:07:cd:f1:69:6e:
+         08:c8:3b:ef:35:96:b3:b1:8e:87:eb:bd:da:02:b8:40:aa:e8:
+         16:11:80:98:81:77:5a:97:41:58:bd:01:50:4c:6c:c4:14:43:
+         d4:ac:c7:25:8b:df:a4:94:f5:29:12:72:56:8c:25:94:d8:8f:
+         c1:fa:4b:59
+-----BEGIN CERTIFICATE-----
+MIIDtjCCAp6gAwIBAgIJANiZ1c4n9b5QMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBnRlc3RDQTAeFw0xODAyMDkwMTExNDFa
+Fw0xOTAyMDkwMTExNDFaMGIxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0
+YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxGzAZBgNVBAMM
+EnB1bHNhciouYXBhY2hlLmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
+ggEBAOi7toc3a2hEydYBuqWT5F+xDmQjqXu9waaouLksyXNXWkGJ2wFkMAbcW04B
+0wJzhtH5wqJfjMFMALyxvWcY9ojutnK+NxgvXcKhMCACOCteqVDyxPcjdO+tTrEl
+90lejZjNLXGILHPf61wu8F7mFR6CHpQzFfV7ZZ6yeIl6f7fBaqOpNDyWMiomHWfR
+CoAffJU0xvvqERxThoEEu5BFK0+ZnHL17IZLL37DZWys4HRfNU7uP9CCKyC7gGU/
+/niWQhk14Ua92U63uJVfJWum8uOHE9MpEcWihLsSgeoVYC8WfvmGvOOT7dfsWjSu
+TM0AQNzG5/YZ7WN/j9DdxRGdlS0CAwEAAaN7MHkwCQYDVR0TBAIwADAsBglghkgB
+hvhCAQ0EHxYdT3BlblNTTCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwHQYDVR0OBBYE
+FAbckndk0yGrCPbkDJpHPzqLy+jYMB8GA1UdIwQYMBaAFGJv+KKFPFx+lMw+idas
+T2XyLgI5MA0GCSqGSIb3DQEBCwUAA4IBAQBwC+QHRZjTFwIvROyqQS45V16K4CF3
+WTkdZsIQ6q5zilCUXq0FVqqKL4dECctQLFpE0Zn+7lyC+9vUXL1W3eY3hwpkLIUZ
+3C3RIgCRU11M8hxPYYSOd+HMnvgWuxWwWvQSx7Y7KM/jlZqoaK0Cfog0iM0x2c0X
+iu9d1UDHN8rQODVG0H35toX1753zBZw4P2ffl5SogV3jcP+WKFgTN4o/KrlqKsaq
+iRaRmuec83I2dN5Gf08mVm4FR5nuOCYTdxb1B83xaW4IyDvvNZazsY6H673aArhA
+qugWEYCYgXdal0FYvQFQTGzEFEPUrMcli9+klPUpEnJWjCWU2I/B+ktZ
+-----END CERTIFICATE-----
diff --git 
a/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-key.pem
 
b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-key.pem
new file mode 100644
index 000000000..b6bde087f
--- /dev/null
+++ 
b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDou7aHN2toRMnW
+Abqlk+RfsQ5kI6l7vcGmqLi5LMlzV1pBidsBZDAG3FtOAdMCc4bR+cKiX4zBTAC8
+sb1nGPaI7rZyvjcYL13CoTAgAjgrXqlQ8sT3I3TvrU6xJfdJXo2YzS1xiCxz3+tc
+LvBe5hUegh6UMxX1e2WesniJen+3wWqjqTQ8ljIqJh1n0QqAH3yVNMb76hEcU4aB
+BLuQRStPmZxy9eyGSy9+w2VsrOB0XzVO7j/Qgisgu4BlP/54lkIZNeFGvdlOt7iV
+XyVrpvLjhxPTKRHFooS7EoHqFWAvFn75hrzjk+3X7Fo0rkzNAEDcxuf2Ge1jf4/Q
+3cURnZUtAgMBAAECggEBAKUj5V3HBlDDVtCjA3TQHyGDeim2YGGsgQen+wNyczOD
+zUhp8FvpYmbL34HXq4m2vfiql+AtmqviKTe7iyDnxq/datq6fE+N9KLRS1u7F242
+yj/lM7wFjckwGYF75h9Kl4DQPimsLZa/Ubtkly1PZ7bxL4+LPE6nE7FrBDrREGUq
+39bUGmMPXzLRxVSUdmLQIUsgLtuAOVfQB5qZ75zIUMmBhPhNhDgUv35cLxmgj5J8
+GPJxG21BBm88UYA+dhPLTAk+k3rLVKeZfXV75U0Zt04JHthhnFZ+/mJk8AD6c+jZ
+d2M1TdRSMkyTgd0DpN/bQiBvs+MK6dSkDJvYQOVGfQUCgYEA+7C1fNRQgeyJh5HJ
+waRr+9oKBLk1bTq5KaiMFF0SQo0rp5AShjG3ucTiKOBleUkiig/CpLH3CvToapq6
+uh8xLZm8Fz1AIwQ/qjRlVeNzNPCrstRk/BYgmQREr7kDH7RzvynJZYKdwpaJA3+4
+ICK/ES2FGcgNZahnm5brrCc/gxMCgYEA7LfnzWj0x5vCOlGSwo/LjFb9UgreJLQ9
+U1W/ACg9H5cp81AVTMRr9UsZOyaWJrdCTyfiQJOEZQ3YdwjBSr6f5vOxwqF68Mmi
+WG1PhP/kZsGI/cwlEA2odkoy/BGfxSMrfiCaxQNovG35agbRiJ5Awci2lOViPnvF
+HPKUULHpTr8CgYBbykVWAiReTcKWc5/OBEXxcsJmmJkYfesbe0GjB5JqPQvnr05i
+LG2hzWDhoXzAb+Ct0zOcVt8O2uSMRGPHDysjQ0bqfscOPjVtwHAYk7vnWcJ0lKtD
+mFpJE9ps759pB6mS1Q2C/NDGL5pGcWTYK3PdMumwzlm8cl9eyfqnLSUniwKBgQCO
+drfpJat7nkAsfP+IXKYyFgBrKeM7z8XAq7BB1fXDV2SF7MKE6wnWHJZYxQZE0rHz
+lZtTJfTeJJEMQpah90ug4TUwX6Lv20n7Uf4zmxXIyd06cWw01yN13X4Fuk2fhWUd
+iV3cCAs2rDEZIHVmdWefuL45qjuQQ0kD/PJKBmjVXQKBgQC2kaXVskAqZJwyfn5r
+g2hoRxjgv58UGyTsVwiwkSfoYQGdw1otO2zuyYbZZxGttMo1HkKTXFUNDELpiFXb
+5GcfT6xxssEH8zvh30M8rS0nF0AkMGZhxJxPdBnh5enwNg6glStcMY6ZaEDVz34k
+MAr7/FNPcrJt/EgvQ7PYj0/HVg==
+-----END PRIVATE KEY-----
diff --git 
a/pulsar-broker/src/test/resources/authentication/tls/hn-verification/cacert.pem
 
b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/cacert.pem
new file mode 100644
index 000000000..4c98286c9
--- /dev/null
+++ 
b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/cacert.pem
@@ -0,0 +1,79 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            d8:99:d5:ce:27:f5:be:4f
+    Signature Algorithm: sha256WithRSAEncryption
+        Issuer: C=AU, ST=Some-State, O=Internet Widgits Pty Ltd, CN=testCA
+        Validity
+            Not Before: Feb  9 01:11:04 2018 GMT
+            Not After : Feb  8 01:11:04 2021 GMT
+        Subject: C=AU, ST=Some-State, O=Internet Widgits Pty Ltd, CN=testCA
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+                Public-Key: (2048 bit)
+                Modulus:
+                    00:cc:50:cd:b6:68:b2:e0:5f:bd:a5:4a:5c:17:bc:
+                    d8:b9:43:e6:22:9a:8a:2e:1b:87:13:b6:ca:59:7e:
+                    d7:ee:50:fe:ef:bf:ae:4d:cc:26:70:b4:27:03:64:
+                    36:73:d5:fd:2e:08:37:b2:2d:36:26:c8:e3:d3:9e:
+                    d3:37:0d:56:fa:a9:78:55:db:09:b3:21:b7:ac:c8:
+                    12:35:16:21:ed:a8:5e:4a:a4:e3:11:a0:67:ae:4c:
+                    5b:a7:15:ff:72:b1:7a:77:2b:ea:bd:3c:89:5c:40:
+                    ae:58:4d:69:56:d6:d9:50:42:e7:d7:b1:58:cc:c8:
+                    2a:84:b0:16:7c:3a:82:38:46:78:cc:4b:8a:db:ac:
+                    cc:4c:e1:a8:c2:d4:8f:b0:d9:dc:79:f8:70:28:8a:
+                    76:4f:dc:b1:09:a2:15:65:33:de:2a:2f:8e:27:7a:
+                    0b:93:6b:66:4b:e2:53:33:97:a2:26:bf:f3:b2:8a:
+                    f2:6c:5c:41:5b:1a:bb:12:6c:2f:f3:14:35:c4:40:
+                    4b:16:65:64:72:16:bf:a3:d6:1b:4d:9b:e6:12:cb:
+                    0a:c7:a9:01:f8:63:2b:b7:22:7a:fd:ef:6b:03:9e:
+                    e5:06:87:1d:a5:d5:11:4c:11:ae:55:62:11:f5:57:
+                    7b:21:51:77:8e:b8:cf:2f:7d:86:d6:38:d3:af:28:
+                    bc:8d
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Subject Key Identifier: 
+                62:6F:F8:A2:85:3C:5C:7E:94:CC:3E:89:D6:AC:4F:65:F2:2E:02:39
+            X509v3 Authority Key Identifier: 
+                
keyid:62:6F:F8:A2:85:3C:5C:7E:94:CC:3E:89:D6:AC:4F:65:F2:2E:02:39
+
+            X509v3 Basic Constraints: 
+                CA:TRUE
+    Signature Algorithm: sha256WithRSAEncryption
+         01:5a:ff:b8:36:ff:0c:9c:12:cc:ad:b2:60:ac:3c:91:c1:04:
+         c0:6b:10:f6:e0:0b:1c:17:44:76:1b:5a:98:c5:33:a2:2c:c8:
+         bf:e7:f7:2b:b7:97:37:43:8c:e7:a4:77:5f:5d:48:f6:77:2d:
+         bb:e0:f9:02:9e:df:0b:71:63:fd:ff:63:f1:23:ec:ed:bc:ac:
+         ea:a8:52:60:a7:c8:b0:f9:f7:66:62:35:ab:72:32:9a:cf:7f:
+         cc:96:fe:3b:01:31:04:21:e9:da:76:d1:09:be:66:03:c8:14:
+         48:d0:ad:73:3a:16:98:72:d9:1e:98:57:9b:49:59:8b:9a:23:
+         a9:e6:66:e6:d0:bc:65:45:fa:eb:ce:5a:21:24:9c:15:99:b9:
+         f3:63:ef:0a:bb:68:4d:ee:2e:52:6a:a2:bc:77:79:be:36:b1:
+         b5:d8:01:c5:9b:37:b0:db:38:f0:0c:59:35:7f:0c:8b:bf:ec:
+         22:bc:dc:14:c8:01:31:4f:a1:0b:82:34:ba:0f:5b:93:2e:4c:
+         ee:20:72:31:30:b1:d9:2c:42:84:2a:4e:c5:ea:d8:af:f4:da:
+         dd:b5:c4:f2:b0:43:f1:c4:09:9f:3d:5e:44:9f:b3:52:9f:92:
+         fe:9d:e3:f4:5b:6f:38:7e:3a:11:5b:99:b8:22:fd:a7:72:5d:
+         40:7c:50:f8
+-----BEGIN CERTIFICATE-----
+MIIDfzCCAmegAwIBAgIJANiZ1c4n9b5PMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBnRlc3RDQTAeFw0xODAyMDkwMTExMDRa
+Fw0yMTAyMDgwMTExMDRaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0
+YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMM
+BnRlc3RDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMxQzbZosuBf
+vaVKXBe82LlD5iKaii4bhxO2yll+1+5Q/u+/rk3MJnC0JwNkNnPV/S4IN7ItNibI
+49Oe0zcNVvqpeFXbCbMht6zIEjUWIe2oXkqk4xGgZ65MW6cV/3Kxencr6r08iVxA
+rlhNaVbW2VBC59exWMzIKoSwFnw6gjhGeMxLituszEzhqMLUj7DZ3Hn4cCiKdk/c
+sQmiFWUz3iovjid6C5NrZkviUzOXoia/87KK8mxcQVsauxJsL/MUNcRASxZlZHIW
+v6PWG02b5hLLCsepAfhjK7ciev3vawOe5QaHHaXVEUwRrlViEfVXeyFRd464zy99
+htY4068ovI0CAwEAAaNQME4wHQYDVR0OBBYEFGJv+KKFPFx+lMw+idasT2XyLgI5
+MB8GA1UdIwQYMBaAFGJv+KKFPFx+lMw+idasT2XyLgI5MAwGA1UdEwQFMAMBAf8w
+DQYJKoZIhvcNAQELBQADggEBAAFa/7g2/wycEsytsmCsPJHBBMBrEPbgCxwXRHYb
+WpjFM6IsyL/n9yu3lzdDjOekd19dSPZ3Lbvg+QKe3wtxY/3/Y/Ej7O28rOqoUmCn
+yLD592ZiNatyMprPf8yW/jsBMQQh6dp20Qm+ZgPIFEjQrXM6Fphy2R6YV5tJWYua
+I6nmZubQvGVF+uvOWiEknBWZufNj7wq7aE3uLlJqorx3eb42sbXYAcWbN7DbOPAM
+WTV/DIu/7CK83BTIATFPoQuCNLoPW5MuTO4gcjEwsdksQoQqTsXq2K/02t21xPKw
+Q/HECZ89XkSfs1Kfkv6d4/Rbbzh+OhFbmbgi/adyXUB8UPg=
+-----END CERTIFICATE-----
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml 
b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
index 986264db0..7212c3936 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
@@ -125,6 +125,8 @@
                   <include>org.apache.pulsar:pulsar-checksum</include>
                   <include>net.jpountz.lz4:lz4</include>
                   <include>com.yahoo.datasketches:sketches-core</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>commons-logging:commons-logging</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -195,6 +197,10 @@
                   <pattern>com.yahoo.sketches</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.com.yahoo.sketches</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.apache.http</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+                </relocation>
               </relocations>
               <filters>
                 <filter>
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index c3239cca8..3fbd95a1b 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -81,6 +81,8 @@
                   <include>org.apache.pulsar:pulsar-checksum</include>
                   <include>net.jpountz.lz4:lz4</include>
                   <include>com.yahoo.datasketches:sketches-core</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>commons-logging:commons-logging</include>
                 </includes>
               </artifactSet>
               <filters>
@@ -146,6 +148,10 @@
                   <pattern>com.yahoo.sketches</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.com.yahoo.sketches</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.apache.http</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+                </relocation>
               </relocations>
             </configuration>
           </execution>
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 3c26a9e87..fcce6c0f5 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -74,6 +74,24 @@
       <groupId>com.google.code.gson</groupId>
       <artifactId>gson</artifactId>
     </dependency>
+    
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+         </exclusion>
+      </exclusions>
+    </dependency>
+    
+    <!-- httpclient uses it for logging --> 
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>1.1.1</version>
+    </dependency>
 
   </dependencies>
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
index dd62728f7..9e4aecec7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -51,6 +51,7 @@
     private boolean useTls = false;
     private String tlsTrustCertsFilePath = "";
     private boolean tlsAllowInsecureConnection = false;
+    private boolean tlsHostnameVerificationEnable = false;
     private int concurrentLookupRequest = 50000;
     private int maxNumberOfRejectedRequestPerConnection = 50;
 
@@ -356,4 +357,21 @@ public void setMaxNumberOfRejectedRequestPerConnection(int 
maxNumberOfRejectedRe
         this.maxNumberOfRejectedRequestPerConnection = 
maxNumberOfRejectedRequestPerConnection;
     }
 
+    public boolean isTlsHostnameVerificationEnable() {
+        return tlsHostnameVerificationEnable;
+    }
+
+    /**
+     * It allows to validate hostname verification when client connects to 
broker over tls. It validates incoming x509
+     * certificate and matches provided hostname(CN/SAN) with expected 
broker's host name. It follows RFC 2818, 3.1. Server
+     * Identity hostname verification.
+     * 
+     * @see <a href="https://tools.ietf.org/html/rfc2818";>rfc2818</a>
+     * 
+     * @param tlsHostnameVerificationEnable
+     */
+    public void setTlsHostnameVerificationEnable(boolean 
tlsHostnameVerificationEnable) {
+        this.tlsHostnameVerificationEnable = tlsHostnameVerificationEnable;
+    }
+    
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 3f2d176a7..38e96edd4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -29,11 +29,14 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import javax.net.ssl.SSLSession;
+
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
@@ -51,16 +54,18 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.unix.Errors.NativeIoException;
+import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Promise;
-import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 
 public class ClientCnx extends PulsarHandler {
 
@@ -87,6 +92,10 @@
     private final long operationTimeoutMs;
 
     private String proxyToTargetBrokerAddress = null;
+    // Remote hostName with which client is connected
+    private String remoteHostName = null;
+    private boolean isTlsHostnameVerificationEnable;
+    private DefaultHostnameVerifier hostnameVerifier;
 
     enum State {
         None, SentConnectFrame, Ready, Failed
@@ -100,6 +109,8 @@ public ClientCnx(ClientConfiguration conf, EventLoopGroup 
eventLoopGroup) {
         this.maxNumberOfRejectedRequestPerConnection = 
conf.getMaxNumberOfRejectedRequestPerConnection();
         this.operationTimeoutMs = conf.getOperationTimeoutMs();
         this.state = State.None;
+        this.isTlsHostnameVerificationEnable = 
conf.isTlsHostnameVerificationEnable();
+        this.hostnameVerifier = new DefaultHostnameVerifier();
     }
 
     @Override
@@ -179,6 +190,14 @@ public static boolean isKnownException(Throwable t) {
 
     @Override
     protected void handleConnected(CommandConnected connected) {
+        
+        if (isTlsHostnameVerificationEnable && remoteHostName != null && 
!verifyTlsHostName(remoteHostName, ctx)) {
+            // close the connection if host-verification failed with the broker
+            log.warn("[{}] Failed to verify hostname of {}", ctx.channel(), 
remoteHostName);
+            ctx.close();
+            return;
+        }
+        
         checkArgument(state == State.SentConnectFrame);
 
         if (log.isDebugEnabled()) {
@@ -521,6 +540,35 @@ private void checkServerError(ServerError error, String 
errMsg) {
         }
     }
 
+    /**
+     * verifies host name provided in x509 Certificate in tls session
+     * 
+     * it matches hostname with below scenarios
+     * 
+     * <pre>
+     *  1. Supports IPV4 and IPV6 host matching
+     *  2. Supports wild card matching for DNS-name
+     *  eg:
+     *     HostName                     CN           Result
+     * 1.  localhost                    localhost    PASS
+     * 2.  localhost                    local*       PASS
+     * 3.  pulsar1-broker.com           pulsar*.com  PASS
+     * </pre>
+     * 
+     * @param ctx
+     * @return true if hostname is verified else return false
+     */
+    private boolean verifyTlsHostName(String hostname, ChannelHandlerContext 
ctx) {
+        ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
+
+        SSLSession sslSession = null;
+        if (sslHandler != null) {
+            sslSession = ((SslHandler) sslHandler).engine().getSession();
+            return hostnameVerifier.verify(hostname, sslSession);
+        }
+        return false;
+    }
+
     void registerConsumer(final long consumerId, final ConsumerImpl consumer) {
         consumers.put(consumerId, consumer);
     }
@@ -542,6 +590,10 @@ void setTargetBroker(InetSocketAddress 
targetBrokerAddress) {
                 targetBrokerAddress.getPort());
     }
 
+     void setRemoteHostName(String remoteHostName) {
+        this.remoteHostName = remoteHostName;
+    }
+    
     private PulsarClientException getPulsarClientException(ServerError error, 
String errorMsg) {
         switch (error) {
         case AuthenticationError:
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index ed3c1845a..f598abe70 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -194,6 +194,8 @@ public void initChannel(SocketChannel ch) throws Exception {
                 cnx.setTargetBroker(logicalAddress);
             }
 
+            cnx.setRemoteHostName(physicalAddress.getHostName());
+            
             cnx.connectionFuture().thenRun(() -> {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Connection handshake completed", 
cnx.channel());
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 86ebd3741..e7b0be74b 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -19,7 +19,11 @@
 package org.apache.pulsar.proxy.server;
 
 import java.io.File;
+import java.security.cert.X509Certificate;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.impl.auth.AuthenticationDataTls;
 import org.apache.pulsar.common.api.PulsarDecoder;
 
 import io.netty.channel.ChannelInitializer;
@@ -58,7 +62,17 @@ protected void initChannel(SocketChannel ch) throws 
Exception {
             builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
             SslContext sslCtx = 
builder.clientAuth(ClientAuth.OPTIONAL).build();
             ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+            
+            String certFilePath = serviceConfig.getTlsCertificateFilePath();
+            String keyFilePath = serviceConfig.getTlsKeyFilePath();
+            if (StringUtils.isNotBlank(certFilePath) && 
StringUtils.isNotBlank(keyFilePath)) {
+                AuthenticationDataTls authTlsData = new 
AuthenticationDataTls(certFilePath, keyFilePath);
+                builder.keyManager(authTlsData.getTlsPrivateKey(),
+                        (X509Certificate[]) authTlsData.getTlsCertificates());
+            }
+            
         }
+        
         ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
         ch.pipeline().addLast("handler", new ProxyConnection(proxyService));
     }
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
index 558f5e0e8..1619e2564 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
@@ -18,10 +18,8 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -42,7 +40,6 @@
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -50,6 +47,7 @@
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
@@ -75,6 +73,11 @@
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
 
+    @DataProvider(name = "hostnameVerification")
+    public Object[][] codecProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+    
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -161,7 +164,7 @@ public void textProxyAuthorization() throws Exception {
         createAdminClient();
         final String proxyServiceUrl = "pulsar://localhost:" + 
proxyConfig.getServicePortTls();
         // create a client which connects to proxy over tls and pass authData
-        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl);
+        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, false);
 
         String namespaceName = "my-property/proxy-authorization/my-ns";
         
@@ -205,6 +208,43 @@ public void textProxyAuthorization() throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "hostnameVerification")
+    public void textProxyAuthorizationTlsHostVerification(boolean 
hostnameVerificationEnabled) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        createAdminClient();
+        final String proxyServiceUrl = "pulsar://localhost:" + 
proxyConfig.getServicePortTls();
+        // create a client which connects to proxy over tls and pass authData
+        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, 
hostnameVerificationEnabled);
+
+        String namespaceName = "my-property/proxy-authorization/my-ns";
+
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), 
Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy",
+                Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client",
+                Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        try {
+            Consumer consumer = 
proxyClient.subscribe("persistent://my-property/proxy-authorization/my-ns/my-topic1",
+                    "my-subscriber-name", conf);
+            if (hostnameVerificationEnabled) {
+                Assert.fail("Connection should be failed due to 
hostnameVerification enabled");
+            }
+        } catch (PulsarClientException e) {
+            if (!hostnameVerificationEnabled) {
+                Assert.fail("Consumer should be created because 
hostnameverification is disabled");
+            }
+        }
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+    
     protected final void createAdminClient() throws Exception {
         Map<String, String> authParams = Maps.newHashMap();
         authParams.put("tlsCertFile", TLS_SUPERUSER_CLIENT_CERT_FILE_PATH);
@@ -221,7 +261,7 @@ protected final void createAdminClient() throws Exception {
         admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
     }
     
-    private PulsarClient createPulsarClient(String proxyServiceUrl) throws 
PulsarClientException {
+    private PulsarClient createPulsarClient(String proxyServiceUrl, boolean 
hosnameVerificationEnabled) throws PulsarClientException {
         Map<String, String> authParams = Maps.newHashMap();
         authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
         authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
@@ -233,6 +273,7 @@ private PulsarClient createPulsarClient(String 
proxyServiceUrl) throws PulsarCli
         clientConf.setTlsAllowInsecureConnection(true);
         clientConf.setAuthentication(authTls);
         clientConf.setUseTls(true);
+        
clientConf.setTlsHostnameVerificationEnable(hosnameVerificationEnabled);
         return PulsarClient.create(proxyServiceUrl, clientConf);
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to