This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 2a92819f0bb HBASE-27280 Add mutual authentication support to TLS 
(#4796)
2a92819f0bb is described below

commit 2a92819f0bb8263565fc86e6862485d431c04f88
Author: Bryan Beaudreault <[email protected]>
AuthorDate: Fri Sep 23 08:36:20 2022 -0400

    HBASE-27280 Add mutual authentication support to TLS (#4796)
    
    Signed-off-by: Duo Zhang <[email protected]>
    Reviewed-by: Andor Molnár <[email protected]>
---
 .../hbase/io/crypto/tls/HBaseHostnameVerifier.java | 286 +++++++++++++++++
 .../hbase/io/crypto/tls/HBaseTrustManager.java     | 181 +++++++++++
 .../hadoop/hbase/io/crypto/tls/X509Util.java       | 111 +++++--
 .../io/crypto/tls/TestHBaseHostnameVerifier.java   | 252 +++++++++++++++
 .../hbase/io/crypto/tls/TestHBaseTrustManager.java | 355 +++++++++++++++++++++
 .../hadoop/hbase/io/crypto/tls/TestX509Util.java   |  57 +++-
 .../hbase/io/crypto/tls/X509TestContext.java       |  91 +++++-
 .../hbase/io/crypto/tls/X509TestHelpers.java       |  23 +-
 .../hbase/security/AbstractTestMutualTls.java      | 225 +++++++++++++
 .../hbase/security/TestMutualTlsClientSide.java    |  82 +++++
 .../hbase/security/TestMutualTlsServerSide.java    | 116 +++++++
 11 files changed, 1742 insertions(+), 37 deletions(-)

diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/HBaseHostnameVerifier.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/HBaseHostnameVerifier.java
new file mode 100644
index 00000000000..a703f5ff630
--- /dev/null
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/HBaseHostnameVerifier.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.crypto.tls;
+
+import java.net.InetAddress;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+import javax.naming.InvalidNameException;
+import javax.naming.NamingException;
+import javax.naming.directory.Attribute;
+import javax.naming.directory.Attributes;
+import javax.naming.ldap.LdapName;
+import javax.naming.ldap.Rdn;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.security.auth.x500.X500Principal;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
+
+/**
+ * When enabled in {@link X509Util}, handles verifying that the hostname of a 
peer matches the
+ * certificate it presents.
+ * <p/>
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ *      
"https://github.com/apache/zookeeper/blob/5820d10d9dc58c8e12d2e25386fdf92acb360359/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKHostnameVerifier.java";>Base
+ *      revision</a>
+ */
[email protected]
+final class HBaseHostnameVerifier implements HostnameVerifier {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HBaseHostnameVerifier.class);
+
+  /**
+   * Note: copied from Apache httpclient with some minor modifications. We 
want host verification,
+   * but depending on the httpclient jar caused unexplained performance 
regressions (even when the
+   * code was not used).
+   */
+  private static final class SubjectName {
+
+    static final int DNS = 2;
+    static final int IP = 7;
+
+    private final String value;
+    private final int type;
+
+    SubjectName(final String value, final int type) {
+      if (type != DNS && type != IP) {
+        throw new IllegalArgumentException("Invalid type: " + type);
+      }
+      this.value = Objects.requireNonNull(value);
+      this.type = type;
+    }
+
+    public int getType() {
+      return type;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    @Override
+    public String toString() {
+      return value;
+    }
+
+  }
+
+  @Override
+  public boolean verify(final String host, final SSLSession session) {
+    try {
+      final Certificate[] certs = session.getPeerCertificates();
+      final X509Certificate x509 = (X509Certificate) certs[0];
+      verify(host, x509);
+      return true;
+    } catch (final SSLException ex) {
+      LOG.debug("Unexpected exception", ex);
+      return false;
+    }
+  }
+
+  void verify(final String host, final X509Certificate cert) throws 
SSLException {
+    final List<SubjectName> subjectAlts = getSubjectAltNames(cert);
+    if (subjectAlts != null && !subjectAlts.isEmpty()) {
+      Optional<InetAddress> inetAddress = parseIpAddress(host);
+      if (inetAddress.isPresent()) {
+        matchIPAddress(host, inetAddress.get(), subjectAlts);
+      } else {
+        matchDNSName(host, subjectAlts);
+      }
+    } else {
+      // CN matching has been deprecated by rfc2818 and can be used
+      // as fallback only when no subjectAlts are available
+      final X500Principal subjectPrincipal = cert.getSubjectX500Principal();
+      final String cn = 
extractCN(subjectPrincipal.getName(X500Principal.RFC2253));
+      if (cn == null) {
+        throw new SSLException("Certificate subject for <" + host + "> doesn't 
contain "
+          + "a common name and does not have alternative names");
+      }
+      matchCN(host, cn);
+    }
+  }
+
+  private static void matchIPAddress(final String host, final InetAddress 
inetAddress,
+    final List<SubjectName> subjectAlts) throws SSLException {
+    for (final SubjectName subjectAlt : subjectAlts) {
+      if (subjectAlt.getType() == SubjectName.IP) {
+        Optional<InetAddress> parsed = parseIpAddress(subjectAlt.getValue());
+        if (parsed.filter(altAddr -> altAddr.equals(inetAddress)).isPresent()) 
{
+          return;
+        }
+      }
+    }
+    throw new SSLPeerUnverifiedException("Certificate for <" + host + "> 
doesn't match any "
+      + "of the subject alternative names: " + subjectAlts);
+  }
+
+  private static void matchDNSName(final String host, final List<SubjectName> 
subjectAlts)
+    throws SSLException {
+    final String normalizedHost = host.toLowerCase(Locale.ROOT);
+    for (final SubjectName subjectAlt : subjectAlts) {
+      if (subjectAlt.getType() == SubjectName.DNS) {
+        final String normalizedSubjectAlt = 
subjectAlt.getValue().toLowerCase(Locale.ROOT);
+        if (matchIdentityStrict(normalizedHost, normalizedSubjectAlt)) {
+          return;
+        }
+      }
+    }
+    throw new SSLPeerUnverifiedException("Certificate for <" + host + "> 
doesn't match any "
+      + "of the subject alternative names: " + subjectAlts);
+  }
+
+  private static void matchCN(final String host, final String cn) throws 
SSLException {
+    final String normalizedHost = host.toLowerCase(Locale.ROOT);
+    final String normalizedCn = cn.toLowerCase(Locale.ROOT);
+    if (!matchIdentityStrict(normalizedHost, normalizedCn)) {
+      throw new SSLPeerUnverifiedException("Certificate for <" + host + "> 
doesn't match "
+        + "common name of the certificate subject: " + cn);
+    }
+  }
+
+  private static boolean matchIdentity(final String host, final String 
identity,
+    final boolean strict) {
+    // RFC 2818, 3.1. Server Identity
+    // "...Names may contain the wildcard
+    // character * which is considered to match any single domain name
+    // component or component fragment..."
+    // Based on this statement presuming only singular wildcard is legal
+    final int asteriskIdx = identity.indexOf('*');
+    if (asteriskIdx != -1) {
+      final String prefix = identity.substring(0, asteriskIdx);
+      final String suffix = identity.substring(asteriskIdx + 1);
+      if (!prefix.isEmpty() && !host.startsWith(prefix)) {
+        return false;
+      }
+      if (!suffix.isEmpty() && !host.endsWith(suffix)) {
+        return false;
+      }
+      // Additional sanity checks on content selected by wildcard can be done 
here
+      if (strict) {
+        final String remainder = host.substring(prefix.length(), host.length() 
- suffix.length());
+        return !remainder.contains(".");
+      }
+      return true;
+    }
+    return host.equalsIgnoreCase(identity);
+  }
+
+  private static boolean matchIdentityStrict(final String host, final String 
identity) {
+    return matchIdentity(host, identity, true);
+  }
+
+  private static String extractCN(final String subjectPrincipal) throws 
SSLException {
+    if (subjectPrincipal == null) {
+      return null;
+    }
+    try {
+      final LdapName subjectDN = new LdapName(subjectPrincipal);
+      final List<Rdn> rdns = subjectDN.getRdns();
+      for (int i = rdns.size() - 1; i >= 0; i--) {
+        final Rdn rds = rdns.get(i);
+        final Attributes attributes = rds.toAttributes();
+        final Attribute cn = attributes.get("cn");
+        if (cn != null) {
+          try {
+            final Object value = cn.get();
+            if (value != null) {
+              return value.toString();
+            }
+          } catch (final NoSuchElementException ignore) {
+            // ignore exception
+          } catch (final NamingException ignore) {
+            // ignore exception
+          }
+        }
+      }
+      return null;
+    } catch (final InvalidNameException e) {
+      throw new SSLException(subjectPrincipal + " is not a valid X500 
distinguished name");
+    }
+  }
+
+  private static Optional<InetAddress> parseIpAddress(String host) {
+    host = host.trim();
+    // Uri strings only work for ipv6 and are wrapped with brackets
+    // Unfortunately InetAddresses can't handle a mixed input, so we
+    // check here and choose which parse method to use.
+    if (host.startsWith("[") && host.endsWith("]")) {
+      return parseIpAddressUriString(host);
+    } else {
+      return parseIpAddressString(host);
+    }
+  }
+
+  private static Optional<InetAddress> parseIpAddressUriString(String host) {
+    if (InetAddresses.isUriInetAddress(host)) {
+      return Optional.of(InetAddresses.forUriString(host));
+    }
+    return Optional.empty();
+  }
+
+  private static Optional<InetAddress> parseIpAddressString(String host) {
+    if (InetAddresses.isInetAddress(host)) {
+      return Optional.of(InetAddresses.forString(host));
+    }
+    return Optional.empty();
+  }
+
+  @SuppressWarnings("MixedMutabilityReturnType")
+  private static List<SubjectName> getSubjectAltNames(final X509Certificate 
cert) {
+    try {
+      final Collection<List<?>> entries = cert.getSubjectAlternativeNames();
+      if (entries == null) {
+        return Collections.emptyList();
+      }
+      final List<SubjectName> result = new ArrayList<SubjectName>();
+      for (List<?> entry : entries) {
+        final Integer type = entry.size() >= 2 ? (Integer) entry.get(0) : null;
+        if (type != null) {
+          if (type == SubjectName.DNS || type == SubjectName.IP) {
+            final Object o = entry.get(1);
+            if (o instanceof String) {
+              result.add(new SubjectName((String) o, type));
+            } else {
+              LOG.debug("non-string Subject Alt Name type detected, not 
currently supported: {}",
+                o);
+            }
+          }
+        }
+      }
+      return result;
+    } catch (final CertificateParsingException ignore) {
+      return Collections.emptyList();
+    }
+  }
+}
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/HBaseTrustManager.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/HBaseTrustManager.java
new file mode 100644
index 00000000000..ca4756a6131
--- /dev/null
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/HBaseTrustManager.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.crypto.tls;
+
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.X509ExtendedTrustManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A custom TrustManager that supports hostname verification We attempt to 
perform verification
+ * using just the IP address first and if that fails will attempt to perform a 
reverse DNS lookup
+ * and verify using the hostname. This file has been copied from the Apache 
ZooKeeper project.
+ * @see <a href=
+ *      
"https://github.com/apache/zookeeper/blob/c74658d398cdc1d207aa296cb6e20de00faec03e/zookeeper-server/src/main/java/org/apache/zookeeper/common/ZKTrustManager.java";>Base
+ *      revision</a>
+ */
[email protected]
+public class HBaseTrustManager extends X509ExtendedTrustManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTrustManager.class);
+
+  private final X509ExtendedTrustManager x509ExtendedTrustManager;
+  private final boolean hostnameVerificationEnabled;
+  private final boolean allowReverseDnsLookup;
+
+  private final HBaseHostnameVerifier hostnameVerifier;
+
+  /**
+   * Instantiate a new HBaseTrustManager.
+   * @param x509ExtendedTrustManager    The trustmanager to use for
+   *                                    checkClientTrusted/checkServerTrusted 
logic
+   * @param hostnameVerificationEnabled If true, this TrustManager should 
verify hostnames of peers
+   *                                    when checking trust.
+   * @param allowReverseDnsLookup       If true, we will fall back on reverse 
dns if resolving of
+   *                                    host fails
+   */
+  HBaseTrustManager(X509ExtendedTrustManager x509ExtendedTrustManager,
+    boolean hostnameVerificationEnabled, boolean allowReverseDnsLookup) {
+    this.x509ExtendedTrustManager = x509ExtendedTrustManager;
+    this.hostnameVerificationEnabled = hostnameVerificationEnabled;
+    this.allowReverseDnsLookup = allowReverseDnsLookup;
+    this.hostnameVerifier = new HBaseHostnameVerifier();
+  }
+
+  @Override
+  public X509Certificate[] getAcceptedIssuers() {
+    return x509ExtendedTrustManager.getAcceptedIssuers();
+  }
+
+  @Override
+  public void checkClientTrusted(X509Certificate[] chain, String authType, 
Socket socket)
+    throws CertificateException {
+    x509ExtendedTrustManager.checkClientTrusted(chain, authType, socket);
+    if (hostnameVerificationEnabled) {
+      performHostVerification(socket.getInetAddress(), chain[0]);
+    }
+  }
+
+  @Override
+  public void checkServerTrusted(X509Certificate[] chain, String authType, 
Socket socket)
+    throws CertificateException {
+    x509ExtendedTrustManager.checkServerTrusted(chain, authType, socket);
+    if (hostnameVerificationEnabled) {
+      performHostVerification(socket.getInetAddress(), chain[0]);
+    }
+  }
+
+  @Override
+  public void checkClientTrusted(X509Certificate[] chain, String authType, 
SSLEngine engine)
+    throws CertificateException {
+    x509ExtendedTrustManager.checkClientTrusted(chain, authType, engine);
+    if (hostnameVerificationEnabled) {
+      try {
+        performHostVerification(InetAddress.getByName(engine.getPeerHost()), 
chain[0]);
+      } catch (UnknownHostException e) {
+        throw new CertificateException("Failed to verify host", e);
+      }
+    }
+  }
+
+  @Override
+  public void checkServerTrusted(X509Certificate[] chain, String authType, 
SSLEngine engine)
+    throws CertificateException {
+    x509ExtendedTrustManager.checkServerTrusted(chain, authType, engine);
+    if (hostnameVerificationEnabled) {
+      try {
+        performHostVerification(InetAddress.getByName(engine.getPeerHost()), 
chain[0]);
+      } catch (UnknownHostException e) {
+        throw new CertificateException("Failed to verify host", e);
+      }
+    }
+  }
+
+  @Override
+  public void checkClientTrusted(X509Certificate[] chain, String authType)
+    throws CertificateException {
+    x509ExtendedTrustManager.checkClientTrusted(chain, authType);
+  }
+
+  @Override
+  public void checkServerTrusted(X509Certificate[] chain, String authType)
+    throws CertificateException {
+    x509ExtendedTrustManager.checkServerTrusted(chain, authType);
+  }
+
+  /**
+   * Compares peer's hostname with the one stored in the provided client 
certificate. Performs
+   * verification with the help of provided HostnameVerifier.
+   * @param inetAddress Peer's inet address.
+   * @param certificate Peer's certificate
+   * @throws CertificateException Thrown if the provided certificate doesn't 
match the peer
+   *                              hostname.
+   */
+  private void performHostVerification(InetAddress inetAddress, 
X509Certificate certificate)
+    throws CertificateException {
+    String hostAddress = "";
+    String hostName = "";
+    try {
+      hostAddress = inetAddress.getHostAddress();
+      hostnameVerifier.verify(hostAddress, certificate);
+    } catch (SSLException addressVerificationException) {
+      // If we fail with hostAddress, we should try the hostname.
+      // The inetAddress may have been created with a hostname, in which case 
getHostName() will
+      // return quickly below. If not, a reverse lookup will happen, which can 
be expensive.
+      // We provide the option to skip the reverse lookup if preferring to 
fail fast.
+
+      // Handle logging here to aid debugging. The easiest way to check for an 
existing
+      // hostname is through toString, see javadoc.
+      String inetAddressString = inetAddress.toString();
+      if (!inetAddressString.startsWith("/")) {
+        LOG.debug(
+          "Failed to verify host address: {}, but inetAddress {} has a 
hostname, trying that",
+          hostAddress, inetAddressString, addressVerificationException);
+      } else if (allowReverseDnsLookup) {
+        LOG.debug(
+          "Failed to verify host address: {}, attempting to verify host name 
with reverse dns",
+          hostAddress, addressVerificationException);
+      } else {
+        LOG.debug("Failed to verify host address: {}, but reverse dns lookup 
is disabled",
+          hostAddress, addressVerificationException);
+        throw new CertificateException(
+          "Failed to verify host address, and reverse lookup is disabled",
+          addressVerificationException);
+      }
+
+      try {
+        hostName = inetAddress.getHostName();
+        hostnameVerifier.verify(hostName, certificate);
+      } catch (SSLException hostnameVerificationException) {
+        LOG.error("Failed to verify host address: {}", hostAddress, 
addressVerificationException);
+        LOG.error("Failed to verify hostname: {}", hostName, 
hostnameVerificationException);
+        throw new CertificateException("Failed to verify both host address and 
host name",
+          hostnameVerificationException);
+      }
+    }
+  }
+
+}
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
index 471ad41d06f..96aa66364be 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
@@ -62,7 +62,9 @@ public final class X509Util {
   private static final Logger LOG = LoggerFactory.getLogger(X509Util.class);
   private static final char[] EMPTY_CHAR_ARRAY = new char[0];
 
-  // Config
+  //
+  // Common tls configs across both server and client
+  //
   static final String CONFIG_PREFIX = "hbase.rpc.tls.";
   public static final String TLS_CONFIG_PROTOCOL = CONFIG_PREFIX + "protocol";
   public static final String TLS_CONFIG_KEYSTORE_LOCATION = CONFIG_PREFIX + 
"keystore.location";
@@ -73,21 +75,33 @@ public final class X509Util {
   static final String TLS_CONFIG_TRUSTSTORE_PASSWORD = CONFIG_PREFIX + 
"truststore.password";
   public static final String TLS_CONFIG_CLR = CONFIG_PREFIX + "clr";
   public static final String TLS_CONFIG_OCSP = CONFIG_PREFIX + "ocsp";
+  public static final String TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED =
+    CONFIG_PREFIX + "host-verification.reverse-dns.enabled";
   private static final String TLS_ENABLED_PROTOCOLS = CONFIG_PREFIX + 
"enabledProtocols";
   private static final String TLS_CIPHER_SUITES = CONFIG_PREFIX + 
"ciphersuites";
+  public static final String DEFAULT_PROTOCOL = "TLSv1.2";
 
-  public static final String HBASE_CLIENT_NETTY_TLS_ENABLED = 
"hbase.client.netty.tls.enabled";
+  //
+  // Server-side specific configs
+  //
   public static final String HBASE_SERVER_NETTY_TLS_ENABLED = 
"hbase.server.netty.tls.enabled";
-
+  public static final String HBASE_SERVER_NETTY_TLS_CLIENT_AUTH_MODE =
+    "hbase.server.netty.tls.client.auth.mode";
+  public static final String HBASE_SERVER_NETTY_TLS_VERIFY_CLIENT_HOSTNAME =
+    "hbase.server.netty.tls.verify.client.hostname";
   public static final String HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT =
     "hbase.server.netty.tls.supportplaintext";
 
+  //
+  // Client-side specific configs
+  //
+  public static final String HBASE_CLIENT_NETTY_TLS_ENABLED = 
"hbase.client.netty.tls.enabled";
+  public static final String HBASE_CLIENT_NETTY_TLS_VERIFY_SERVER_HOSTNAME =
+    "hbase.client.netty.tls.verify.server.hostname";
   public static final String HBASE_CLIENT_NETTY_TLS_HANDSHAKETIMEOUT =
     "hbase.client.netty.tls.handshaketimeout";
   public static final int DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS = 5000;
 
-  public static final String DEFAULT_PROTOCOL = "TLSv1.2";
-
   private static String[] getGCMCiphers() {
     return new String[] { "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
       "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", 
"TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
@@ -110,6 +124,47 @@ public final class X509Util {
   private static final String[] DEFAULT_CIPHERS_JAVA9 =
     ObjectArrays.concat(getGCMCiphers(), getCBCCiphers(), String.class);
 
+  /**
+   * Enum specifying the client auth requirement of server-side TLS sockets 
created by this
+   * X509Util.
+   * <ul>
+   * <li>NONE - do not request a client certificate.</li>
+   * <li>WANT - request a client certificate, but allow anonymous clients to 
connect.</li>
+   * <li>NEED - require a client certificate, disconnect anonymous 
clients.</li>
+   * </ul>
+   * If the config property is not set, the default value is NEED.
+   */
+  public enum ClientAuth {
+    NONE(org.apache.hbase.thirdparty.io.netty.handler.ssl.ClientAuth.NONE),
+    WANT(org.apache.hbase.thirdparty.io.netty.handler.ssl.ClientAuth.OPTIONAL),
+    NEED(org.apache.hbase.thirdparty.io.netty.handler.ssl.ClientAuth.REQUIRE);
+
+    private final org.apache.hbase.thirdparty.io.netty.handler.ssl.ClientAuth 
nettyAuth;
+
+    ClientAuth(org.apache.hbase.thirdparty.io.netty.handler.ssl.ClientAuth 
nettyAuth) {
+      this.nettyAuth = nettyAuth;
+    }
+
+    /**
+     * Converts a property value to a ClientAuth enum. If the input string is 
empty or null, returns
+     * <code>ClientAuth.NEED</code>.
+     * @param prop the property string.
+     * @return the ClientAuth.
+     * @throws IllegalArgumentException if the property value is not "NONE", 
"WANT", "NEED", or
+     *                                  empty/null.
+     */
+    public static ClientAuth fromPropertyValue(String prop) {
+      if (prop == null || prop.length() == 0) {
+        return NEED;
+      }
+      return ClientAuth.valueOf(prop.toUpperCase());
+    }
+
+    public org.apache.hbase.thirdparty.io.netty.handler.ssl.ClientAuth 
toNettyClientAuth() {
+      return nettyAuth;
+    }
+  }
+
   private X509Util() {
     // disabled
   }
@@ -158,11 +213,16 @@ public final class X509Util {
     boolean sslCrlEnabled = config.getBoolean(TLS_CONFIG_CLR, false);
     boolean sslOcspEnabled = config.getBoolean(TLS_CONFIG_OCSP, false);
 
+    boolean verifyServerHostname =
+      config.getBoolean(HBASE_CLIENT_NETTY_TLS_VERIFY_SERVER_HOSTNAME, true);
+    boolean allowReverseDnsLookup = 
config.getBoolean(TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED, true);
+
     if (trustStoreLocation.isEmpty()) {
       LOG.warn(TLS_CONFIG_TRUSTSTORE_LOCATION + " not specified");
     } else {
-      sslContextBuilder.trustManager(createTrustManager(trustStoreLocation, 
trustStorePassword,
-        trustStoreType, sslCrlEnabled, sslOcspEnabled));
+      sslContextBuilder
+        .trustManager(createTrustManager(trustStoreLocation, 
trustStorePassword, trustStoreType,
+          sslCrlEnabled, sslOcspEnabled, verifyServerHostname, 
allowReverseDnsLookup));
     }
 
     sslContextBuilder.enableOcsp(sslOcspEnabled);
@@ -195,16 +255,24 @@ public final class X509Util {
     boolean sslCrlEnabled = config.getBoolean(TLS_CONFIG_CLR, false);
     boolean sslOcspEnabled = config.getBoolean(TLS_CONFIG_OCSP, false);
 
+    ClientAuth clientAuth =
+      
ClientAuth.fromPropertyValue(config.get(HBASE_SERVER_NETTY_TLS_CLIENT_AUTH_MODE));
+    boolean verifyClientHostname =
+      config.getBoolean(HBASE_SERVER_NETTY_TLS_VERIFY_CLIENT_HOSTNAME, true);
+    boolean allowReverseDnsLookup = 
config.getBoolean(TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED, true);
+
     if (trustStoreLocation.isEmpty()) {
       LOG.warn(TLS_CONFIG_TRUSTSTORE_LOCATION + " not specified");
     } else {
-      sslContextBuilder.trustManager(createTrustManager(trustStoreLocation, 
trustStorePassword,
-        trustStoreType, sslCrlEnabled, sslOcspEnabled));
+      sslContextBuilder
+        .trustManager(createTrustManager(trustStoreLocation, 
trustStorePassword, trustStoreType,
+          sslCrlEnabled, sslOcspEnabled, verifyClientHostname, 
allowReverseDnsLookup));
     }
 
     sslContextBuilder.enableOcsp(sslOcspEnabled);
     sslContextBuilder.protocols(getEnabledProtocols(config));
     sslContextBuilder.ciphers(Arrays.asList(getCipherSuites(config)));
+    sslContextBuilder.clientAuth(clientAuth.toNettyClientAuth());
 
     return sslContextBuilder.build();
   }
@@ -252,19 +320,23 @@ public final class X509Util {
   /**
    * Creates a trust manager by loading the trust store from the given file of 
the given type,
    * optionally decrypting it using the given password.
-   * @param trustStoreLocation the location of the trust store file.
-   * @param trustStorePassword optional password to decrypt the trust store 
(only applies to JKS
-   *                           trust stores). If empty, assumes the trust 
store is not encrypted.
-   * @param trustStoreType     must be JKS, PEM, PKCS12, BCFKS or null. If 
null, attempts to
-   *                           autodetect the trust store type from the file 
extension (e.g. .jks /
-   *                           .pem).
-   * @param crlEnabled         enable CRL (certificate revocation list) checks.
-   * @param ocspEnabled        enable OCSP (online certificate status 
protocol) checks.
+   * @param trustStoreLocation    the location of the trust store file.
+   * @param trustStorePassword    optional password to decrypt the trust store 
(only applies to JKS
+   *                              trust stores). If empty, assumes the trust 
store is not encrypted.
+   * @param trustStoreType        must be JKS, PEM, PKCS12, BCFKS or null. If 
null, attempts to
+   *                              autodetect the trust store type from the 
file extension (e.g. .jks
+   *                              / .pem).
+   * @param crlEnabled            enable CRL (certificate revocation list) 
checks.
+   * @param ocspEnabled           enable OCSP (online certificate status 
protocol) checks.
+   * @param verifyHostName        if true, ssl peer hostname must match name 
in certificate
+   * @param allowReverseDnsLookup if true, allow falling back to reverse dns 
lookup in verifying
+   *                              hostname
    * @return the trust manager.
    * @throws TrustManagerException if something goes wrong.
    */
   static X509TrustManager createTrustManager(String trustStoreLocation, char[] 
trustStorePassword,
-    String trustStoreType, boolean crlEnabled, boolean ocspEnabled) throws 
TrustManagerException {
+    String trustStoreType, boolean crlEnabled, boolean ocspEnabled, boolean 
verifyHostName,
+    boolean allowReverseDnsLookup) throws TrustManagerException {
 
     if (trustStorePassword == null) {
       trustStorePassword = EMPTY_CHAR_ARRAY;
@@ -297,7 +369,8 @@ public final class X509Util {
 
       for (final TrustManager tm : tmf.getTrustManagers()) {
         if (tm instanceof X509ExtendedTrustManager) {
-          return (X509ExtendedTrustManager) tm;
+          return new HBaseTrustManager((X509ExtendedTrustManager) tm, 
verifyHostName,
+            allowReverseDnsLookup);
         }
       }
       throw new TrustManagerException("Couldn't find X509TrustManager");
diff --git 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestHBaseHostnameVerifier.java
 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestHBaseHostnameVerifier.java
new file mode 100644
index 00000000000..12de6175836
--- /dev/null
+++ 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestHBaseHostnameVerifier.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.crypto.tls;
+
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.security.KeyPair;
+import java.security.Security;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import javax.net.ssl.SSLException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x500.X500NameBuilder;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.bouncycastle.asn1.x509.GeneralName;
+import org.bouncycastle.asn1.x509.GeneralNames;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
+
+/**
+ * Test cases taken and adapted from Apache ZooKeeper Project
+ * @see <a href=
+ *      
"https://github.com/apache/zookeeper/blob/5820d10d9dc58c8e12d2e25386fdf92acb360359/zookeeper-server/src/test/java/org/apache/zookeeper/common/ZKHostnameVerifierTest.java";>Base
+ *      revision</a>
+ */
+@Category({ MiscTests.class, SmallTests.class })
+public class TestHBaseHostnameVerifier {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHBaseHostnameVerifier.class);
+  private static CertificateCreator certificateCreator;
+  private HBaseHostnameVerifier impl;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    Security.addProvider(new BouncyCastleProvider());
+    X500NameBuilder caNameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
+    caNameBuilder.addRDN(BCStyle.CN,
+      MethodHandles.lookup().lookupClass().getCanonicalName() + " Root CA");
+    KeyPair keyPair = X509TestHelpers.generateKeyPair(X509KeyType.EC);
+    X509Certificate caCert = 
X509TestHelpers.newSelfSignedCACert(caNameBuilder.build(), keyPair);
+    certificateCreator = new CertificateCreator(keyPair, caCert);
+  }
+
+  @Before
+  public void setup() {
+    impl = new HBaseHostnameVerifier();
+  }
+
+  private static class CertificateCreator {
+    private final KeyPair caCertPair;
+    private final X509Certificate caCert;
+
+    public CertificateCreator(KeyPair caCertPair, X509Certificate caCert) {
+      this.caCertPair = caCertPair;
+      this.caCert = caCert;
+    }
+
+    public byte[] newCert(String cn, String... subjectAltName) throws 
Exception {
+      return X509TestHelpers.newCert(caCert, caCertPair, cn == null ? null : 
new X500Name(cn),
+        caCertPair.getPublic(), 
parseSubjectAltNames(subjectAltName)).getEncoded();
+    }
+
+    private GeneralNames parseSubjectAltNames(String... subjectAltName) {
+      if (subjectAltName == null || subjectAltName.length == 0) {
+        return null;
+      }
+      GeneralName[] names = new GeneralName[subjectAltName.length];
+      for (int i = 0; i < subjectAltName.length; i++) {
+        String current = subjectAltName[i];
+        int type;
+        if (InetAddresses.isInetAddress(current)) {
+          type = GeneralName.iPAddress;
+        } else if (current.startsWith("email:")) {
+          type = GeneralName.rfc822Name;
+        } else {
+          type = GeneralName.dNSName;
+        }
+        names[i] = new GeneralName(type, subjectAltName[i]);
+      }
+      return new GeneralNames(names);
+    }
+
+  }
+
+  @Test
+  public void testVerify() throws Exception {
+    final CertificateFactory cf = CertificateFactory.getInstance("X.509");
+    InputStream in;
+    X509Certificate x509;
+
+    in = new ByteArrayInputStream(certificateCreator.newCert("CN=foo.com"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+
+    impl.verify("foo.com", x509);
+    exceptionPlease(impl, "a.foo.com", x509);
+    exceptionPlease(impl, "bar.com", x509);
+
+    in = new 
ByteArrayInputStream(certificateCreator.newCert("CN=\u82b1\u5b50.co.jp"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    impl.verify("\u82b1\u5b50.co.jp", x509);
+    exceptionPlease(impl, "a.\u82b1\u5b50.co.jp", x509);
+
+    in = new ByteArrayInputStream(certificateCreator.newCert("CN=foo.com", 
"bar.com"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    exceptionPlease(impl, "foo.com", x509);
+    exceptionPlease(impl, "a.foo.com", x509);
+    impl.verify("bar.com", x509);
+    exceptionPlease(impl, "a.bar.com", x509);
+
+    in = new ByteArrayInputStream(
+      certificateCreator.newCert("CN=foo.com", "bar.com", 
"\u82b1\u5b50.co.jp"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    exceptionPlease(impl, "foo.com", x509);
+    exceptionPlease(impl, "a.foo.com", x509);
+    impl.verify("bar.com", x509);
+    exceptionPlease(impl, "a.bar.com", x509);
+
+    /*
+     * Java isn't extracting international subjectAlts properly. (Or OpenSSL 
isn't storing them
+     * properly).
+     */
+    // DEFAULT.verify("\u82b1\u5b50.co.jp", x509 );
+    // impl.verify("\u82b1\u5b50.co.jp", x509 );
+    exceptionPlease(impl, "a.\u82b1\u5b50.co.jp", x509);
+
+    in = new ByteArrayInputStream(certificateCreator.newCert("CN=", 
"foo.com"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    impl.verify("foo.com", x509);
+    exceptionPlease(impl, "a.foo.com", x509);
+
+    in = new ByteArrayInputStream(
+      certificateCreator.newCert("CN=foo.com, CN=bar.com, 
CN=\u82b1\u5b50.co.jp"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    exceptionPlease(impl, "foo.com", x509);
+    exceptionPlease(impl, "a.foo.com", x509);
+    exceptionPlease(impl, "bar.com", x509);
+    exceptionPlease(impl, "a.bar.com", x509);
+    impl.verify("\u82b1\u5b50.co.jp", x509);
+    exceptionPlease(impl, "a.\u82b1\u5b50.co.jp", x509);
+
+    in = new ByteArrayInputStream(certificateCreator.newCert("CN=*.foo.com"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    exceptionPlease(impl, "foo.com", x509);
+    impl.verify("www.foo.com", x509);
+    impl.verify("\u82b1\u5b50.foo.com", x509);
+    exceptionPlease(impl, "a.b.foo.com", x509);
+
+    in = new ByteArrayInputStream(certificateCreator.newCert("CN=*.co.jp"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    // Silly test because no-one would ever be able to lookup an IP address
+    // using "*.co.jp".
+    impl.verify("*.co.jp", x509);
+    impl.verify("foo.co.jp", x509);
+    impl.verify("\u82b1\u5b50.co.jp", x509);
+
+    in = new ByteArrayInputStream(
+      certificateCreator.newCert("CN=*.foo.com", "*.bar.com", 
"*.\u82b1\u5b50.co.jp"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    // try the foo.com variations
+    exceptionPlease(impl, "foo.com", x509);
+    exceptionPlease(impl, "www.foo.com", x509);
+    exceptionPlease(impl, "\u82b1\u5b50.foo.com", x509);
+    exceptionPlease(impl, "a.b.foo.com", x509);
+    // try the bar.com variations
+    exceptionPlease(impl, "bar.com", x509);
+    impl.verify("www.bar.com", x509);
+    impl.verify("\u82b1\u5b50.bar.com", x509);
+    exceptionPlease(impl, "a.b.bar.com", x509);
+
+    in = new 
ByteArrayInputStream(certificateCreator.newCert("CN=repository.infonotary.com"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    impl.verify("repository.infonotary.com", x509);
+
+    in = new 
ByteArrayInputStream(certificateCreator.newCert("CN=*.google.com"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    impl.verify("*.google.com", x509);
+
+    in = new 
ByteArrayInputStream(certificateCreator.newCert("CN=*.google.com"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    impl.verify("*.Google.com", x509);
+
+    in = new 
ByteArrayInputStream(certificateCreator.newCert("CN=dummy-value.com", 
"1.1.1.1"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    impl.verify("1.1.1.1", x509);
+
+    exceptionPlease(impl, "1.1.1.2", x509);
+    exceptionPlease(impl, "2001:0db8:85a3:0000:0000:8a2e:0370:1111", x509);
+    exceptionPlease(impl, "dummy-value.com", x509);
+
+    in = new ByteArrayInputStream(
+      certificateCreator.newCert("CN=dummy-value.com", 
"2001:0db8:85a3:0000:0000:8a2e:0370:7334"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    impl.verify("2001:0db8:85a3:0000:0000:8a2e:0370:7334", x509);
+
+    exceptionPlease(impl, "1.1.1.2", x509);
+    exceptionPlease(impl, "2001:0db8:85a3:0000:0000:8a2e:0370:1111", x509);
+    exceptionPlease(impl, "dummy-value.com", x509);
+
+    in = new ByteArrayInputStream(
+      certificateCreator.newCert("CN=dummy-value.com", 
"2001:0db8:85a3:0000:0000:8a2e:0370:7334"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    impl.verify("2001:0db8:85a3:0000:0000:8a2e:0370:7334", x509);
+    impl.verify("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]", x509);
+
+    exceptionPlease(impl, "1.1.1.2", x509);
+    exceptionPlease(impl, "2001:0db8:85a3:0000:0000:8a2e:0370:1111", x509);
+    exceptionPlease(impl, "dummy-value.com", x509);
+
+    in = new ByteArrayInputStream(
+      certificateCreator.newCert("CN=www.company.com", 
"email:[email protected]"));
+    x509 = (X509Certificate) cf.generateCertificate(in);
+    impl.verify("www.company.com", x509);
+  }
+
+  private void exceptionPlease(final HBaseHostnameVerifier hv, final String 
host,
+    final X509Certificate x509) {
+    try {
+      hv.verify(host, x509);
+      fail("HostnameVerifier shouldn't allow [" + host + "]");
+    } catch (final SSLException e) {
+      // whew! we're okay!
+    }
+  }
+}
diff --git 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestHBaseTrustManager.java
 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestHBaseTrustManager.java
new file mode 100644
index 00000000000..07fc87e0135
--- /dev/null
+++ 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestHBaseTrustManager.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.crypto.tls;
+
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.Security;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import javax.net.ssl.X509ExtendedTrustManager;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.bouncycastle.asn1.x500.X500NameBuilder;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.bouncycastle.asn1.x509.BasicConstraints;
+import org.bouncycastle.asn1.x509.Extension;
+import org.bouncycastle.asn1.x509.GeneralName;
+import org.bouncycastle.asn1.x509.GeneralNames;
+import org.bouncycastle.asn1.x509.KeyUsage;
+import org.bouncycastle.cert.X509v3CertificateBuilder;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.ContentSigner;
+import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.stubbing.Answer;
+
+//
+/**
+ * Test cases taken and adapted from Apache ZooKeeper Project. We can only 
test calls to
+ * HBaseTrustManager using Sockets (not SSLEngines). This can be fine since 
the logic is the same.
+ * @see <a href=
+ *      
"https://github.com/apache/zookeeper/blob/c74658d398cdc1d207aa296cb6e20de00faec03e/zookeeper-server/src/test/java/org/apache/zookeeper/common/HBaseTrustManagerTest.java";>Base
+ *      revision</a>
+ */
+@Category({ MiscTests.class, SmallTests.class })
+public class TestHBaseTrustManager {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHBaseTrustManager.class);
+
+  private static KeyPair keyPair;
+
+  private X509ExtendedTrustManager mockX509ExtendedTrustManager;
+  private static final String IP_ADDRESS = "127.0.0.1";
+  private static final String HOSTNAME = "localhost";
+
+  private InetAddress mockInetAddressWithoutHostname;
+  private InetAddress mockInetAddressWithHostname;
+  private Socket mockSocketWithoutHostname;
+  private Socket mockSocketWithHostname;
+
+  @BeforeClass
+  public static void createKeyPair() throws Exception {
+    Security.addProvider(new BouncyCastleProvider());
+    KeyPairGenerator keyPairGenerator =
+      KeyPairGenerator.getInstance("RSA", BouncyCastleProvider.PROVIDER_NAME);
+    keyPairGenerator.initialize(4096);
+    keyPair = keyPairGenerator.genKeyPair();
+  }
+
+  @AfterClass
+  public static void removeBouncyCastleProvider() throws Exception {
+    Security.removeProvider("BC");
+  }
+
+  @Before
+  public void setup() throws Exception {
+    mockX509ExtendedTrustManager = mock(X509ExtendedTrustManager.class);
+
+    mockInetAddressWithoutHostname = mock(InetAddress.class);
+    when(mockInetAddressWithoutHostname.getHostAddress())
+      .thenAnswer((Answer<?>) invocationOnMock -> IP_ADDRESS);
+    when(mockInetAddressWithoutHostname.toString())
+      .thenAnswer((Answer<?>) invocationOnMock -> "/" + IP_ADDRESS);
+
+    mockInetAddressWithHostname = mock(InetAddress.class);
+    when(mockInetAddressWithHostname.getHostAddress())
+      .thenAnswer((Answer<?>) invocationOnMock -> IP_ADDRESS);
+    when(mockInetAddressWithHostname.getHostName())
+      .thenAnswer((Answer<?>) invocationOnMock -> HOSTNAME);
+    when(mockInetAddressWithHostname.toString())
+      .thenAnswer((Answer<?>) invocationOnMock -> HOSTNAME + "/" + IP_ADDRESS);
+
+    mockSocketWithoutHostname = mock(Socket.class);
+    when(mockSocketWithoutHostname.getInetAddress())
+      .thenAnswer((Answer<?>) invocationOnMock -> 
mockInetAddressWithoutHostname);
+
+    mockSocketWithHostname = mock(Socket.class);
+    when(mockSocketWithHostname.getInetAddress())
+      .thenAnswer((Answer<?>) invocationOnMock -> mockInetAddressWithHostname);
+  }
+
+  @SuppressWarnings("JavaUtilDate")
+  private X509Certificate[] createSelfSignedCertificateChain(String ipAddress, 
String hostname)
+    throws Exception {
+    X500NameBuilder nameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
+    nameBuilder.addRDN(BCStyle.CN, "NOT_LOCALHOST");
+    Date notBefore = new Date();
+    Calendar cal = Calendar.getInstance();
+    cal.setTime(notBefore);
+    cal.add(Calendar.YEAR, 1);
+    Date notAfter = cal.getTime();
+    BigInteger serialNumber = new BigInteger(128, new Random());
+
+    X509v3CertificateBuilder certificateBuilder =
+      new JcaX509v3CertificateBuilder(nameBuilder.build(), serialNumber, 
notBefore, notAfter,
+        nameBuilder.build(), keyPair.getPublic())
+          .addExtension(Extension.basicConstraints, true, new 
BasicConstraints(0))
+          .addExtension(Extension.keyUsage, true,
+            new KeyUsage(KeyUsage.digitalSignature | KeyUsage.keyCertSign | 
KeyUsage.cRLSign));
+
+    List<GeneralName> generalNames = new ArrayList<>();
+    if (ipAddress != null) {
+      generalNames.add(new GeneralName(GeneralName.iPAddress, ipAddress));
+    }
+    if (hostname != null) {
+      generalNames.add(new GeneralName(GeneralName.dNSName, hostname));
+    }
+
+    if (!generalNames.isEmpty()) {
+      certificateBuilder.addExtension(Extension.subjectAlternativeName, true,
+        new GeneralNames(generalNames.toArray(new GeneralName[] {})));
+    }
+
+    ContentSigner contentSigner =
+      new 
JcaContentSignerBuilder("SHA256WithRSAEncryption").build(keyPair.getPrivate());
+
+    return new X509Certificate[] {
+      new 
JcaX509CertificateConverter().getCertificate(certificateBuilder.build(contentSigner))
 };
+  }
+
+  @Test
+  public void testServerHostnameVerificationWithHostnameVerificationDisabled() 
throws Exception {
+    HBaseTrustManager trustManager =
+      new HBaseTrustManager(mockX509ExtendedTrustManager, false, false);
+
+    X509Certificate[] certificateChain = 
createSelfSignedCertificateChain(IP_ADDRESS, HOSTNAME);
+    trustManager.checkServerTrusted(certificateChain, null, 
mockSocketWithHostname);
+
+    verify(mockInetAddressWithHostname, times(0)).getHostAddress();
+    verify(mockInetAddressWithHostname, times(0)).getHostName();
+
+    verify(mockX509ExtendedTrustManager, 
times(1)).checkServerTrusted(certificateChain, null,
+      mockSocketWithHostname);
+  }
+
+  @SuppressWarnings("checkstyle:linelength")
+  @Test
+  public void testServerTrustedWithHostnameVerificationDisabled() throws 
Exception {
+    HBaseTrustManager trustManager =
+      new HBaseTrustManager(mockX509ExtendedTrustManager, false, false);
+
+    X509Certificate[] certificateChain = 
createSelfSignedCertificateChain(IP_ADDRESS, HOSTNAME);
+    trustManager.checkServerTrusted(certificateChain, null, 
mockSocketWithHostname);
+
+    verify(mockInetAddressWithHostname, times(0)).getHostAddress();
+    verify(mockInetAddressWithHostname, times(0)).getHostName();
+
+    verify(mockX509ExtendedTrustManager, 
times(1)).checkServerTrusted(certificateChain, null,
+      mockSocketWithHostname);
+  }
+
+  @Test
+  public void testServerTrustedWithHostnameVerificationEnabled() throws 
Exception {
+    HBaseTrustManager trustManager =
+      new HBaseTrustManager(mockX509ExtendedTrustManager, true, true);
+
+    X509Certificate[] certificateChain = 
createSelfSignedCertificateChain(null, HOSTNAME);
+    trustManager.checkServerTrusted(certificateChain, null, 
mockSocketWithHostname);
+
+    verify(mockInetAddressWithHostname, times(1)).getHostAddress();
+    verify(mockInetAddressWithHostname, times(1)).getHostName();
+
+    verify(mockX509ExtendedTrustManager, 
times(1)).checkServerTrusted(certificateChain, null,
+      mockSocketWithHostname);
+  }
+
+  @Test
+  public void testServerTrustedWithHostnameVerificationEnabledUsingIpAddress() 
throws Exception {
+    HBaseTrustManager trustManager =
+      new HBaseTrustManager(mockX509ExtendedTrustManager, true, true);
+
+    X509Certificate[] certificateChain = 
createSelfSignedCertificateChain(IP_ADDRESS, null);
+    trustManager.checkServerTrusted(certificateChain, null, 
mockSocketWithHostname);
+
+    verify(mockInetAddressWithHostname, times(1)).getHostAddress();
+    verify(mockInetAddressWithHostname, times(0)).getHostName();
+
+    verify(mockX509ExtendedTrustManager, 
times(1)).checkServerTrusted(certificateChain, null,
+      mockSocketWithHostname);
+  }
+
+  @Test
+  public void 
testServerTrustedWithHostnameVerificationEnabledNoReverseLookup() throws 
Exception {
+    HBaseTrustManager trustManager =
+      new HBaseTrustManager(mockX509ExtendedTrustManager, true, false);
+
+    X509Certificate[] certificateChain = 
createSelfSignedCertificateChain(null, HOSTNAME);
+
+    // We only include hostname in the cert above, but the socket passed in is 
for an ip address.
+    // This mismatch would succeed if reverse lookup is enabled, but here 
fails since it's
+    // not enabled.
+    assertThrows(CertificateException.class,
+      () -> trustManager.checkServerTrusted(certificateChain, null, 
mockSocketWithoutHostname));
+
+    verify(mockInetAddressWithoutHostname, times(1)).getHostAddress();
+    verify(mockInetAddressWithoutHostname, times(0)).getHostName();
+
+    verify(mockX509ExtendedTrustManager, 
times(1)).checkServerTrusted(certificateChain, null,
+      mockSocketWithoutHostname);
+  }
+
+  @Test
+  public void 
testServerTrustedWithHostnameVerificationEnabledWithHostnameNoReverseLookup()
+    throws Exception {
+    HBaseTrustManager trustManager =
+      new HBaseTrustManager(mockX509ExtendedTrustManager, true, false);
+
+    X509Certificate[] certificateChain = 
createSelfSignedCertificateChain(null, HOSTNAME);
+
+    // since the socket inetAddress already has a hostname, we don't need 
reverse lookup.
+    // so this succeeds
+    trustManager.checkServerTrusted(certificateChain, null, 
mockSocketWithHostname);
+
+    verify(mockInetAddressWithHostname, times(1)).getHostAddress();
+    verify(mockInetAddressWithHostname, times(1)).getHostName();
+
+    verify(mockX509ExtendedTrustManager, 
times(1)).checkServerTrusted(certificateChain, null,
+      mockSocketWithHostname);
+  }
+
+  @Test
+  public void testClientTrustedWithHostnameVerificationDisabled() throws 
Exception {
+    HBaseTrustManager trustManager =
+      new HBaseTrustManager(mockX509ExtendedTrustManager, false, false);
+
+    X509Certificate[] certificateChain = 
createSelfSignedCertificateChain(null, HOSTNAME);
+    trustManager.checkClientTrusted(certificateChain, null, 
mockSocketWithHostname);
+
+    verify(mockInetAddressWithHostname, times(0)).getHostAddress();
+    verify(mockInetAddressWithHostname, times(0)).getHostName();
+
+    verify(mockX509ExtendedTrustManager, 
times(1)).checkClientTrusted(certificateChain, null,
+      mockSocketWithHostname);
+  }
+
+  @Test
+  public void testClientTrustedWithHostnameVerificationEnabled() throws 
Exception {
+    HBaseTrustManager trustManager =
+      new HBaseTrustManager(mockX509ExtendedTrustManager, true, true);
+
+    X509Certificate[] certificateChain = 
createSelfSignedCertificateChain(null, HOSTNAME);
+    trustManager.checkClientTrusted(certificateChain, null, 
mockSocketWithHostname);
+
+    verify(mockInetAddressWithHostname, times(1)).getHostAddress();
+    verify(mockInetAddressWithHostname, times(1)).getHostName();
+
+    verify(mockX509ExtendedTrustManager, 
times(1)).checkClientTrusted(certificateChain, null,
+      mockSocketWithHostname);
+  }
+
+  @Test
+  public void testClientTrustedWithHostnameVerificationEnabledUsingIpAddress() 
throws Exception {
+    HBaseTrustManager trustManager =
+      new HBaseTrustManager(mockX509ExtendedTrustManager, true, true);
+
+    X509Certificate[] certificateChain = 
createSelfSignedCertificateChain(IP_ADDRESS, null);
+    trustManager.checkClientTrusted(certificateChain, null, 
mockSocketWithHostname);
+
+    verify(mockInetAddressWithHostname, times(1)).getHostAddress();
+    verify(mockInetAddressWithHostname, times(0)).getHostName();
+
+    verify(mockX509ExtendedTrustManager, 
times(1)).checkClientTrusted(certificateChain, null,
+      mockSocketWithHostname);
+  }
+
+  @Test
+  public void 
testClientTrustedWithHostnameVerificationEnabledWithoutReverseLookup()
+    throws Exception {
+    HBaseTrustManager trustManager =
+      new HBaseTrustManager(mockX509ExtendedTrustManager, true, false);
+
+    X509Certificate[] certificateChain = 
createSelfSignedCertificateChain(null, HOSTNAME);
+
+    // We only include hostname in the cert above, but the socket passed in is 
for an ip address.
+    // This mismatch would succeed if reverse lookup is enabled, but here 
fails since it's
+    // not enabled.
+    assertThrows(CertificateException.class,
+      () -> trustManager.checkClientTrusted(certificateChain, null, 
mockSocketWithoutHostname));
+
+    verify(mockInetAddressWithoutHostname, times(1)).getHostAddress();
+    verify(mockInetAddressWithoutHostname, times(0)).getHostName();
+
+    verify(mockX509ExtendedTrustManager, 
times(1)).checkClientTrusted(certificateChain, null,
+      mockSocketWithoutHostname);
+  }
+
+  @Test
+  public void 
testClientTrustedWithHostnameVerificationEnabledWithHostnameNoReverseLookup()
+    throws Exception {
+    HBaseTrustManager trustManager =
+      new HBaseTrustManager(mockX509ExtendedTrustManager, true, false);
+
+    X509Certificate[] certificateChain = 
createSelfSignedCertificateChain(null, HOSTNAME);
+
+    // since the socket inetAddress already has a hostname, we don't need 
reverse lookup.
+    // so this succeeds
+    trustManager.checkClientTrusted(certificateChain, null, 
mockSocketWithHostname);
+
+    verify(mockInetAddressWithHostname, times(1)).getHostAddress();
+    verify(mockInetAddressWithHostname, times(1)).getHostName();
+
+    verify(mockX509ExtendedTrustManager, 
times(1)).checkClientTrusted(certificateChain, null,
+      mockSocketWithHostname);
+  }
+
+}
diff --git 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestX509Util.java
 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestX509Util.java
index a847db98a04..dd81403af6f 100644
--- 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestX509Util.java
+++ 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestX509Util.java
@@ -62,6 +62,38 @@ public class TestX509Util extends 
AbstractTestX509Parameterized {
 
   private static final char[] EMPTY_CHAR_ARRAY = new char[0];
 
+  @Test
+  public void testCreateSSLContextWithClientAuthDefault() throws Exception {
+    SslContext sslContext = X509Util.createSslContextForServer(conf);
+    ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class);
+    assertTrue(sslContext.newEngine(byteBufAllocatorMock).getNeedClientAuth());
+  }
+
+  @Test
+  public void testCreateSSLContextWithClientAuthNEED() throws Exception {
+    conf.set(X509Util.HBASE_SERVER_NETTY_TLS_CLIENT_AUTH_MODE, 
X509Util.ClientAuth.NEED.name());
+    SslContext sslContext = X509Util.createSslContextForServer(conf);
+    ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class);
+    assertTrue(sslContext.newEngine(byteBufAllocatorMock).getNeedClientAuth());
+  }
+
+  @Test
+  public void testCreateSSLContextWithClientAuthWANT() throws Exception {
+    conf.set(X509Util.HBASE_SERVER_NETTY_TLS_CLIENT_AUTH_MODE, 
X509Util.ClientAuth.WANT.name());
+    SslContext sslContext = X509Util.createSslContextForServer(conf);
+    ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class);
+    assertTrue(sslContext.newEngine(byteBufAllocatorMock).getWantClientAuth());
+  }
+
+  @Test
+  public void testCreateSSLContextWithClientAuthNONE() throws Exception {
+    conf.set(X509Util.HBASE_SERVER_NETTY_TLS_CLIENT_AUTH_MODE, 
X509Util.ClientAuth.NONE.name());
+    SslContext sslContext = X509Util.createSslContextForServer(conf);
+    ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class);
+    
assertFalse(sslContext.newEngine(byteBufAllocatorMock).getNeedClientAuth());
+    
assertFalse(sslContext.newEngine(byteBufAllocatorMock).getWantClientAuth());
+  }
+
   @Test
   public void testCreateSSLContextWithoutCustomProtocol() throws Exception {
     SslContext sslContext = X509Util.createSslContextForClient(conf);
@@ -164,7 +196,7 @@ public class TestX509Util extends 
AbstractTestX509Parameterized {
     X509Util.createTrustManager(
       
x509TestContext.getTrustStoreFile(KeyStoreFileType.PEM).getAbsolutePath(),
       x509TestContext.getTrustStorePassword(), 
KeyStoreFileType.PEM.getPropertyValue(), false,
-      false);
+      false, true, true);
   }
 
   @Test
@@ -173,7 +205,7 @@ public class TestX509Util extends 
AbstractTestX509Parameterized {
     // Make sure that empty password and null password are treated the same
     X509Util.createTrustManager(
       
x509TestContext.getTrustStoreFile(KeyStoreFileType.PEM).getAbsolutePath(), null,
-      KeyStoreFileType.PEM.getPropertyValue(), false, false);
+      KeyStoreFileType.PEM.getPropertyValue(), false, false, true, true);
   }
 
   @Test
@@ -183,7 +215,7 @@ public class TestX509Util extends 
AbstractTestX509Parameterized {
       
x509TestContext.getTrustStoreFile(KeyStoreFileType.PEM).getAbsolutePath(),
       x509TestContext.getTrustStorePassword(), null, // null StoreFileType 
means 'autodetect from
                                                      // file extension'
-      false, false);
+      false, false, true, true);
   }
 
   @Test
@@ -227,7 +259,8 @@ public class TestX509Util extends 
AbstractTestX509Parameterized {
     // Make sure we can instantiate a trust manager from the JKS file on disk
     X509Util.createTrustManager(
       
x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
-      x509TestContext.getTrustStorePassword(), 
KeyStoreFileType.JKS.getPropertyValue(), true, true);
+      x509TestContext.getTrustStorePassword(), 
KeyStoreFileType.JKS.getPropertyValue(), true, true,
+      true, true);
   }
 
   @Test
@@ -236,7 +269,7 @@ public class TestX509Util extends 
AbstractTestX509Parameterized {
     // Make sure that empty password and null password are treated the same
     X509Util.createTrustManager(
       
x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), null,
-      KeyStoreFileType.JKS.getPropertyValue(), false, false);
+      KeyStoreFileType.JKS.getPropertyValue(), false, false, true, true);
   }
 
   @Test
@@ -246,7 +279,7 @@ public class TestX509Util extends 
AbstractTestX509Parameterized {
       
x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
       x509TestContext.getTrustStorePassword(), null, // null StoreFileType 
means 'autodetect from
                                                      // file extension'
-      true, true);
+      true, true, true, true);
   }
 
   @Test
@@ -255,7 +288,8 @@ public class TestX509Util extends 
AbstractTestX509Parameterized {
       // Attempting to load with the wrong key password should fail
       X509Util.createTrustManager(
         
x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
-        "wrong password".toCharArray(), 
KeyStoreFileType.JKS.getPropertyValue(), true, true);
+        "wrong password".toCharArray(), 
KeyStoreFileType.JKS.getPropertyValue(), true, true, true,
+        true);
     });
   }
 
@@ -301,7 +335,7 @@ public class TestX509Util extends 
AbstractTestX509Parameterized {
     X509Util.createTrustManager(
       
x509TestContext.getTrustStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
       x509TestContext.getTrustStorePassword(), 
KeyStoreFileType.PKCS12.getPropertyValue(), true,
-      true);
+      true, true, true);
   }
 
   @Test
@@ -310,7 +344,7 @@ public class TestX509Util extends 
AbstractTestX509Parameterized {
     // Make sure that empty password and null password are treated the same
     X509Util.createTrustManager(
       
x509TestContext.getTrustStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(), 
null,
-      KeyStoreFileType.PKCS12.getPropertyValue(), false, false);
+      KeyStoreFileType.PKCS12.getPropertyValue(), false, false, true, true);
   }
 
   @Test
@@ -320,7 +354,7 @@ public class TestX509Util extends 
AbstractTestX509Parameterized {
       
x509TestContext.getTrustStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
       x509TestContext.getTrustStorePassword(), null, // null StoreFileType 
means 'autodetect from
                                                      // file extension'
-      true, true);
+      true, true, true, true);
   }
 
   @Test
@@ -329,7 +363,8 @@ public class TestX509Util extends 
AbstractTestX509Parameterized {
       // Attempting to load with the wrong key password should fail
       X509Util.createTrustManager(
         
x509TestContext.getTrustStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
-        "wrong password".toCharArray(), 
KeyStoreFileType.PKCS12.getPropertyValue(), true, true);
+        "wrong password".toCharArray(), 
KeyStoreFileType.PKCS12.getPropertyValue(), true, true,
+        true, true);
     });
   }
 
diff --git 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java
 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java
index 27cb5bde3ac..583e7efcfa9 100644
--- 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java
+++ 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java
@@ -32,8 +32,11 @@ import java.util.Arrays;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.bouncycastle.asn1.x500.X500Name;
 import org.bouncycastle.asn1.x500.X500NameBuilder;
 import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.bouncycastle.asn1.x509.GeneralName;
+import org.bouncycastle.asn1.x509.GeneralNames;
 import org.bouncycastle.jce.provider.BouncyCastleProvider;
 import org.bouncycastle.operator.OperatorCreationException;
 
@@ -56,6 +59,7 @@ public final class X509TestContext {
 
   private final X509Certificate trustStoreCertificate;
   private final char[] trustStorePassword;
+  private final KeyPair trustStoreKeyPair;
   private File trustStoreJksFile;
   private File trustStorePemFile;
   private File trustStorePkcs12File;
@@ -91,6 +95,8 @@ public final class X509TestContext {
     if (!tempDir.isDirectory()) {
       throw new IllegalArgumentException("Not a directory: " + tempDir);
     }
+
+    this.trustStoreKeyPair = trustStoreKeyPair;
     this.trustStorePassword = requireNonNull(trustStorePassword);
     this.keyStoreKeyPair = requireNonNull(keyStoreKeyPair);
     this.keyStorePassword = requireNonNull(keyStorePassword);
@@ -104,8 +110,7 @@ public final class X509TestContext {
     X500NameBuilder nameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
     nameBuilder.addRDN(BCStyle.CN,
       MethodHandles.lookup().lookupClass().getCanonicalName() + " Zookeeper 
Test");
-    keyStoreCertificate = X509TestHelpers.newCert(trustStoreCertificate, 
trustStoreKeyPair,
-      nameBuilder.build(), keyStoreKeyPair.getPublic());
+    keyStoreCertificate = newCert(nameBuilder.build());
     trustStorePkcs12File = null;
     trustStorePemFile = null;
     trustStoreJksFile = null;
@@ -114,6 +119,50 @@ public final class X509TestContext {
     keyStoreJksFile = null;
   }
 
+  /**
+   * Used by {@link #cloneWithNewKeystoreCert(X509Certificate)}. Should set 
all fields except
+   * generated keystore path fields
+   */
+  private X509TestContext(File tempDir, Configuration conf, X509Certificate 
trustStoreCertificate,
+    char[] trustStorePassword, KeyPair trustStoreKeyPair, File 
trustStoreJksFile,
+    File trustStorePemFile, File trustStorePkcs12File, KeyPair keyStoreKeyPair,
+    char[] keyStorePassword, X509Certificate keyStoreCertificate) {
+    this.tempDir = tempDir;
+    this.conf = conf;
+    this.trustStoreCertificate = trustStoreCertificate;
+    this.trustStorePassword = trustStorePassword;
+    this.trustStoreKeyPair = trustStoreKeyPair;
+    this.trustStoreJksFile = trustStoreJksFile;
+    this.trustStorePemFile = trustStorePemFile;
+    this.trustStorePkcs12File = trustStorePkcs12File;
+    this.keyStoreKeyPair = keyStoreKeyPair;
+    this.keyStoreCertificate = keyStoreCertificate;
+    this.keyStorePassword = keyStorePassword;
+    keyStorePkcs12File = null;
+    keyStorePemFile = null;
+    keyStoreJksFile = null;
+  }
+
+  /**
+   * Generates a new certificate using this context's CA and keystoreKeyPair. 
By default, the cert
+   * will have localhost in the subjectAltNames. This can be overridden by 
passing one or more
+   * string arguments after the cert name. The expectation for those arguments 
is that they are
+   * valid DNS names.
+   */
+  public X509Certificate newCert(X500Name name, String... subjectAltNames)
+    throws GeneralSecurityException, IOException, OperatorCreationException {
+    if (subjectAltNames.length == 0) {
+      return X509TestHelpers.newCert(trustStoreCertificate, trustStoreKeyPair, 
name,
+        keyStoreKeyPair.getPublic());
+    }
+    GeneralName[] names = new GeneralName[subjectAltNames.length];
+    for (int i = 0; i < subjectAltNames.length; i++) {
+      names[i] = new GeneralName(GeneralName.dNSName, subjectAltNames[i]);
+    }
+    return X509TestHelpers.newCert(trustStoreCertificate, trustStoreKeyPair, 
name,
+      keyStoreKeyPair.getPublic(), new GeneralNames(names));
+  }
+
   public File getTempDir() {
     return tempDir;
   }
@@ -347,16 +396,31 @@ public final class X509TestContext {
    */
   public void setConfigurations(KeyStoreFileType keyStoreFileType,
     KeyStoreFileType trustStoreFileType) throws IOException {
-    conf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION,
-      this.getKeyStoreFile(keyStoreFileType).getAbsolutePath());
-    conf.set(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD, 
String.valueOf(this.getKeyStorePassword()));
-    conf.set(X509Util.TLS_CONFIG_KEYSTORE_TYPE, 
keyStoreFileType.getPropertyValue());
+    setKeystoreConfigurations(keyStoreFileType, conf);
     conf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION,
       this.getTrustStoreFile(trustStoreFileType).getAbsolutePath());
     conf.set(X509Util.TLS_CONFIG_TRUSTSTORE_PASSWORD, 
String.valueOf(this.getTrustStorePassword()));
     conf.set(X509Util.TLS_CONFIG_TRUSTSTORE_TYPE, 
trustStoreFileType.getPropertyValue());
   }
 
+  /**
+   * Sets the KeyStore-related SSL system properties onto the given 
Configuration such that X509Util
+   * can be used to create SSL Contexts using that KeyStore. This can be used 
in special
+   * circumstances to inject a "bad" certificate where the keystore doesn't 
match the CA in the
+   * truststore. Or use it to create a connection without a truststore.
+   * @see #setConfigurations(KeyStoreFileType, KeyStoreFileType) which sets 
both keystore and
+   *      truststore and is more applicable to general use. nnn
+   */
+  public void setKeystoreConfigurations(KeyStoreFileType keyStoreFileType, 
Configuration confToSet)
+    throws IOException {
+
+    confToSet.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION,
+      this.getKeyStoreFile(keyStoreFileType).getAbsolutePath());
+    confToSet.set(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD,
+      String.valueOf(this.getKeyStorePassword()));
+    confToSet.set(X509Util.TLS_CONFIG_KEYSTORE_TYPE, 
keyStoreFileType.getPropertyValue());
+  }
+
   public void clearConfigurations() {
     conf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
     conf.unset(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD);
@@ -366,6 +430,21 @@ public final class X509TestContext {
     conf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_TYPE);
   }
 
+  /**
+   * Creates a clone of the current context, but injecting the passed 
certificate as the KeyStore
+   * cert. The new context's keystore path fields are nulled, so the next call 
to
+   * {@link #setConfigurations(KeyStoreFileType, KeyStoreFileType)},
+   * {@link #setKeystoreConfigurations(KeyStoreFileType, Configuration)} , or
+   * {@link #getKeyStoreFile(KeyStoreFileType)} will create a new keystore 
with this certificate in
+   * place.
+   * @param cert the cert to replace
+   */
+  public X509TestContext cloneWithNewKeystoreCert(X509Certificate cert) {
+    return new X509TestContext(tempDir, conf, trustStoreCertificate, 
trustStorePassword,
+      trustStoreKeyPair, trustStoreJksFile, trustStorePemFile, 
trustStorePkcs12File,
+      keyStoreKeyPair, keyStorePassword, cert);
+  }
+
   /**
    * Builder class, used for creating new instances of X509TestContext.
    */
diff --git 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestHelpers.java
 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestHelpers.java
index 968e616ef56..56d3c8cb859 100644
--- 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestHelpers.java
+++ 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestHelpers.java
@@ -127,6 +127,25 @@ final class X509TestHelpers {
   public static X509Certificate newCert(X509Certificate caCert, KeyPair 
caKeyPair,
     X500Name certSubject, PublicKey certPublicKey)
     throws IOException, OperatorCreationException, GeneralSecurityException {
+    return newCert(caCert, caKeyPair, certSubject, certPublicKey, 
getLocalhostSubjectAltNames());
+  }
+
+  /**
+   * Using the private key of the given CA key pair and the Subject of the 
given CA cert as the
+   * Issuer, issues a new cert with the given subject and public key. The 
returned certificate,
+   * combined with the private key half of the <code>certPublicKey</code>, 
should be used as the key
+   * store.
+   * @param caCert          the certificate of the CA that's doing the signing.
+   * @param caKeyPair       the key pair of the CA. The private key will be 
used to sign. The public
+   *                        key must match the public key in the 
<code>caCert</code>.
+   * @param certSubject     the subject field of the new cert being issued.
+   * @param certPublicKey   the public key of the new cert being issued.
+   * @param subjectAltNames the subject alternative names to use, or null if 
none
+   * @return a new certificate signed by the CA's private key.
+   */
+  public static X509Certificate newCert(X509Certificate caCert, KeyPair 
caKeyPair,
+    X500Name certSubject, PublicKey certPublicKey, GeneralNames 
subjectAltNames)
+    throws IOException, OperatorCreationException, GeneralSecurityException {
     if (!caKeyPair.getPublic().equals(caCert.getPublicKey())) {
       throw new IllegalArgumentException(
         "CA private key does not match the public key in " + "the CA cert");
@@ -140,7 +159,9 @@ final class X509TestHelpers {
     builder.addExtension(Extension.extendedKeyUsage, true, new 
ExtendedKeyUsage(
       new KeyPurposeId[] { KeyPurposeId.id_kp_serverAuth, 
KeyPurposeId.id_kp_clientAuth }));
 
-    builder.addExtension(Extension.subjectAlternativeName, false, 
getLocalhostSubjectAltNames());
+    if (subjectAltNames != null) {
+      builder.addExtension(Extension.subjectAlternativeName, false, 
subjectAltNames);
+    }
     return buildAndSignCertificate(caKeyPair.getPrivate(), builder);
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestMutualTls.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestMutualTls.java
new file mode 100644
index 00000000000..1663d41a67a
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestMutualTls.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThrows;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.InetSocketAddress;
+import java.security.GeneralSecurityException;
+import java.security.Security;
+import java.security.cert.X509Certificate;
+import javax.net.ssl.SSLHandshakeException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
+import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
+import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
+import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
+import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.NettyRpcClient;
+import org.apache.hadoop.hbase.ipc.NettyRpcServer;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.RpcServerFactory;
+import org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl;
+import org.bouncycastle.asn1.x500.X500NameBuilder;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.OperatorCreationException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
+import 
org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
+
+public abstract class AbstractTestMutualTls {
+  protected static HBaseCommonTestingUtility UTIL;
+
+  protected static File DIR;
+
+  protected static X509TestContextProvider PROVIDER;
+
+  private X509TestContext x509TestContext;
+
+  protected RpcServer rpcServer;
+
+  protected RpcClient rpcClient;
+  private TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
+
+  @Parameterized.Parameter(0)
+  public X509KeyType caKeyType;
+
+  @Parameterized.Parameter(1)
+  public X509KeyType certKeyType;
+
+  @Parameterized.Parameter(2)
+  public String keyPassword;
+  @Parameterized.Parameter(3)
+  public boolean expectSuccess;
+
+  @Parameterized.Parameter(4)
+  public boolean validateHostnames;
+
+  @Parameterized.Parameter(5)
+  public CertConfig certConfig;
+
+  public enum CertConfig {
+    // For no cert, we literally pass no certificate to the server. It's 
possible (assuming server
+    // allows it based on ClientAuth mode) to use SSL without a KeyStore which 
will still do all
+    // the handshaking but without a client cert. This is what we do here.
+    // This mode only makes sense for client side, as server side must return 
a cert.
+    NO_CLIENT_CERT,
+    // For non-verifiable cert, we create a new certificate which is signed by 
a different
+    // CA. So we're passing a cert, but the client/server can't verify it.
+    NON_VERIFIABLE_CERT,
+    // Good cert is the default mode, which uses a cert signed by the same CA 
both sides
+    // and the hostname should match (localhost)
+    GOOD_CERT,
+    // For good cert/bad host, we create a new certificate signed by the same 
CA. But
+    // this cert has a SANS that will not match the localhost peer.
+    VERIFIABLE_CERT_WITH_BAD_HOST
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws IOException {
+    UTIL = new HBaseCommonTestingUtility();
+    Security.addProvider(new BouncyCastleProvider());
+    DIR =
+      new 
File(UTIL.getDataTestDir(AbstractTestTlsRejectPlainText.class.getSimpleName()).toString())
+        .getCanonicalFile();
+    FileUtils.forceMkdir(DIR);
+    Configuration conf = UTIL.getConfiguration();
+    conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, 
NettyRpcClient.class,
+      RpcClient.class);
+    conf.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, 
NettyRpcServer.class,
+      RpcServer.class);
+    conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
+    conf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false);
+    conf.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, true);
+    PROVIDER = new X509TestContextProvider(conf, DIR);
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
+    UTIL.cleanupTestDir();
+  }
+
+  protected abstract void initialize(Configuration serverConf, Configuration 
clientConf)
+    throws IOException, GeneralSecurityException, OperatorCreationException;
+
+  @Before
+  public void setUp() throws Exception {
+    x509TestContext = PROVIDER.get(caKeyType, certKeyType, 
keyPassword.toCharArray());
+    x509TestContext.setConfigurations(KeyStoreFileType.JKS, 
KeyStoreFileType.JKS);
+
+    Configuration serverConf = new Configuration(UTIL.getConfiguration());
+    Configuration clientConf = new Configuration(UTIL.getConfiguration());
+
+    initialize(serverConf, clientConf);
+
+    rpcServer = new NettyRpcServer(null, "testRpcServer",
+      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, 
null)),
+      new InetSocketAddress("localhost", 0), serverConf, new 
FifoRpcScheduler(serverConf, 1), true);
+    rpcServer.start();
+
+    rpcClient = new NettyRpcClient(clientConf);
+    stub = TestProtobufRpcServiceImpl.newBlockingStub(rpcClient, 
rpcServer.getListenerAddress());
+  }
+
+  protected void handleCertConfig(Configuration confToSet)
+    throws GeneralSecurityException, IOException, OperatorCreationException {
+    switch (certConfig) {
+      case NO_CLIENT_CERT:
+        // clearing out the keystore location will cause no cert to be sent.
+        confToSet.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION, "");
+        break;
+      case NON_VERIFIABLE_CERT:
+        // to simulate a bad cert, we inject a new keystore into the client 
side.
+        // the same truststore exists, so it will still successfully verify 
the server cert
+        // but since the new client keystore cert is created from a new CA 
(which the server doesn't
+        // have),
+        // the server will not be able to verify it.
+        X509TestContext context =
+          PROVIDER.get(caKeyType, certKeyType, "random value".toCharArray());
+        context.setKeystoreConfigurations(KeyStoreFileType.JKS, confToSet);
+        break;
+      case VERIFIABLE_CERT_WITH_BAD_HOST:
+        // to simulate a good cert with a bad host, we need to create a new 
cert using the existing
+        // context's CA/truststore. Here we can pass any random SANS, as long 
as it won't match
+        // localhost or any reasonable name that this test might run on.
+        X509Certificate cert = x509TestContext.newCert(new 
X500NameBuilder(BCStyle.INSTANCE)
+          .addRDN(BCStyle.CN,
+            MethodHandles.lookup().lookupClass().getCanonicalName() + " With 
Bad Host Test")
+          .build(), "www.example.com");
+        x509TestContext.cloneWithNewKeystoreCert(cert)
+          .setKeystoreConfigurations(KeyStoreFileType.JKS, confToSet);
+        break;
+      default:
+        break;
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (rpcServer != null) {
+      rpcServer.stop();
+    }
+    Closeables.close(rpcClient, true);
+    x509TestContext.clearConfigurations();
+    x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
+    x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
+    x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
+    System.clearProperty("com.sun.net.ssl.checkRevocation");
+    System.clearProperty("com.sun.security.enableCRLDP");
+    Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
+    Security.setProperty("com.sun.security.enableCRLDP", 
Boolean.FALSE.toString());
+  }
+
+  @Test
+  public void testClientAuth() throws Exception {
+    if (expectSuccess) {
+      // we expect no exception, so if one is thrown the test will fail
+      submitRequest();
+    } else {
+      ServiceException se = assertThrows(ServiceException.class, 
this::submitRequest);
+      assertThat(se.getCause(), instanceOf(SSLHandshakeException.class));
+    }
+  }
+
+  private void submitRequest() throws ServiceException {
+    stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage("hello 
world").build());
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestMutualTlsClientSide.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestMutualTlsClientSide.java
new file mode 100644
index 00000000000..355fb58ccf9
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestMutualTlsClientSide.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
+import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.bouncycastle.operator.OperatorCreationException;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Comprehensively tests all permutations of certificate and host verification 
on the client side.
+ * Tests each permutation of that against each value of {@link CertConfig}, 
i.e. passing a bad cert,
+ * etc. See inline comments in {@link #data()} below for what the expectations 
are
+ */
+@RunWith(Parameterized.class)
+@Category({ RPCTests.class, MediumTests.class })
+public class TestMutualTlsClientSide extends AbstractTestMutualTls {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMutualTlsClientSide.class);
+
+  @Parameterized.Parameters(name = "{index}: caKeyType={0}, certKeyType={1}, 
keyPassword={2}, "
+    + "validateServerHostnames={3}, testCase={4}")
+  public static List<Object[]> data() {
+    List<Object[]> params = new ArrayList<>();
+    for (X509KeyType caKeyType : X509KeyType.values()) {
+      for (X509KeyType certKeyType : X509KeyType.values()) {
+        for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
+          // we want to run with and without validating hostnames. we encode 
the expected success
+          // criteria in the TestCase config. See below.
+          for (boolean validateServerHostnames : new Boolean[] { true, false 
}) {
+            // fail for non-verifiable certs or certs with bad hostnames when 
validateServerHostname
+            // is true. otherwise succeed.
+            params.add(new Object[] { caKeyType, certKeyType, keyPassword, 
false,
+              validateServerHostnames, CertConfig.NON_VERIFIABLE_CERT });
+            params.add(new Object[] { caKeyType, certKeyType, keyPassword, 
!validateServerHostnames,
+              validateServerHostnames, 
CertConfig.VERIFIABLE_CERT_WITH_BAD_HOST });
+            params.add(new Object[] { caKeyType, certKeyType, keyPassword, 
true,
+              validateServerHostnames, CertConfig.GOOD_CERT });
+          }
+        }
+      }
+    }
+    return params;
+  }
+
+  @Override
+  protected void initialize(Configuration serverConf, Configuration clientConf)
+    throws IOException, GeneralSecurityException, OperatorCreationException {
+    // client verifies server hostname, and injects bad certs into server conf
+    
clientConf.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_VERIFY_SERVER_HOSTNAME,
+      validateHostnames);
+    handleCertConfig(serverConf);
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestMutualTlsServerSide.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestMutualTlsServerSide.java
new file mode 100644
index 00000000000..b15299f485d
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestMutualTlsServerSide.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
+import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.bouncycastle.operator.OperatorCreationException;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Comprehensively tests all permutations of ClientAuth modes and host 
verification
+ * enabled/disabled. Tests each permutation of that against each relevant 
value of
+ * {@link CertConfig}, i.e. passing no cert, a bad cert, etc. See inline 
comments in {@link #data()}
+ * below for what the expectations are
+ */
+@RunWith(Parameterized.class)
+@Category({ RPCTests.class, MediumTests.class })
+public class TestMutualTlsServerSide extends AbstractTestMutualTls {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMutualTlsServerSide.class);
+  @Parameterized.Parameter(6)
+  public X509Util.ClientAuth clientAuthMode;
+
+  @Parameterized.Parameters(name = "{index}: caKeyType={0}, certKeyType={1}, 
keyPassword={2}, "
+    + "validateClientHostnames={3}, testCase={4}, clientAuthMode={5}")
+  public static List<Object[]> data() {
+    List<Object[]> params = new ArrayList<>();
+    for (X509KeyType caKeyType : X509KeyType.values()) {
+      for (X509KeyType certKeyType : X509KeyType.values()) {
+        for (String keyPassword : new String[] { "", "pa$$w0rd" }) {
+          // we want to run with and without validating hostnames. we encode 
the expected success
+          // criteria
+          // in the TestCase config. See below.
+          for (boolean validateClientHostnames : new Boolean[] { true, false 
}) {
+            // ClientAuth.NONE should succeed in all cases, because it never 
requests the
+            // certificate for verification
+            params.add(new Object[] { caKeyType, certKeyType, keyPassword, 
true,
+              validateClientHostnames, CertConfig.NO_CLIENT_CERT, 
X509Util.ClientAuth.NONE });
+            params.add(new Object[] { caKeyType, certKeyType, keyPassword, 
true,
+              validateClientHostnames, CertConfig.NON_VERIFIABLE_CERT, 
X509Util.ClientAuth.NONE });
+            params.add(
+              new Object[] { caKeyType, certKeyType, keyPassword, true, 
validateClientHostnames,
+                CertConfig.VERIFIABLE_CERT_WITH_BAD_HOST, 
X509Util.ClientAuth.NONE });
+
+            // ClientAuth.WANT should succeed if no cert, but if the cert is 
provided it is
+            // validated. So should fail on bad cert or good cert with bad 
host when host
+            // verification is enabled
+            params.add(new Object[] { caKeyType, certKeyType, keyPassword, 
true,
+              validateClientHostnames, CertConfig.NO_CLIENT_CERT, 
X509Util.ClientAuth.WANT });
+            params.add(new Object[] { caKeyType, certKeyType, keyPassword, 
false,
+              validateClientHostnames, CertConfig.NON_VERIFIABLE_CERT, 
X509Util.ClientAuth.WANT });
+            params.add(new Object[] { caKeyType, certKeyType, keyPassword, 
!validateClientHostnames,
+              validateClientHostnames, 
CertConfig.VERIFIABLE_CERT_WITH_BAD_HOST,
+              X509Util.ClientAuth.WANT });
+
+            // ClientAuth.NEED is most restrictive, failing in all cases 
except "good cert/bad host"
+            // when host verification is disabled
+            params.add(new Object[] { caKeyType, certKeyType, keyPassword, 
false,
+              validateClientHostnames, CertConfig.NO_CLIENT_CERT, 
X509Util.ClientAuth.NEED });
+            params.add(new Object[] { caKeyType, certKeyType, keyPassword, 
false,
+              validateClientHostnames, CertConfig.NON_VERIFIABLE_CERT, 
X509Util.ClientAuth.NEED });
+            params.add(new Object[] { caKeyType, certKeyType, keyPassword, 
!validateClientHostnames,
+              validateClientHostnames, 
CertConfig.VERIFIABLE_CERT_WITH_BAD_HOST,
+              X509Util.ClientAuth.NEED });
+
+            // additionally ensure that all modes succeed when a good cert is 
presented
+            for (X509Util.ClientAuth mode : X509Util.ClientAuth.values()) {
+              params.add(new Object[] { caKeyType, certKeyType, keyPassword, 
true,
+                validateClientHostnames, CertConfig.GOOD_CERT, mode });
+            }
+          }
+        }
+      }
+    }
+    return params;
+  }
+
+  @Override
+  protected void initialize(Configuration serverConf, Configuration clientConf)
+    throws IOException, GeneralSecurityException, OperatorCreationException {
+    // server enables client auth mode and verifies client host names
+    // inject bad certs into client side
+    serverConf.set(X509Util.HBASE_SERVER_NETTY_TLS_CLIENT_AUTH_MODE, 
clientAuthMode.name());
+    
serverConf.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_VERIFY_CLIENT_HOSTNAME,
+      validateHostnames);
+    handleCertConfig(clientConf);
+  }
+}


Reply via email to