Repository: flink Updated Branches: refs/heads/master 78f22aaec -> e0614f655
[FLINK-5981] [security] Make SSL pick up configured protocols and cipher suites This closes #3486 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0614f65 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0614f65 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0614f65 Branch: refs/heads/master Commit: e0614f6551a232706b74963563694486fe2461b1 Parents: 78f22aa Author: WangTaoTheTonic <[email protected]> Authored: Tue Mar 7 20:05:21 2017 +0800 Committer: Stephan Ewen <[email protected]> Committed: Fri Mar 17 12:02:18 2017 +0100 ---------------------------------------------------------------------- docs/setup/config.md | 2 +- .../flink/mesos/util/MesosArtifactServer.java | 2 + .../runtime/webmonitor/WebRuntimeMonitor.java | 2 + .../apache/flink/runtime/blob/BlobServer.java | 1 + .../runtime/io/network/netty/NettyConfig.java | 5 + .../runtime/io/network/netty/NettyServer.java | 1 + .../org/apache/flink/runtime/net/SSLUtils.java | 39 ++++++++ .../apache/flink/runtime/net/SSLUtilsTest.java | 100 +++++++++++++++++++ .../flink/runtime/akka/AkkaSslITCase.scala | 27 +++++ 9 files changed, 178 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/docs/setup/config.md ---------------------------------------------------------------------- diff --git a/docs/setup/config.md b/docs/setup/config.md index c835882..d9d5f15 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -327,7 +327,7 @@ The following parameters configure Flink's JobManager and TaskManagers. - `security.ssl.truststore-password`: The secret to decrypt the truststore. -- `security.ssl.protocol`: The SSL protocol version to be supported for the ssl transport (DEFAULT: **TLSv1.2**). +- `security.ssl.protocol`: The SSL protocol version to be supported for the ssl transport (DEFAULT: **TLSv1.2**). Note that it doesn't support comma separated list. - `security.ssl.algorithms`: The comma separated list of standard SSL algorithms to be supported. Read more [here](http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites) (DEFAULT: **TLS_RSA_WITH_AES_128_CBC_SHA**). http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java index 37cb260..ae826db 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java @@ -130,6 +130,7 @@ public class MesosArtifactServer implements MesosArtifactResolver { router = new Router(); + final Configuration sslConfig = config; ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { @Override @@ -139,6 +140,7 @@ public class MesosArtifactServer implements MesosArtifactResolver { // SSL should be the first handler in the pipeline if (serverSSLContext != null) { SSLEngine sslEngine = serverSSLContext.createSSLEngine(); + SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig); sslEngine.setUseClientMode(false); ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); } http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index e604ce8..d88fdcf 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -380,6 +380,7 @@ public class WebRuntimeMonitor implements WebMonitor { LOG.warn("Error while adding shutdown hook", t); } + final Configuration sslConfig = config; ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { @Override @@ -389,6 +390,7 @@ public class WebRuntimeMonitor implements WebMonitor { // SSL should be the first handler in the pipeline if (serverSSLContext != null) { SSLEngine sslEngine = serverSSLContext.createSSLEngine(); + SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig); sslEngine.setUseClientMode(false); ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); } http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 5b00ae4..8a70559 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -168,6 +168,7 @@ public class BlobServer extends Thread implements BlobService { if(socketAttempt == null) { throw new IOException("Unable to allocate socket for blob server in specified port range: "+serverPortRange); } else { + SSLUtils.setSSLVerAndCipherSuites(socketAttempt, config); this.serverSocket = socketAttempt; } http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java index 7b0da43..b9a1b90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLParameters; import java.net.InetAddress; @@ -236,6 +237,10 @@ public class NettyConfig { && SSLUtils.getSSLEnabled(config); } + public void setSSLVerAndCipherSuites(SSLEngine engine) { + SSLUtils.setSSLVerAndCipherSuites(engine, config); + } + public void setSSLVerifyHostname(SSLParameters sslParams) { SSLUtils.setSSLVerifyHostname(config, sslParams); } http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java index 55d2b18..3cf14b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java @@ -140,6 +140,7 @@ class NettyServer { public void initChannel(SocketChannel channel) throws Exception { if (serverSSLContext != null) { SSLEngine sslEngine = serverSSLContext.createSSLEngine(); + config.setSSLVerAndCipherSuites(sslEngine); sslEngine.setUseClientMode(false); channel.pipeline().addLast("ssl", new SslHandler(sslEngine)); } http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index fc38b5d..c2d7a7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -27,10 +27,13 @@ import org.slf4j.LoggerFactory; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLServerSocket; import javax.net.ssl.TrustManagerFactory; import java.io.File; import java.io.FileInputStream; +import java.net.ServerSocket; import java.security.KeyStore; /** @@ -55,6 +58,42 @@ public class SSLUtils { } /** + * Sets SSl version and cipher suites for SSLServerSocket + * @param socket + * Socket to be handled + * @param config + * The application configuration + */ + public static void setSSLVerAndCipherSuites(ServerSocket socket, Configuration config) { + if (socket instanceof SSLServerSocket) { + ((SSLServerSocket) socket).setEnabledProtocols(config.getString( + ConfigConstants.SECURITY_SSL_PROTOCOL, + ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(",")); + ((SSLServerSocket) socket).setEnabledCipherSuites(config.getString( + ConfigConstants.SECURITY_SSL_ALGORITHMS, + ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).split(",")); + } else { + LOG.warn("Not a SSL socket, will skip setting tls version and cipher suites."); + } + } + + /** + * Sets SSL version and cipher suites for SSLEngine + * @param engine + * SSLEngine to be handled + * @param config + * The application configuration + */ + public static void setSSLVerAndCipherSuites(SSLEngine engine, Configuration config) { + engine.setEnabledProtocols(config.getString( + ConfigConstants.SECURITY_SSL_PROTOCOL, + ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(",")); + engine.setEnabledCipherSuites(config.getString( + ConfigConstants.SECURITY_SSL_ALGORITHMS, + ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).split(",")); + } + + /** * Sets SSL options to verify peer's hostname in the certificate * * @param sslConfig http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java index 1137341..d28d693 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java @@ -23,6 +23,10 @@ import org.junit.Assert; import org.junit.Test; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLServerSocket; +import java.net.ServerSocket; +import java.util.Random; /* * Tests for the SSL utilities @@ -125,4 +129,100 @@ public class SSLUtilsTest { } } + /** + * Tests if SSL Server Context creation fails with bad SSL configuration + */ + @Test + public void testCreateSSLServerContextWithMultiProtocols() { + + Configuration serverConfig = new Configuration(); + serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1,TLSv1.2"); + + try { + SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); + Assert.fail("SSL server context created even with multiple protocols set "); + } catch (Exception e) { + // Exception here is valid + } + } + + /** + * Tests if SSLUtils set the right ssl version and cipher suites for SSLServerSocket + */ + @Test + public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exception { + + Configuration serverConfig = new Configuration(); + serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1.1"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256"); + + SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); + ServerSocket socket = null; + try { + socket = serverContext.getServerSocketFactory().createServerSocket(0); + + String[] protocols = ((SSLServerSocket) socket).getEnabledProtocols(); + String[] algorithms = ((SSLServerSocket) socket).getEnabledCipherSuites(); + + Assert.assertNotEquals(1, protocols.length); + Assert.assertNotEquals(2, algorithms.length); + + SSLUtils.setSSLVerAndCipherSuites(socket, serverConfig); + protocols = ((SSLServerSocket) socket).getEnabledProtocols(); + algorithms = ((SSLServerSocket) socket).getEnabledCipherSuites(); + + Assert.assertEquals(1, protocols.length); + Assert.assertEquals("TLSv1.1", protocols[0]); + Assert.assertEquals(2, algorithms.length); + Assert.assertTrue(algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA256")); + Assert.assertTrue(algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA256")); + } finally { + if (socket != null) { + socket.close(); + } + } + } + + /** + * Tests if SSLUtils set the right ssl version and cipher suites for SSLEngine + */ + @Test + public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception { + + Configuration serverConfig = new Configuration(); + serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, "TLSv1"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"); + + SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); + SSLEngine engine = serverContext.createSSLEngine(); + + String[] protocols = engine.getEnabledProtocols(); + String[] algorithms = engine.getEnabledCipherSuites(); + + Assert.assertNotEquals(1, protocols.length); + Assert.assertNotEquals(2, algorithms.length); + + SSLUtils.setSSLVerAndCipherSuites(engine, serverConfig); + protocols = engine.getEnabledProtocols(); + algorithms = engine.getEnabledCipherSuites(); + + Assert.assertEquals(1, protocols.length); + Assert.assertEquals("TLSv1", protocols[0]); + Assert.assertEquals(2, algorithms.length); + Assert.assertTrue(algorithms[0].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA") || algorithms[0].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA256")); + Assert.assertTrue(algorithms[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA") || algorithms[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA256")); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala index 0f6509c..9f8e3e1 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -71,6 +71,33 @@ class AkkaSslITCase(_system: ActorSystem) assert(cluster.running) } + "Failed to start ssl enabled akka with two protocols set" in { + + an[Exception] should be thrownBy { + + val config = new Configuration() + config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1") + config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1") + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1) + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) + + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true) + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, + getClass.getResource("/local127.keystore").getPath) + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password") + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password") + config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, + getClass.getResource("/local127.truststore").getPath) + + config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password") + config.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, "TLSv1,TLSv1.1") + + val cluster = new TestingCluster(config, false) + + cluster.start(true) + } + } + "start with akka ssl disabled" in { val config = new Configuration()
