Repository: kafka
Updated Branches:
  refs/heads/0.11.0 8f9ee0ee4 -> 1d943a765


KAFKA-5051; Avoid reverse DNS lookup to obtain hostname for TLS

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #2835 from rajinisivaram/KAFKA-5051

(cherry picked from commit 17b2bde4be6296dfd47defce85443cbf6e23835b)
Signed-off-by: Rajini Sivaram <rajinisiva...@googlemail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d943a76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d943a76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d943a76

Branch: refs/heads/0.11.0
Commit: 1d943a765e6036033b67ff3d65ef7e6ffc6e731a
Parents: 8f9ee0e
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Tue Jun 6 12:34:39 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Tue Jun 6 12:35:05 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/network/SslChannelBuilder.java |  46 ++++++-
 .../kafka/common/security/ssl/SslFactory.java   |   2 +
 .../apache/kafka/common/network/CertStores.java |  17 ++-
 .../kafka/common/network/NetworkTestUtils.java  |   2 +-
 .../kafka/common/network/NioEchoServer.java     |   6 +-
 .../kafka/common/network/SslSelectorTest.java   |   4 +-
 .../common/network/SslTransportLayerTest.java   | 130 +++++++++++++++++--
 .../org/apache/kafka/test/TestSslUtils.java     |  92 +++++++++----
 8 files changed, 252 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d943a76/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index 8e55d48..bc34b70 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.network;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.Map;
@@ -51,7 +52,7 @@ public class SslChannelBuilder implements ChannelBuilder {
 
     public KafkaChannel buildChannel(String id, SelectionKey key, int 
maxReceiveSize) throws KafkaException {
         try {
-            SslTransportLayer transportLayer = buildTransportLayer(sslFactory, 
id, key);
+            SslTransportLayer transportLayer = buildTransportLayer(sslFactory, 
id, key, peerHost(key));
             Authenticator authenticator = new DefaultAuthenticator();
             authenticator.configure(transportLayer, this.principalBuilder, 
this.configs);
             return new KafkaChannel(id, transportLayer, authenticator, 
maxReceiveSize);
@@ -65,9 +66,48 @@ public class SslChannelBuilder implements ChannelBuilder {
         this.principalBuilder.close();
     }
 
-    protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, 
String id, SelectionKey key) throws IOException {
+    protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, 
String id, SelectionKey key, String host) throws IOException {
         SocketChannel socketChannel = (SocketChannel) key.channel();
         return SslTransportLayer.create(id, key,
-            
sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
 socketChannel.socket().getPort()));
+            sslFactory.createSslEngine(host, 
socketChannel.socket().getPort()));
+    }
+
+    /**
+     * Returns host/IP address of remote host without reverse DNS lookup to be 
used as the host
+     * for creating SSL engine. This is used as a hint for session reuse 
strategy and also for
+     * hostname verification of server hostnames.
+     * <p>
+     * Scenarios:
+     * <ul>
+     *   <li>Server-side
+     *   <ul>
+     *     <li>Server accepts connection from a client. Server knows only 
client IP
+     *     address. We want to avoid reverse DNS lookup of the client IP 
address since the server
+     *     does not verify or use client hostname. The IP address can be used 
directly.</li>
+     *   </ul>
+     *   </li>
+     *   <li>Client-side
+     *   <ul>
+     *     <li>Client connects to server using hostname. No lookup is necessary
+     *     and the hostname should be used to create the SSL engine. This 
hostname is validated
+     *     against the hostname in SubjectAltName (dns) or CommonName in the 
certificate if
+     *     hostname verification is enabled. Authentication fails if hostname 
does not match.</li>
+     *     <li>Client connects to server using IP address, but certificate 
contains only
+     *     SubjectAltName (dns). Use of reverse DNS lookup to determine 
hostname introduces
+     *     a security vulnerability since authentication would be reliant on a 
secure DNS.
+     *     Hence hostname verification should fail in this case.</li>
+     *     <li>Client connects to server using IP address and certificate 
contains
+     *     SubjectAltName (ipaddress). This could be used when Kafka is on a 
private network.
+     *     If reverse DNS lookup is used, authentication would succeed using 
IP address if lookup
+     *     fails and IP address is used, but authentication would fail if 
lookup succeeds and
+     *     dns name is used. For consistency and to avoid dependency on a 
potentially insecure
+     *     DNS, reverse DNS lookup should be avoided and the IP address 
specified by the client for
+     *     connection should be used to create the SSL engine.</li>
+     *   </ul></li>
+     * </ul>
+     */
+    private String peerHost(SelectionKey key) {
+        SocketChannel socketChannel = (SocketChannel) key.channel();
+        return new InetSocketAddress(socketChannel.socket().getInetAddress(), 
0).getHostString();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d943a76/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java 
b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 77f75f6..6572488 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -154,6 +154,8 @@ public class SslFactory implements Configurable {
         if (cipherSuites != null) 
sslEngine.setEnabledCipherSuites(cipherSuites);
         if (enabledProtocols != null) 
sslEngine.setEnabledProtocols(enabledProtocols);
 
+        // SSLParameters#setEndpointIdentificationAlgorithm enables endpoint 
validation
+        // only in client mode. Hence, validation is enabled only for clients.
         if (mode == Mode.SERVER) {
             sslEngine.setUseClientMode(false);
             if (needClientAuth)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d943a76/clients/src/test/java/org/apache/kafka/common/network/CertStores.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/CertStores.java 
b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java
index 9cd981f..b6cc1d4 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/CertStores.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.network;
 
 import java.io.File;
+import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -27,11 +28,23 @@ public class CertStores {
 
     private final Map<String, Object> sslConfig;
 
-    public CertStores(boolean server, String host) throws Exception {
+    public CertStores(boolean server, String hostName) throws Exception {
+        this(server, hostName, new TestSslUtils.CertificateBuilder());
+    }
+
+    public CertStores(boolean server, String commonName, String sanHostName) 
throws Exception {
+        this(server, commonName, new 
TestSslUtils.CertificateBuilder().sanDnsName(sanHostName));
+    }
+
+    public CertStores(boolean server, String commonName, InetAddress 
hostAddress) throws Exception {
+        this(server, commonName, new 
TestSslUtils.CertificateBuilder().sanIpAddress(hostAddress));
+    }
+
+    private CertStores(boolean server, String commonName, 
TestSslUtils.CertificateBuilder certBuilder) throws Exception {
         String name = server ? "server" : "client";
         Mode mode = server ? Mode.SERVER : Mode.CLIENT;
         File truststoreFile = File.createTempFile(name + "TS", ".jks");
-        sslConfig = TestSslUtils.createSslConfig(!server, true, mode, 
truststoreFile, name, host);
+        sslConfig = TestSslUtils.createSslConfig(!server, true, mode, 
truststoreFile, name, commonName, certBuilder);
         if (server)
             sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, 
Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d943a76/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java 
b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
index 43c7d9b..68089e6 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
@@ -36,7 +36,7 @@ public class NetworkTestUtils {
 
     public static NioEchoServer createEchoServer(ListenerName listenerName, 
SecurityProtocol securityProtocol,
                                                  AbstractConfig serverConfig) 
throws Exception {
-        NioEchoServer server = new NioEchoServer(listenerName, 
securityProtocol, serverConfig, "localhost");
+        NioEchoServer server = new NioEchoServer(listenerName, 
securityProtocol, serverConfig, "localhost", null);
         server.start();
         return server;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d943a76/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java 
b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index f44131a..85c5002 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -51,7 +51,8 @@ public class NioEchoServer extends Thread {
     private volatile WritableByteChannel outputChannel;
     private final CredentialCache credentialCache;
 
-    public NioEchoServer(ListenerName listenerName, SecurityProtocol 
securityProtocol, AbstractConfig config, String serverHost) throws Exception {
+    public NioEchoServer(ListenerName listenerName, SecurityProtocol 
securityProtocol, AbstractConfig config,
+            String serverHost, ChannelBuilder channelBuilder) throws Exception 
{
         super("echoserver");
         setDaemon(true);
         serverSocketChannel = ServerSocketChannel.open();
@@ -63,7 +64,8 @@ public class NioEchoServer extends Thread {
         this.credentialCache = new CredentialCache();
         if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || 
securityProtocol == SecurityProtocol.SASL_SSL)
             ScramCredentialUtils.createCache(credentialCache, 
ScramMechanism.mechanismNames());
-        ChannelBuilder channelBuilder = 
ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, 
credentialCache);
+        if (channelBuilder == null)
+            channelBuilder = 
ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, 
credentialCache);
         this.selector = new Selector(5000, new Metrics(), new MockTime(), 
"MetricGroup", channelBuilder);
         acceptorThread = new AcceptorThread();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d943a76/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 80f266f..e272855 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -77,10 +77,10 @@ public class SslSelectorTest extends SelectorTest {
     public void testRenegotiation() throws Exception {
         ChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT) {
             @Override
-            protected SslTransportLayer buildTransportLayer(SslFactory 
sslFactory, String id, SelectionKey key) throws IOException {
+            protected SslTransportLayer buildTransportLayer(SslFactory 
sslFactory, String id, SelectionKey key, String host) throws IOException {
                 SocketChannel socketChannel = (SocketChannel) key.channel();
                 SslTransportLayer transportLayer = new SslTransportLayer(id, 
key,
-                    
sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
 socketChannel.socket().getPort()),
+                    sslFactory.createSslEngine(host, 
socketChannel.socket().getPort()),
                     true);
                 transportLayer.startHandshake();
                 return transportLayer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d943a76/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 42e0f6f..bb5d2a7 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -34,6 +34,7 @@ import java.util.Map;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SslConfigs;
@@ -68,8 +69,8 @@ public class SslTransportLayerTest {
     @Before
     public void setup() throws Exception {
         // Create certificates for use by client and server. Add server cert 
to client truststore and vice versa.
-        serverCertStores = new CertStores(true, "localhost");
-        clientCertStores = new CertStores(false, "localhost");
+        serverCertStores = new CertStores(true, "server",  "localhost");
+        clientCertStores = new CertStores(false, "client", "localhost");
         sslServerConfigs = 
serverCertStores.getTrustingConfig(clientCertStores);
         sslClientConfigs = 
clientCertStores.getTrustingConfig(serverCertStores);
         this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
@@ -86,15 +87,123 @@ public class SslTransportLayerTest {
     }
 
     /**
-     * Tests that server certificate with valid IP address is accepted by
-     * a client that validates server endpoint.
+     * Tests that server certificate with SubjectAltName containing the valid 
hostname
+     * is accepted by a client that connects using the hostname and validates 
server endpoint.
+     */
+    @Test
+    public void testValidEndpointIdentificationSanDns() throws Exception {
+        String node = "0";
+        server = createEchoServer(SecurityProtocol.SSL);
+        
sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
"HTTPS");
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
+    }
+
+    /**
+     * Tests that server certificate with SubjectAltName containing valid IP 
address
+     * is accepted by a client that connects using IP address and validates 
server endpoint.
+     */
+    @Test
+    public void testValidEndpointIdentificationSanIp() throws Exception {
+        String node = "0";
+        serverCertStores = new CertStores(true, "server", 
InetAddress.getByName("127.0.0.1"));
+        clientCertStores = new CertStores(false, "client", 
InetAddress.getByName("127.0.0.1"));
+        sslServerConfigs = 
serverCertStores.getTrustingConfig(clientCertStores);
+        sslClientConfigs = 
clientCertStores.getTrustingConfig(serverCertStores);
+        server = createEchoServer(SecurityProtocol.SSL);
+        
sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
"HTTPS");
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 
server.port());
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
+    }
+
+    /**
+     * Tests that server certificate with CN containing valid hostname
+     * is accepted by a client that connects using hostname and validates 
server endpoint.
+     */
+    @Test
+    public void testValidEndpointIdentificationCN() throws Exception {
+        String node = "0";
+        serverCertStores = new CertStores(true, "localhost");
+        clientCertStores = new CertStores(false, "localhost");
+        sslServerConfigs = 
serverCertStores.getTrustingConfig(clientCertStores);
+        sslClientConfigs = 
clientCertStores.getTrustingConfig(serverCertStores);
+        server = createEchoServer(SecurityProtocol.SSL);
+        
sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
"HTTPS");
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
+    }
+
+    /**
+     * Tests that hostname verification is performed on the host name or 
address
+     * specified by the client without using reverse DNS lookup. Certificate is
+     * created with hostname, client connection uses IP address. Endpoint 
validation
+     * must fail.
      */
     @Test
-    public void testValidEndpointIdentification() throws Exception {
+    public void testEndpointIdentificationNoReverseLookup() throws Exception {
         String node = "0";
         server = createEchoServer(SecurityProtocol.SSL);
         
sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
"HTTPS");
         createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 
server.port());
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.AUTHENTICATE);
+    }
+
+    /**
+     * According to RFC 2818:
+     * <blockquote>Typically, the server has no external knowledge of what the 
client's
+     * identity ought to be and so checks (other than that the client has a
+     * certificate chain rooted in an appropriate CA) are not possible. If a
+     * server has such knowledge (typically from some source external to
+     * HTTP or TLS) it SHOULD check the identity as described 
above.</blockquote>
+     *
+     * However, Java SSL engine does not perform any endpoint validation for 
client IP address.
+     * Hence it is safe to avoid reverse DNS lookup while creating the SSL 
engine. This test checks
+     * that client validation does not fail even if the client certificate has 
an invalid hostname.
+     * This test is to ensure that if client endpoint validation is added to 
Java in future, we can detect
+     * and update Kafka SSL code to enable validation on the server-side and 
provide hostname if required.
+     */
+    @Test
+    public void testClientEndpointNotValidated() throws Exception {
+        String node = "0";
+
+        // Create client certificate with an invalid hostname
+        clientCertStores = new CertStores(false, "non-existent.com");
+        serverCertStores = new CertStores(true, "localhost");
+        sslServerConfigs = 
serverCertStores.getTrustingConfig(clientCertStores);
+        sslClientConfigs = 
clientCertStores.getTrustingConfig(serverCertStores);
+
+        // Create a server with endpoint validation enabled on the server SSL 
engine
+        SslChannelBuilder serverChannelBuilder = new 
SslChannelBuilder(Mode.SERVER) {
+            @Override
+            protected SslTransportLayer buildTransportLayer(SslFactory 
sslFactory, String id, SelectionKey key, String host) throws IOException {
+                SocketChannel socketChannel = (SocketChannel) key.channel();
+                SSLEngine sslEngine = sslFactory.createSslEngine(host, 
socketChannel.socket().getPort());
+                SSLParameters sslParams = sslEngine.getSSLParameters();
+                sslParams.setEndpointIdentificationAlgorithm("HTTPS");
+                sslEngine.setSSLParameters(sslParams);
+                TestSslTransportLayer transportLayer = new 
TestSslTransportLayer(id, key, sslEngine, BUFFER_SIZE, BUFFER_SIZE, 
BUFFER_SIZE);
+                transportLayer.startHandshake();
+                return transportLayer;
+            }
+        };
+        serverChannelBuilder.configure(sslServerConfigs);
+        server = new 
NioEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL), 
SecurityProtocol.SSL,
+                new TestSecurityConfig(sslServerConfigs), "localhost", 
serverChannelBuilder);
+        server.start();
+
+        createSelector(sslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
@@ -109,8 +218,8 @@ public class SslTransportLayerTest {
     @Test
     public void testInvalidEndpointIdentification() throws Exception {
         String node = "0";
-        serverCertStores = new CertStores(true, "notahost");
-        clientCertStores = new CertStores(false, "localhost");
+        serverCertStores = new CertStores(true, "server", "notahost");
+        clientCertStores = new CertStores(false, "client", "localhost");
         sslServerConfigs = 
serverCertStores.getTrustingConfig(clientCertStores);
         sslClientConfigs = 
clientCertStores.getTrustingConfig(serverCertStores);
         
sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
"HTTPS");
@@ -132,7 +241,7 @@ public class SslTransportLayerTest {
         String serverHost = InetAddress.getLocalHost().getHostAddress();
         SecurityProtocol securityProtocol = SecurityProtocol.SSL;
         server = new 
NioEchoServer(ListenerName.forSecurityProtocol(securityProtocol), 
securityProtocol,
-                new TestSecurityConfig(sslServerConfigs), serverHost);
+                new TestSecurityConfig(sslServerConfigs), serverHost, null);
         server.start();
         
sslClientConfigs.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
         createSelector(sslClientConfigs);
@@ -551,10 +660,9 @@ public class SslTransportLayerTest {
         this.channelBuilder = new SslChannelBuilder(Mode.CLIENT) {
 
             @Override
-            protected SslTransportLayer buildTransportLayer(SslFactory 
sslFactory, String id, SelectionKey key) throws IOException {
+            protected SslTransportLayer buildTransportLayer(SslFactory 
sslFactory, String id, SelectionKey key, String host) throws IOException {
                 SocketChannel socketChannel = (SocketChannel) key.channel();
-                SSLEngine sslEngine = 
sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
-                                socketChannel.socket().getPort());
+                SSLEngine sslEngine = sslFactory.createSslEngine(host, 
socketChannel.socket().getPort());
                 TestSslTransportLayer transportLayer = new 
TestSslTransportLayer(id, key, sslEngine, netReadBufSize, netWriteBufSize, 
appBufSize);
                 transportLayer.startHandshake();
                 return transportLayer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d943a76/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java 
b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index f4f8818..6c057d0 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -25,6 +25,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.EOFException;
 import java.math.BigInteger;
+import java.net.InetAddress;
 
 import javax.net.ssl.TrustManagerFactory;
 
@@ -41,11 +42,15 @@ import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 
 import org.apache.kafka.common.config.types.Password;
+import org.bouncycastle.asn1.DEROctetString;
 import org.bouncycastle.asn1.x500.X500Name;
 import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.Extension;
+import org.bouncycastle.asn1.x509.GeneralName;
+import org.bouncycastle.asn1.x509.GeneralNames;
 import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
 import org.bouncycastle.cert.X509CertificateHolder;
-import org.bouncycastle.cert.X509v1CertificateBuilder;
+import org.bouncycastle.cert.X509v3CertificateBuilder;
 import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
 import org.bouncycastle.crypto.params.AsymmetricKeyParameter;
 import org.bouncycastle.crypto.util.PrivateKeyFactory;
@@ -77,27 +82,7 @@ public class TestSslUtils {
     public static X509Certificate generateCertificate(String dn, KeyPair pair,
                                                       int days, String 
algorithm)
         throws  CertificateException {
-
-        try {
-            Security.addProvider(new BouncyCastleProvider());
-            AlgorithmIdentifier sigAlgId = new 
DefaultSignatureAlgorithmIdentifierFinder().find(algorithm);
-            AlgorithmIdentifier digAlgId = new 
DefaultDigestAlgorithmIdentifierFinder().find(sigAlgId);
-            AsymmetricKeyParameter privateKeyAsymKeyParam = 
PrivateKeyFactory.createKey(pair.getPrivate().getEncoded());
-            SubjectPublicKeyInfo subPubKeyInfo = 
SubjectPublicKeyInfo.getInstance(pair.getPublic().getEncoded());
-            ContentSigner sigGen = new BcRSAContentSignerBuilder(sigAlgId, 
digAlgId).build(privateKeyAsymKeyParam);
-            X500Name name = new X500Name(dn);
-            Date from = new Date();
-            Date to = new Date(from.getTime() + days * 86400000L);
-            BigInteger sn = new BigInteger(64, new SecureRandom());
-
-            X509v1CertificateBuilder v1CertGen = new 
X509v1CertificateBuilder(name, sn, from, to, name, subPubKeyInfo);
-            X509CertificateHolder certificateHolder = v1CertGen.build(sigGen);
-            return new 
JcaX509CertificateConverter().setProvider("BC").getCertificate(certificateHolder);
-        } catch (CertificateException ce) {
-            throw ce;
-        } catch (Exception e) {
-            throw new CertificateException(e);
-        }
+        return new CertificateBuilder(days, algorithm).generate(dn, pair);
     }
 
     public static KeyPair generateKeyPair(String algorithm) throws 
NoSuchAlgorithmException {
@@ -193,8 +178,15 @@ public class TestSslUtils {
         return createSslConfig(useClientCert, trustStore, mode, 
trustStoreFile, certAlias, "localhost");
     }
 
-    public static  Map<String, Object> createSslConfig(boolean useClientCert, 
boolean trustStore, Mode mode, File trustStoreFile, String certAlias, String 
host)
+    public static  Map<String, Object> createSslConfig(boolean useClientCert, 
boolean trustStore,
+            Mode mode, File trustStoreFile, String certAlias, String hostName)
         throws IOException, GeneralSecurityException {
+        return createSslConfig(useClientCert, trustStore, mode, 
trustStoreFile, certAlias, hostName, new CertificateBuilder());
+    }
+
+    public static  Map<String, Object> createSslConfig(boolean useClientCert, 
boolean trustStore,
+            Mode mode, File trustStoreFile, String certAlias, String cn, 
CertificateBuilder certBuilder)
+            throws IOException, GeneralSecurityException {
         Map<String, X509Certificate> certs = new HashMap<>();
         File keyStoreFile = null;
         Password password = mode == Mode.SERVER ? new 
Password("ServerPassword") : new Password("ClientPassword");
@@ -204,15 +196,14 @@ public class TestSslUtils {
         if (mode == Mode.CLIENT && useClientCert) {
             keyStoreFile = File.createTempFile("clientKS", ".jks");
             KeyPair cKP = generateKeyPair("RSA");
-            X509Certificate cCert = generateCertificate("CN=" + host + ", O=A 
client", cKP, 30, "SHA1withRSA");
+            X509Certificate cCert = certBuilder.generate("CN=" + cn + ", O=A 
client", cKP);
             createKeyStore(keyStoreFile.getPath(), password, "client", 
cKP.getPrivate(), cCert);
             certs.put(certAlias, cCert);
             keyStoreFile.deleteOnExit();
         } else if (mode == Mode.SERVER) {
             keyStoreFile = File.createTempFile("serverKS", ".jks");
             KeyPair sKP = generateKeyPair("RSA");
-            X509Certificate sCert = generateCertificate("CN=" + host + ", O=A 
server", sKP, 30,
-                                                        "SHA1withRSA");
+            X509Certificate sCert = certBuilder.generate("CN=" + cn + ", O=A 
server", sKP);
             createKeyStore(keyStoreFile.getPath(), password, password, 
"server", sKP.getPrivate(), sCert);
             certs.put(certAlias, sCert);
             keyStoreFile.deleteOnExit();
@@ -226,4 +217,53 @@ public class TestSslUtils {
         return createSslConfig(mode, keyStoreFile, password, password, 
trustStoreFile, trustStorePassword);
     }
 
+    public static class CertificateBuilder {
+        private final int days;
+        private final String algorithm;
+        private byte[] subjectAltName;
+
+        public CertificateBuilder() {
+            this(30, "SHA1withRSA");
+        }
+
+        public CertificateBuilder(int days, String algorithm) {
+            this.days = days;
+            this.algorithm = algorithm;
+        }
+
+        public CertificateBuilder sanDnsName(String hostName) throws 
IOException {
+            subjectAltName = new GeneralNames(new 
GeneralName(GeneralName.dNSName, hostName)).getEncoded();
+            return this;
+        }
+
+        public CertificateBuilder sanIpAddress(InetAddress hostAddress) throws 
IOException {
+            subjectAltName = new GeneralNames(new 
GeneralName(GeneralName.iPAddress, new 
DEROctetString(hostAddress.getAddress()))).getEncoded();
+            return this;
+        }
+
+        public X509Certificate generate(String dn, KeyPair keyPair) throws 
CertificateException {
+            try {
+                Security.addProvider(new BouncyCastleProvider());
+                AlgorithmIdentifier sigAlgId = new 
DefaultSignatureAlgorithmIdentifierFinder().find(algorithm);
+                AlgorithmIdentifier digAlgId = new 
DefaultDigestAlgorithmIdentifierFinder().find(sigAlgId);
+                AsymmetricKeyParameter privateKeyAsymKeyParam = 
PrivateKeyFactory.createKey(keyPair.getPrivate().getEncoded());
+                SubjectPublicKeyInfo subPubKeyInfo = 
SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded());
+                ContentSigner sigGen = new BcRSAContentSignerBuilder(sigAlgId, 
digAlgId).build(privateKeyAsymKeyParam);
+                X500Name name = new X500Name(dn);
+                Date from = new Date();
+                Date to = new Date(from.getTime() + days * 86400000L);
+                BigInteger sn = new BigInteger(64, new SecureRandom());
+                X509v3CertificateBuilder v3CertGen = new 
X509v3CertificateBuilder(name, sn, from, to, name, subPubKeyInfo);
+
+                if (subjectAltName != null)
+                    v3CertGen.addExtension(Extension.subjectAlternativeName, 
false, subjectAltName);
+                X509CertificateHolder certificateHolder = 
v3CertGen.build(sigGen);
+                return new 
JcaX509CertificateConverter().setProvider("BC").getCertificate(certificateHolder);
+            } catch (CertificateException ce) {
+                throw ce;
+            } catch (Exception e) {
+                throw new CertificateException(e);
+            }
+        }
+    }
 }

Reply via email to