Repository: flink
Updated Branches:
  refs/heads/master 78f22aaec -> e0614f655


[FLINK-5981] [security] Make SSL pick up configured protocols and cipher suites

This closes #3486


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0614f65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0614f65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0614f65

Branch: refs/heads/master
Commit: e0614f6551a232706b74963563694486fe2461b1
Parents: 78f22aa
Author: WangTaoTheTonic <[email protected]>
Authored: Tue Mar 7 20:05:21 2017 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Fri Mar 17 12:02:18 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   2 +-
 .../flink/mesos/util/MesosArtifactServer.java   |   2 +
 .../runtime/webmonitor/WebRuntimeMonitor.java   |   2 +
 .../apache/flink/runtime/blob/BlobServer.java   |   1 +
 .../runtime/io/network/netty/NettyConfig.java   |   5 +
 .../runtime/io/network/netty/NettyServer.java   |   1 +
 .../org/apache/flink/runtime/net/SSLUtils.java  |  39 ++++++++
 .../apache/flink/runtime/net/SSLUtilsTest.java  | 100 +++++++++++++++++++
 .../flink/runtime/akka/AkkaSslITCase.scala      |  27 +++++
 9 files changed, 178 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index c835882..d9d5f15 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -327,7 +327,7 @@ The following parameters configure Flink's JobManager and 
TaskManagers.
 
 - `security.ssl.truststore-password`: The secret to decrypt the truststore.
 
-- `security.ssl.protocol`: The SSL protocol version to be supported for the 
ssl transport (DEFAULT: **TLSv1.2**).
+- `security.ssl.protocol`: The SSL protocol version to be supported for the 
ssl transport (DEFAULT: **TLSv1.2**). Note that it doesn't support comma 
separated list.
 
 - `security.ssl.algorithms`: The comma separated list of standard SSL 
algorithms to be supported. Read more 
[here](http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites)
 (DEFAULT: **TLS_RSA_WITH_AES_128_CBC_SHA**).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 37cb260..ae826db 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -130,6 +130,7 @@ public class MesosArtifactServer implements 
MesosArtifactResolver {
 
                router = new Router();
 
+               final Configuration sslConfig = config;
                ChannelInitializer<SocketChannel> initializer = new 
ChannelInitializer<SocketChannel>() {
 
                        @Override
@@ -139,6 +140,7 @@ public class MesosArtifactServer implements 
MesosArtifactResolver {
                                // SSL should be the first handler in the 
pipeline
                                if (serverSSLContext != null) {
                                        SSLEngine sslEngine = 
serverSSLContext.createSSLEngine();
+                                       
SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
                                        sslEngine.setUseClientMode(false);
                                        ch.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index e604ce8..d88fdcf 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -380,6 +380,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                        LOG.warn("Error while adding shutdown hook", t);
                }
 
+               final Configuration sslConfig = config;
                ChannelInitializer<SocketChannel> initializer = new 
ChannelInitializer<SocketChannel>() {
 
                        @Override
@@ -389,6 +390,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                                // SSL should be the first handler in the 
pipeline
                                if (serverSSLContext != null) {
                                        SSLEngine sslEngine = 
serverSSLContext.createSSLEngine();
+                                       
SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
                                        sslEngine.setUseClientMode(false);
                                        ch.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 5b00ae4..8a70559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -168,6 +168,7 @@ public class BlobServer extends Thread implements 
BlobService {
                if(socketAttempt == null) {
                        throw new IOException("Unable to allocate socket for 
blob server in specified port range: "+serverPortRange);
                } else {
+                       SSLUtils.setSSLVerAndCipherSuites(socketAttempt, 
config);
                        this.serverSocket = socketAttempt;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 7b0da43..b9a1b90 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -25,6 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
 import java.net.InetAddress;
 
@@ -236,6 +237,10 @@ public class NettyConfig {
                        && SSLUtils.getSSLEnabled(config);
        }
 
+       public void setSSLVerAndCipherSuites(SSLEngine engine) {
+               SSLUtils.setSSLVerAndCipherSuites(engine, config);
+       }
+
        public void setSSLVerifyHostname(SSLParameters sslParams) {
                SSLUtils.setSSLVerifyHostname(config, sslParams);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index 55d2b18..3cf14b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -140,6 +140,7 @@ class NettyServer {
                        public void initChannel(SocketChannel channel) throws 
Exception {
                                if (serverSSLContext != null) {
                                        SSLEngine sslEngine = 
serverSSLContext.createSSLEngine();
+                                       
config.setSSLVerAndCipherSuites(sslEngine);
                                        sslEngine.setUseClientMode(false);
                                        channel.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index fc38b5d..c2d7a7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -27,10 +27,13 @@ import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLServerSocket;
 import javax.net.ssl.TrustManagerFactory;
 import java.io.File;
 import java.io.FileInputStream;
+import java.net.ServerSocket;
 import java.security.KeyStore;
 
 /**
@@ -55,6 +58,42 @@ public class SSLUtils {
        }
 
        /**
+        * Sets SSl version and cipher suites for SSLServerSocket
+        * @param socket
+        *        Socket to be handled
+        * @param config
+        *        The application configuration
+        */
+       public static void setSSLVerAndCipherSuites(ServerSocket socket, 
Configuration config) {
+               if (socket instanceof SSLServerSocket) {
+                       ((SSLServerSocket) 
socket).setEnabledProtocols(config.getString(
+                               ConfigConstants.SECURITY_SSL_PROTOCOL,
+                               
ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(","));
+                       ((SSLServerSocket) 
socket).setEnabledCipherSuites(config.getString(
+                               ConfigConstants.SECURITY_SSL_ALGORITHMS,
+                               
ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).split(","));
+               } else {
+                       LOG.warn("Not a SSL socket, will skip setting tls 
version and cipher suites.");
+               }
+       }
+
+       /**
+        * Sets SSL version and cipher suites for SSLEngine
+        * @param engine
+        *        SSLEngine to be handled
+        * @param config
+        *        The application configuration
+        */
+       public static void setSSLVerAndCipherSuites(SSLEngine engine, 
Configuration config) {
+               engine.setEnabledProtocols(config.getString(
+                       ConfigConstants.SECURITY_SSL_PROTOCOL,
+                       
ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL).split(","));
+               engine.setEnabledCipherSuites(config.getString(
+                       ConfigConstants.SECURITY_SSL_ALGORITHMS,
+                       
ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS).split(","));
+       }
+
+       /**
         * Sets SSL options to verify peer's hostname in the certificate
         *
         * @param sslConfig

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index 1137341..d28d693 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -23,6 +23,10 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLServerSocket;
+import java.net.ServerSocket;
+import java.util.Random;
 
 /*
  * Tests for the SSL utilities
@@ -125,4 +129,100 @@ public class SSLUtilsTest {
                }
        }
 
+       /**
+        * Tests if SSL Server Context creation fails with bad SSL configuration
+        */
+       @Test
+       public void testCreateSSLServerContextWithMultiProtocols() {
+
+               Configuration serverConfig = new Configuration();
+               serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
+               serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
"src/test/resources/local127.keystore");
+               
serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, 
"password");
+               
serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+               serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, 
"TLSv1,TLSv1.2");
+
+               try {
+                       SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
+                       Assert.fail("SSL server context created even with 
multiple protocols set ");
+               } catch (Exception e) {
+                       // Exception here is valid
+               }
+       }
+
+       /**
+        * Tests if SSLUtils set the right ssl version and cipher suites for 
SSLServerSocket
+        */
+       @Test
+       public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws 
Exception {
+
+               Configuration serverConfig = new Configuration();
+               serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
+               serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
"src/test/resources/local127.keystore");
+               
serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, 
"password");
+               
serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+               serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, 
"TLSv1.1");
+               serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, 
"TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256");
+
+               SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
+               ServerSocket socket = null;
+               try {
+                       socket = 
serverContext.getServerSocketFactory().createServerSocket(0);
+
+                       String[] protocols = ((SSLServerSocket) 
socket).getEnabledProtocols();
+                       String[] algorithms = ((SSLServerSocket) 
socket).getEnabledCipherSuites();
+
+                       Assert.assertNotEquals(1, protocols.length);
+                       Assert.assertNotEquals(2, algorithms.length);
+
+                       SSLUtils.setSSLVerAndCipherSuites(socket, serverConfig);
+                       protocols = ((SSLServerSocket) 
socket).getEnabledProtocols();
+                       algorithms = ((SSLServerSocket) 
socket).getEnabledCipherSuites();
+
+                       Assert.assertEquals(1, protocols.length);
+                       Assert.assertEquals("TLSv1.1", protocols[0]);
+                       Assert.assertEquals(2, algorithms.length);
+                       
Assert.assertTrue(algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || 
algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
+                       
Assert.assertTrue(algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || 
algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
+               } finally {
+                       if (socket != null) {
+                               socket.close();
+                       }
+               }
+       }
+
+       /**
+        * Tests if SSLUtils set the right ssl version and cipher suites for 
SSLEngine
+        */
+       @Test
+       public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws 
Exception {
+
+               Configuration serverConfig = new Configuration();
+               serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
+               serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
"src/test/resources/local127.keystore");
+               
serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, 
"password");
+               
serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+               serverConfig.setString(ConfigConstants.SECURITY_SSL_PROTOCOL, 
"TLSv1");
+               serverConfig.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, 
"TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256");
+
+               SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
+               SSLEngine engine = serverContext.createSSLEngine();
+
+               String[] protocols = engine.getEnabledProtocols();
+               String[] algorithms = engine.getEnabledCipherSuites();
+
+               Assert.assertNotEquals(1, protocols.length);
+               Assert.assertNotEquals(2, algorithms.length);
+
+               SSLUtils.setSSLVerAndCipherSuites(engine, serverConfig);
+               protocols = engine.getEnabledProtocols();
+               algorithms = engine.getEnabledCipherSuites();
+
+               Assert.assertEquals(1, protocols.length);
+               Assert.assertEquals("TLSv1", protocols[0]);
+               Assert.assertEquals(2, algorithms.length);
+               
Assert.assertTrue(algorithms[0].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA") || 
algorithms[0].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"));
+               
Assert.assertTrue(algorithms[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA") || 
algorithms[1].equals("TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"));
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0614f65/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
index 0f6509c..9f8e3e1 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -71,6 +71,33 @@ class AkkaSslITCase(_system: ActorSystem)
       assert(cluster.running)
     }
 
+    "Failed to start ssl enabled akka with two protocols set" in {
+
+      an[Exception] should be thrownBy {
+
+        val config = new Configuration()
+        config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
"127.0.0.1")
+        config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, 
"127.0.0.1")
+        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
+
+        config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
+        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE,
+          getClass.getResource("/local127.keystore").getPath)
+        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, 
"password")
+        config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password")
+        config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE,
+          getClass.getResource("/local127.truststore").getPath)
+
+        config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, 
"password")
+        config.setString(ConfigConstants.SECURITY_SSL_ALGORITHMS, 
"TLSv1,TLSv1.1")
+
+        val cluster = new TestingCluster(config, false)
+
+        cluster.start(true)
+      }
+    }
+
     "start with akka ssl disabled" in {
 
       val config = new Configuration()

Reply via email to