This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new 9e421a4  [FLINK-9878][network][ssl] add more low-level ssl options
9e421a4 is described below

commit 9e421a438dd830c6be72e5f13f855e68a82aef21
Author: Nico Kruber <[email protected]>
AuthorDate: Mon Aug 20 23:53:12 2018 +0200

    [FLINK-9878][network][ssl] add more low-level ssl options
    
    This is mostly to tackle bugs like https://github.com/netty/netty/issues/832
    (JDK issue during garbage collection when the SSL session cache is not
    limited).
    We add the following low-level configuration options for the user to
    fine-tune their system:
    
    - SSL session cache size via 'security.ssl.session-cache-size'
    - SSL session timeout via 'security.ssl.session-timeout'
    - SSL handshake timeout via 'security.ssl.handshake-timeout'
    - SSL close notify flush timeout via
      'security.ssl.close-notify-flush-timeout'
    
    This closes #6355.
---
 .../generated/security_configuration.html          |  20 +++
 docs/ops/security-ssl.md                           |   4 +
 .../flink/configuration/SecurityOptions.java       |  37 ++++++
 .../flink/mesos/util/MesosArtifactServer.java      |   7 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java      |   5 +-
 .../runtime/webmonitor/history/HistoryServer.java  |   5 +-
 .../webmonitor/utils/WebFrontendBootstrap.java     |   9 +-
 .../org/apache/flink/runtime/blob/BlobClient.java  |   5 +-
 .../org/apache/flink/runtime/blob/BlobServer.java  |   5 +-
 .../runtime/io/network/netty/NettyClient.java      |  19 ++-
 .../runtime/io/network/netty/NettyConfig.java      |  27 ++--
 .../runtime/io/network/netty/NettyServer.java      |  17 ++-
 .../org/apache/flink/runtime/net/SSLUtils.java     | 139 ++++++++++++++-------
 .../io/network/netty/NettyClientServerSslTest.java | 112 +++++++++++------
 .../org/apache/flink/runtime/net/SSLUtilsTest.java |  32 +++--
 .../runtime/rest/RestServerEndpointITCase.java     |   4 +-
 16 files changed, 299 insertions(+), 148 deletions(-)

diff --git a/docs/_includes/generated/security_configuration.html 
b/docs/_includes/generated/security_configuration.html
index cd682ec..3576294 100644
--- a/docs/_includes/generated/security_configuration.html
+++ b/docs/_includes/generated/security_configuration.html
@@ -13,11 +13,21 @@
             <td>The comma separated list of standard SSL algorithms to be 
supported. Read more &#60;a 
href="http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites"&#62;here&#60;/a&#62;.</td>
         </tr>
         <tr>
+            <td><h5>security.ssl.close-notify-flush-timeout</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The timeout (in ms) for flushing the `close_notify` that was 
triggered by closing a channel. If the `close_notify` was not flushed in the 
given timeout the channel will be closed forcibly. (-1 = use system 
default)</td>
+        </tr>
+        <tr>
             <td><h5>security.ssl.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Turns on SSL for internal network communication. This can be 
optionally overridden by flags defined in different transport modules.</td>
         </tr>
         <tr>
+            <td><h5>security.ssl.handshake-timeout</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The timeout (in ms) during SSL handshake. (-1 = use system 
default)</td>
+        </tr>
+        <tr>
             <td><h5>security.ssl.key-password</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>The secret to decrypt the server key in the keystore.</td>
@@ -38,6 +48,16 @@
             <td>The SSL protocol version to be supported for the ssl 
transport. Note that it doesn’t support comma separated list.</td>
         </tr>
         <tr>
+            <td><h5>security.ssl.session-cache-size</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The size of the cache used for storing SSL session objects. 
According to https://github.com/netty/netty/issues/832, you should always set 
this to an appropriate number to not run into a bug with stalling IO threads 
during garbage collection. (-1 = use system default).</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.session-timeout</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The timeout (in ms) for the cached SSL session objects. (-1 = 
use system default)</td>
+        </tr>
+        <tr>
             <td><h5>security.ssl.truststore</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>The truststore file containing the public CA certificates to 
be used by flink endpoints to verify the peer’s certificate.</td>
diff --git a/docs/ops/security-ssl.md b/docs/ops/security-ssl.md
index c2ba7df..a805238 100644
--- a/docs/ops/security-ssl.md
+++ b/docs/ops/security-ssl.md
@@ -33,6 +33,10 @@ SSL can be enabled for all network communication between 
Flink components. SSL k
 * **akka.ssl.enabled**: SSL flag for akka based control connection between the 
Flink client, jobmanager and taskmanager 
 * **jobmanager.web.ssl.enabled**: Flag to enable https access to the 
jobmanager's web frontend
 
+### Complete List of SSL Options
+
+{% include generated/security_configuration.html %}
+
 ## Deploying Keystores and Truststores
 
 You need to have a Java Keystore generated and copied to each node in the 
Flink cluster. The common name or subject alternative names in the certificate 
should match the node's hostname and IP address. Keystores and truststores can 
be generated using the [keytool 
utility](https://docs.oracle.com/javase/8/docs/technotes/tools/unix/keytool.html).
 All Flink components should have read access to the keystore and truststore 
files.
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 0f25c6c..60a9764 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -160,4 +160,41 @@ public class SecurityOptions {
                key("security.ssl.verify-hostname")
                        .defaultValue(true)
                        .withDescription("Flag to enable peer’s hostname 
verification during ssl handshake.");
+
+       /**
+        * SSL session cache size.
+        */
+       public static final ConfigOption<Integer> SSL_SESSION_CACHE_SIZE =
+               key("security.ssl.session-cache-size")
+                       .defaultValue(-1)
+                       .withDescription("The size of the cache used for 
storing SSL session objects. "
+                               + "According to 
https://github.com/netty/netty/issues/832, you should always set "
+                               + "this to an appropriate number to not run 
into a bug with stalling IO threads "
+                               + "during garbage collection. (-1 = use system 
default).");
+
+       /**
+        * SSL session timeout.
+        */
+       public static final ConfigOption<Integer> SSL_SESSION_TIMEOUT =
+               key("security.ssl.session-timeout")
+                       .defaultValue(-1)
+                       .withDescription("The timeout (in ms) for the cached 
SSL session objects. (-1 = use system default)");
+
+       /**
+        * SSL session timeout during handshakes.
+        */
+       public static final ConfigOption<Integer> SSL_HANDSHAKE_TIMEOUT =
+               key("security.ssl.handshake-timeout")
+                       .defaultValue(-1)
+                       .withDescription("The timeout (in ms) during SSL 
handshake. (-1 = use system default)");
+
+       /**
+        * SSL session timeout after flushing the <tt>close_notify</tt> message.
+        */
+       public static final ConfigOption<Integer> 
SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT =
+               key("security.ssl.close-notify-flush-timeout")
+                       .defaultValue(-1)
+                       .withDescription("The timeout (in ms) for flushing the 
`close_notify` that was triggered by closing a " +
+                               "channel. If the `close_notify` was not flushed 
in the given timeout the channel will be closed " +
+                               "forcibly. (-1 = use system default)");
 }
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 57f4718..30c4edf 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -58,7 +58,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 
 import java.io.File;
@@ -104,7 +104,8 @@ public class MesosArtifactServer implements 
MesosArtifactResolver {
 
        private final Map<Path, URL> paths = new HashMap<>();
 
-       private final SSLContext serverSSLContext;
+       @Nullable
+       private final SSLUtils.SSLContext serverSSLContext;
 
        public MesosArtifactServer(String prefix, String serverHostname, int 
configuredPort, Configuration config)
                throws Exception {
@@ -139,7 +140,7 @@ public class MesosArtifactServer implements 
MesosArtifactResolver {
 
                                // SSL should be the first handler in the 
pipeline
                                if (serverSSLContext != null) {
-                                       SSLEngine sslEngine = 
serverSSLContext.createSSLEngine();
+                                       SSLEngine sslEngine = 
serverSSLContext.getSslContext().createSSLEngine();
                                        
SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
                                        sslEngine.setUseClientMode(false);
                                        ch.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index f27ae00..4323ad0 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -92,7 +92,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -130,7 +130,8 @@ public class WebRuntimeMonitor implements WebMonitor {
        /** Service which retrieves the currently leading JobManager and opens 
a JobManagerGateway. */
        private final LeaderGatewayRetriever<JobManagerGateway> retriever;
 
-       private final SSLContext serverSSLContext;
+       @Nullable
+       private final SSLUtils.SSLContext serverSSLContext;
 
        private final CompletableFuture<String> localRestAddress = new 
CompletableFuture<>();
 
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 108f5c4..0484afd 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -43,7 +43,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.FileWriter;
@@ -90,7 +90,8 @@ public class HistoryServer {
 
        private final HistoryServerArchiveFetcher archiveFetcher;
 
-       private final SSLContext serverSSLContext;
+       @Nullable
+       private final SSLUtils.SSLContext serverSSLContext;
        private WebFrontendBootstrap netty;
 
        private final Object startupShutdownLock = new Object();
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index 740beae..c3148b7 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -40,7 +40,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandle
 
 import org.slf4j.Logger;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 
 import java.io.File;
@@ -55,7 +55,8 @@ public class WebFrontendBootstrap {
        private final Router router;
        private final Logger log;
        private final File uploadDir;
-       private final SSLContext serverSSLContext;
+       @Nullable
+       private final SSLUtils.SSLContext serverSSLContext;
        private final ServerBootstrap bootstrap;
        private final Channel serverChannel;
        private final String restAddress;
@@ -64,7 +65,7 @@ public class WebFrontendBootstrap {
                        Router router,
                        Logger log,
                        File directory,
-                       SSLContext sslContext,
+                       @Nullable SSLUtils.SSLContext sslContext,
                        String configuredAddress,
                        int configuredPort,
                        final Configuration config) throws 
InterruptedException, UnknownHostException {
@@ -81,7 +82,7 @@ public class WebFrontendBootstrap {
 
                                // SSL should be the first handler in the 
pipeline
                                if (serverSSLContext != null) {
-                                       SSLEngine sslEngine = 
serverSSLContext.createSSLEngine();
+                                       SSLEngine sslEngine = 
serverSSLContext.getSslContext().createSSLEngine();
                                        
SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
                                        sslEngine.setUseClientMode(false);
                                        ch.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 8e6b328..4bf7177 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -31,7 +31,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
 import javax.net.ssl.SSLSocket;
 
@@ -91,7 +90,7 @@ public final class BlobClient implements Closeable {
 
                try {
                        // Check if ssl is enabled
-                       SSLContext clientSSLContext = null;
+                       SSLUtils.SSLContext clientSSLContext = null;
                        if (clientConfig != null &&
                                
clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
 
@@ -102,7 +101,7 @@ public final class BlobClient implements Closeable {
 
                                LOG.info("Using ssl connection to the blob 
server");
 
-                               SSLSocket sslSocket = (SSLSocket) 
clientSSLContext.getSocketFactory().createSocket(
+                               SSLSocket sslSocket = (SSLSocket) 
clientSSLContext.getSslContext().getSocketFactory().createSocket(
                                        serverAddress.getAddress(),
                                        serverAddress.getPort());
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index dd0155c..1a1b0da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -79,7 +78,7 @@ public class BlobServer extends Thread implements 
BlobService, BlobWriter, Perma
        private final ServerSocket serverSocket;
 
        /** The SSL server context if ssl is enabled for the connections. */
-       private final SSLContext serverSSLContext;
+       private final SSLUtils.SSLContext serverSSLContext;
 
        /** Blob Server configuration. */
        private final Configuration blobServiceConfiguration;
@@ -196,7 +195,7 @@ public class BlobServer extends Thread implements 
BlobService, BlobWriter, Perma
                                        return new ServerSocket(port, 
finalBacklog);
                                } else {
                                        LOG.info("Enabling ssl for the blob 
server");
-                                       return 
serverSSLContext.getServerSocketFactory().createServerSocket(port, 
finalBacklog);
+                                       return 
serverSSLContext.getSslContext().getServerSocketFactory().createServerSocket(port,
 finalBacklog);
                                }
                        }
                });
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
index 5fb083d..44561b0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.net.SSLUtils;
+
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -34,9 +36,10 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
@@ -52,7 +55,8 @@ class NettyClient {
 
        private Bootstrap bootstrap;
 
-       private SSLContext clientSSLContext = null;
+       @Nullable
+       private SSLUtils.SSLContext clientSSLContext = null;
 
        NettyClient(NettyConfig config) {
                this.config = config;
@@ -178,7 +182,7 @@ class NettyClient {
 
                                // SSL handler should be added first in the 
pipeline
                                if (clientSSLContext != null) {
-                                       SSLEngine sslEngine = 
clientSSLContext.createSSLEngine(
+                                       SSLEngine sslEngine = 
clientSSLContext.getSslContext().createSSLEngine(
                                                
serverSocketAddress.getAddress().getCanonicalHostName(),
                                                serverSocketAddress.getPort());
                                        sslEngine.setUseClientMode(true);
@@ -190,7 +194,14 @@ class NettyClient {
                                                
sslEngine.setSSLParameters(newSSLParameters);
                                        }
 
-                                       channel.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
+                                       SslHandler sslHandler = new 
SslHandler(sslEngine);
+                                       if 
(clientSSLContext.getHandshakeTimeoutMs() >= 0) {
+                                               
sslHandler.setHandshakeTimeoutMillis(clientSSLContext.getHandshakeTimeoutMs());
+                                       }
+                                       if 
(clientSSLContext.getCloseNotifyFlushTimeoutMs() >= 0) {
+                                               
sslHandler.setCloseNotifyTimeoutMillis(clientSSLContext.getCloseNotifyFlushTimeoutMs());
+                                       }
+                                       channel.pipeline().addLast("ssl", 
sslHandler);
                                }
                                
channel.pipeline().addLast(protocol.getClientChannelHandlers());
                        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 18527c4..9b32ebb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -23,12 +23,14 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.net.SSLUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
+
 import java.net.InetAddress;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -189,26 +191,13 @@ public class NettyConfig {
                }
        }
 
-       public SSLContext createClientSSLContext() throws Exception {
-
-               // Create SSL Context from config
-               SSLContext clientSSLContext = null;
-               if (getSSLEnabled()) {
-                       clientSSLContext = 
SSLUtils.createSSLClientContext(config);
-               }
-
-               return clientSSLContext;
+       @Nullable
+       public SSLUtils.SSLContext createClientSSLContext() throws Exception {
+               return SSLUtils.createSSLClientContext(config);
        }
 
-       public SSLContext createServerSSLContext() throws Exception {
-
-               // Create SSL Context from config
-               SSLContext serverSSLContext = null;
-               if (getSSLEnabled()) {
-                       serverSSLContext = 
SSLUtils.createSSLServerContext(config);
-               }
-
-               return serverSSLContext;
+       public SSLUtils.SSLContext createServerSSLContext() throws Exception {
+               return SSLUtils.createSSLServerContext(config);
        }
 
        public boolean getSSLEnabled() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index c6d09d0..f919ded 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 
 import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -36,7 +37,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 
 import java.io.IOException;
@@ -61,7 +62,8 @@ class NettyServer {
 
        private ChannelFuture bindFuture;
 
-       private SSLContext serverSSLContext = null;
+       @Nullable
+       private SSLUtils.SSLContext serverSSLContext = null;
 
        private InetSocketAddress localAddress;
 
@@ -152,10 +154,17 @@ class NettyServer {
                        @Override
                        public void initChannel(SocketChannel channel) throws 
Exception {
                                if (serverSSLContext != null) {
-                                       SSLEngine sslEngine = 
serverSSLContext.createSSLEngine();
+                                       SSLEngine sslEngine = 
serverSSLContext.getSslContext().createSSLEngine();
                                        
config.setSSLVerAndCipherSuites(sslEngine);
                                        sslEngine.setUseClientMode(false);
-                                       channel.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
+                                       SslHandler sslHandler = new 
SslHandler(sslEngine);
+                                       if 
(serverSSLContext.getHandshakeTimeoutMs() >= 0) {
+                                               
sslHandler.setHandshakeTimeoutMillis(serverSSLContext.getHandshakeTimeoutMs());
+                                       }
+                                       if 
(serverSSLContext.getCloseNotifyFlushTimeoutMs() >= 0) {
+                                               
sslHandler.setCloseNotifyTimeoutMillis(serverSSLContext.getCloseNotifyFlushTimeoutMs());
+                                       }
+                                       channel.pipeline().addLast("ssl", 
sslHandler);
                                }
 
                                
channel.pipeline().addLast(protocol.getServerChannelHandlers());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index b574d30..69da666 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -113,7 +113,7 @@ public class SSLUtils {
                checkState(sslContext != null, "%s it not enabled", 
SecurityOptions.SSL_ENABLED.key());
 
                return new SSLEngineFactory(
-                       sslContext,
+                       sslContext.getSslContext(),
                        getEnabledProtocols(config),
                        getEnabledCipherSuites(config),
                        clientMode);
@@ -176,39 +176,43 @@ public class SSLUtils {
        public static SSLContext createSSLClientContext(Configuration 
sslConfig) throws Exception {
 
                Preconditions.checkNotNull(sslConfig);
-               SSLContext clientSSLContext = null;
 
-               if (getSSLEnabled(sslConfig)) {
-                       LOG.debug("Creating client SSL context from 
configuration");
-
-                       String trustStoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
-                       String trustStorePassword = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
-                       String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+               if (!getSSLEnabled(sslConfig)) {
+                       return null;
+               }
 
-                       Preconditions.checkNotNull(trustStoreFilePath, 
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
-                       Preconditions.checkNotNull(trustStorePassword, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
+               LOG.debug("Creating client SSL context from configuration");
 
-                       KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
+               String trustStoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
+               String trustStorePassword = 
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
+               String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+               int sessionCacheSize = 
sslConfig.getInteger(SecurityOptions.SSL_SESSION_CACHE_SIZE);
+               int sessionTimeoutMs = 
sslConfig.getInteger(SecurityOptions.SSL_SESSION_TIMEOUT);
+               int handshakeTimeoutMs = 
sslConfig.getInteger(SecurityOptions.SSL_HANDSHAKE_TIMEOUT);
+               int closeNotifyFlushTimeoutMs = 
sslConfig.getInteger(SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT);
 
-                       FileInputStream trustStoreFile = null;
-                       try {
-                               trustStoreFile = new FileInputStream(new 
File(trustStoreFilePath));
-                               trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
-                       } finally {
-                               if (trustStoreFile != null) {
-                                       trustStoreFile.close();
-                               }
-                       }
+               Preconditions.checkNotNull(trustStoreFilePath, 
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
+               Preconditions.checkNotNull(trustStorePassword, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
 
-                       TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(
-                               TrustManagerFactory.getDefaultAlgorithm());
-                       trustManagerFactory.init(trustStore);
+               KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
 
-                       clientSSLContext = 
SSLContext.getInstance(sslProtocolVersion);
-                       clientSSLContext.init(null, 
trustManagerFactory.getTrustManagers(), null);
+               try (FileInputStream trustStoreFile = new FileInputStream(new 
File(trustStoreFilePath))) {
+                       trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
                }
 
-               return clientSSLContext;
+               TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(
+                       TrustManagerFactory.getDefaultAlgorithm());
+               trustManagerFactory.init(trustStore);
+
+               javax.net.ssl.SSLContext clientSSLContext = 
javax.net.ssl.SSLContext.getInstance(sslProtocolVersion);
+               clientSSLContext.init(null, 
trustManagerFactory.getTrustManagers(), null);
+               if (sessionCacheSize >= 0) {
+                       
clientSSLContext.getClientSessionContext().setSessionCacheSize(sessionCacheSize);
+               }
+               if (sessionTimeoutMs >= 0) {
+                       
clientSSLContext.getClientSessionContext().setSessionTimeout(sessionTimeoutMs / 
1000);
+               }
+               return new SSLContext(clientSSLContext, handshakeTimeoutMs, 
closeNotifyFlushTimeoutMs);
        }
 
        /**
@@ -225,38 +229,77 @@ public class SSLUtils {
        public static SSLContext createSSLServerContext(Configuration 
sslConfig) throws Exception {
 
                Preconditions.checkNotNull(sslConfig);
-               SSLContext serverSSLContext = null;
 
-               if (getSSLEnabled(sslConfig)) {
-                       LOG.debug("Creating server SSL context from 
configuration");
+               if (!getSSLEnabled(sslConfig)) {
+                       return null;
+               }
 
-                       String keystoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_KEYSTORE);
+               LOG.debug("Creating server SSL context from configuration");
 
-                       String keystorePassword = 
sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+               String keystoreFilePath = 
sslConfig.getString(SecurityOptions.SSL_KEYSTORE);
+               String keystorePassword = 
sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+               String certPassword = 
sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD);
+               String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+               int sessionCacheSize = 
sslConfig.getInteger(SecurityOptions.SSL_SESSION_CACHE_SIZE);
+               int sessionTimeoutMs = 
sslConfig.getInteger(SecurityOptions.SSL_SESSION_TIMEOUT);
+               int handshakeTimeoutMs = 
sslConfig.getInteger(SecurityOptions.SSL_HANDSHAKE_TIMEOUT);
+               int closeNotifyFlushTimeoutMs = 
sslConfig.getInteger(SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT);
 
-                       String certPassword = 
sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD);
+               Preconditions.checkNotNull(keystoreFilePath, 
SecurityOptions.SSL_KEYSTORE.key() + " was not configured.");
+               Preconditions.checkNotNull(keystorePassword, 
SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured.");
+               Preconditions.checkNotNull(certPassword, 
SecurityOptions.SSL_KEY_PASSWORD.key() + " was not configured.");
 
-                       String sslProtocolVersion = 
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+               KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+               try (FileInputStream keyStoreFile = new FileInputStream(new 
File(keystoreFilePath))) {
+                       ks.load(keyStoreFile, keystorePassword.toCharArray());
+               }
 
-                       Preconditions.checkNotNull(keystoreFilePath, 
SecurityOptions.SSL_KEYSTORE.key() + " was not configured.");
-                       Preconditions.checkNotNull(keystorePassword, 
SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured.");
-                       Preconditions.checkNotNull(certPassword, 
SecurityOptions.SSL_KEY_PASSWORD.key() + " was not configured.");
+               // Set up key manager factory to use the server key store
+               KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                       KeyManagerFactory.getDefaultAlgorithm());
+               kmf.init(ks, certPassword.toCharArray());
 
-                       KeyStore ks = 
KeyStore.getInstance(KeyStore.getDefaultType());
-                       try (FileInputStream keyStoreFile = new 
FileInputStream(new File(keystoreFilePath))) {
-                               ks.load(keyStoreFile, 
keystorePassword.toCharArray());
-                       }
+               // Initialize the SSLContext
+               javax.net.ssl.SSLContext serverSSLContext = 
javax.net.ssl.SSLContext.getInstance(sslProtocolVersion);
+               serverSSLContext.init(kmf.getKeyManagers(), null, null);
+               if (sessionCacheSize >= 0) {
+                       
serverSSLContext.getServerSessionContext().setSessionCacheSize(sessionCacheSize);
+               }
+               if (sessionTimeoutMs >= 0) {
+                       
serverSSLContext.getServerSessionContext().setSessionTimeout(sessionTimeoutMs / 
1000);
+               }
 
-                       // Set up key manager factory to use the server key 
store
-                       KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-                                       
KeyManagerFactory.getDefaultAlgorithm());
-                       kmf.init(ks, certPassword.toCharArray());
+               return new SSLContext(serverSSLContext, handshakeTimeoutMs, 
closeNotifyFlushTimeoutMs);
+       }
 
-                       // Initialize the SSLContext
-                       serverSSLContext = 
SSLContext.getInstance(sslProtocolVersion);
-                       serverSSLContext.init(kmf.getKeyManagers(), null, null);
+       /**
+        * Wrapper around javax.net.ssl.SSLContext, adding SSL handshake and 
close notify timeouts
+        * which cannot be set on the SSL context directly.
+        */
+       public static class SSLContext {
+               private final javax.net.ssl.SSLContext sslContext;
+               private final int handshakeTimeoutMs;
+               private final int closeNotifyFlushTimeoutMs;
+
+               public SSLContext(
+                               javax.net.ssl.SSLContext sslContext,
+                               int handshakeTimeoutMs,
+                               int closeNotifyFlushTimeoutMs) {
+                       this.sslContext = sslContext;
+                       this.handshakeTimeoutMs = handshakeTimeoutMs;
+                       this.closeNotifyFlushTimeoutMs = 
closeNotifyFlushTimeoutMs;
                }
 
-               return serverSSLContext;
+               public javax.net.ssl.SSLContext getSslContext() {
+                       return sslContext;
+               }
+
+               public int getHandshakeTimeoutMs() {
+                       return handshakeTimeoutMs;
+               }
+
+               public int getCloseNotifyFlushTimeoutMs() {
+                       return closeNotifyFlushTimeoutMs;
+               }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 33e004e..e7113ec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.util.NetUtils;
@@ -26,15 +27,27 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringDecoder;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringEncoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.net.ssl.SSLSessionContext;
+
 import java.net.InetAddress;
 
+import static 
org.apache.flink.configuration.SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT;
+import static 
org.apache.flink.configuration.SecurityOptions.SSL_HANDSHAKE_TIMEOUT;
+import static 
org.apache.flink.configuration.SecurityOptions.SSL_SESSION_CACHE_SIZE;
+import static 
org.apache.flink.configuration.SecurityOptions.SSL_SESSION_TIMEOUT;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for communication between {@link NettyServer} and {@link NettyClient} 
via SSL.
+ */
 public class NettyClientServerSslTest {
 
        /**
@@ -42,52 +55,76 @@ public class NettyClientServerSslTest {
         */
        @Test
        public void testValidSslConnection() throws Exception {
-               NettyProtocol protocol = new NettyProtocol(null, null, true) {
-                       @Override
-                       public ChannelHandler[] getServerChannelHandlers() {
-                               return new ChannelHandler[0];
-                       }
+               testValidSslConnection(createSslConfig());
+       }
 
-                       @Override
-                       public ChannelHandler[] getClientChannelHandlers() {
-                               return new ChannelHandler[0];
-                       }
-               };
+       /**
+        * Verify valid (advanced) ssl configuration and connection.
+        */
+       @Test
+       public void testValidSslConnectionAdvanced() throws Exception {
+               Configuration sslConfig = createSslConfig();
+               sslConfig.setInteger(SSL_SESSION_CACHE_SIZE, 1);
+               sslConfig.setInteger(SSL_SESSION_TIMEOUT, 1_000);
+               sslConfig.setInteger(SSL_HANDSHAKE_TIMEOUT, 1_000);
+               sslConfig.setInteger(SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT, 1_000);
+
+               testValidSslConnection(sslConfig);
+       }
+
+       private void testValidSslConnection(Configuration sslConfig) throws 
Exception {
+               NettyProtocol protocol = getEmptyNettyProtocol();
 
                NettyConfig nettyConfig = new NettyConfig(
                        InetAddress.getLoopbackAddress(),
                        NetUtils.getAvailablePort(),
                        NettyTestUtil.DEFAULT_SEGMENT_SIZE,
                        1,
-                       createSslConfig());
+                       sslConfig);
 
                NettyTestUtil.NettyServerAndClient serverAndClient = 
NettyTestUtil.initServerAndClient(protocol, nettyConfig);
 
                Channel ch = NettyTestUtil.connect(serverAndClient);
 
+               SslHandler sslHandler = (SslHandler) ch.pipeline().get("ssl");
+               assertEqualsOrDefault(sslConfig, SSL_HANDSHAKE_TIMEOUT, 
sslHandler.getHandshakeTimeoutMillis());
+               assertEqualsOrDefault(sslConfig, 
SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT, sslHandler.getCloseNotifyTimeoutMillis());
+
                // should be able to send text data
                ch.pipeline().addLast(new StringDecoder()).addLast(new 
StringEncoder());
                assertTrue(ch.writeAndFlush("test").await().isSuccess());
 
+               // session context is only available after a session was 
set up -> this should be true after data was sent
+               SSLSessionContext sessionContext = 
sslHandler.engine().getSession().getSessionContext();
+               assertNotNull("bug in unit test setup: session context not 
available", sessionContext);
+               assertEqualsOrDefault(sslConfig, SSL_SESSION_CACHE_SIZE, 
sessionContext.getSessionCacheSize());
+               int sessionTimeout = sslConfig.getInteger(SSL_SESSION_TIMEOUT);
+               if (sessionTimeout != -1) {
+                       // session timeout config is in milliseconds but the 
context returns it in seconds
+                       assertEquals(sessionTimeout / 1000, 
sessionContext.getSessionTimeout());
+               } else {
+                       assertTrue("default value (-1) should not be 
propagated", sessionContext.getSessionTimeout() >= 0);
+               }
+
                NettyTestUtil.shutdown(serverAndClient);
        }
 
+       private static void assertEqualsOrDefault(Configuration sslConfig, 
ConfigOption<Integer> option, long actual) {
+               long expected = sslConfig.getInteger(option);
+               if (expected != option.defaultValue()) {
+                       assertEquals(expected, actual);
+               } else {
+                       assertTrue("default value (" + option.defaultValue() + 
") should not be propagated",
+                               actual >= 0);
+               }
+       }
+
        /**
         * Verify failure on invalid ssl configuration.
         */
        @Test
-       public void testInvalidSslConfiguration() throws Exception {
-               NettyProtocol protocol = new NettyProtocol(null, null, true) {
-                       @Override
-                       public ChannelHandler[] getServerChannelHandlers() {
-                               return new ChannelHandler[0];
-                       }
-
-                       @Override
-                       public ChannelHandler[] getClientChannelHandlers() {
-                               return new ChannelHandler[0];
-                       }
-               };
+       public void testInvalidSslConfiguration() {
+               NettyProtocol protocol = getEmptyNettyProtocol();
 
                Configuration config = createSslConfig();
                // Modify the keystore password to an incorrect one
@@ -116,17 +153,7 @@ public class NettyClientServerSslTest {
         */
        @Test
        public void testSslHandshakeError() throws Exception {
-               NettyProtocol protocol = new NettyProtocol(null, null, true) {
-                       @Override
-                       public ChannelHandler[] getServerChannelHandlers() {
-                               return new ChannelHandler[0];
-                       }
-
-                       @Override
-                       public ChannelHandler[] getClientChannelHandlers() {
-                               return new ChannelHandler[0];
-                       }
-               };
+               NettyProtocol protocol = getEmptyNettyProtocol();
 
                Configuration config = createSslConfig();
 
@@ -151,8 +178,7 @@ public class NettyClientServerSslTest {
                NettyTestUtil.shutdown(serverAndClient);
        }
 
-       private Configuration createSslConfig() throws Exception {
-
+       private Configuration createSslConfig() {
                Configuration flinkConfig = new Configuration();
                flinkConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
                flinkConfig.setString(SecurityOptions.SSL_KEYSTORE, 
"src/test/resources/local127.keystore");
@@ -162,4 +188,18 @@ public class NettyClientServerSslTest {
                flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, 
"password");
                return flinkConfig;
        }
+
+       private static NettyProtocol getEmptyNettyProtocol() {
+               return new NettyProtocol(null, null, true) {
+                       @Override
+                       public ChannelHandler[] getServerChannelHandlers() {
+                               return new ChannelHandler[0];
+                       }
+
+                       @Override
+                       public ChannelHandler[] getClientChannelHandlers() {
+                               return new ChannelHandler[0];
+                       }
+               };
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index 38c8cee..a5db40f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.SecurityOptions;
 import org.junit.Assert;
 import org.junit.Test;
 
-import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLServerSocket;
 
@@ -33,6 +32,7 @@ import java.util.Arrays;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -58,7 +58,7 @@ public class SSLUtilsTest {
                clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
                clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, 
"password");
 
-               SSLContext clientContext = 
SSLUtils.createSSLClientContext(clientConfig);
+               SSLUtils.SSLContext clientContext = 
SSLUtils.createSSLClientContext(clientConfig);
                Assert.assertNotNull(clientContext);
        }
 
@@ -71,7 +71,7 @@ public class SSLUtilsTest {
                Configuration clientConfig = new Configuration();
                clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, false);
 
-               SSLContext clientContext = 
SSLUtils.createSSLClientContext(clientConfig);
+               SSLUtils.SSLContext clientContext = 
SSLUtils.createSSLClientContext(clientConfig);
                Assert.assertNull(clientContext);
        }
 
@@ -87,7 +87,7 @@ public class SSLUtilsTest {
                clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, 
"badpassword");
 
                try {
-                       SSLContext clientContext = 
SSLUtils.createSSLClientContext(clientConfig);
+                       SSLUtils.SSLContext clientContext = 
SSLUtils.createSSLClientContext(clientConfig);
                        Assert.fail("SSL client context created even with bad 
SSL configuration ");
                } catch (Exception e) {
                        // Exception here is valid
@@ -106,7 +106,7 @@ public class SSLUtilsTest {
                serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, 
"password");
                serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, 
"password");
 
-               SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
+               SSLUtils.SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
                Assert.assertNotNull(serverContext);
        }
 
@@ -119,7 +119,7 @@ public class SSLUtilsTest {
                Configuration serverConfig = new Configuration();
                serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, false);
 
-               SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
+               SSLUtils.SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
                Assert.assertNull(serverContext);
        }
 
@@ -136,7 +136,7 @@ public class SSLUtilsTest {
                serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, 
"badpassword");
 
                try {
-                       SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
+                       SSLUtils.SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
                        Assert.fail("SSL server context created even with bad 
SSL configuration ");
                } catch (Exception e) {
                        // Exception here is valid
@@ -157,7 +157,7 @@ public class SSLUtilsTest {
                serverConfig.setString(SecurityOptions.SSL_PROTOCOL, 
"TLSv1,TLSv1.2");
 
                try {
-                       SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
+                       SSLUtils.SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
                        Assert.fail("SSL server context created even with 
multiple protocols set ");
                } catch (Exception e) {
                        // Exception here is valid
@@ -178,10 +178,9 @@ public class SSLUtilsTest {
                serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1.1");
                serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, 
"TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256");
 
-               SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
-               ServerSocket socket = null;
-               try {
-                       socket = 
serverContext.getServerSocketFactory().createServerSocket(0);
+               SSLUtils.SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
+               assertNotNull(serverContext);
+               try (ServerSocket socket = 
serverContext.getSslContext().getServerSocketFactory().createServerSocket(0)) {
 
                        String[] protocols = ((SSLServerSocket) 
socket).getEnabledProtocols();
                        String[] algorithms = ((SSLServerSocket) 
socket).getEnabledCipherSuites();
@@ -198,10 +197,6 @@ public class SSLUtilsTest {
                        Assert.assertEquals(2, algorithms.length);
                        
Assert.assertTrue(algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || 
algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
                        
Assert.assertTrue(algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA") || 
algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
-               } finally {
-                       if (socket != null) {
-                               socket.close();
-                       }
                }
        }
 
@@ -219,8 +214,9 @@ public class SSLUtilsTest {
                serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1");
                serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, 
"TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256");
 
-               SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
-               SSLEngine engine = serverContext.createSSLEngine();
+               SSLUtils.SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
+               assertNotNull(serverContext);
+               SSLEngine engine = 
serverContext.getSslContext().createSSLEngine();
 
                String[] protocols = engine.getEnabledProtocols();
                String[] algorithms = engine.getEnabledCipherSuites();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 93dbb5d..59db163 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -164,9 +164,9 @@ public class RestServerEndpointITCase extends TestLogger {
                config.setString(WebOptions.UPLOAD_DIR, 
temporaryFolder.newFolder().getCanonicalPath());
 
                defaultSSLContext = SSLContext.getDefault();
-               final SSLContext sslClientContext = 
SSLUtils.createSSLClientContext(config);
+               final SSLUtils.SSLContext sslClientContext = 
SSLUtils.createSSLClientContext(config);
                if (sslClientContext != null) {
-                       SSLContext.setDefault(sslClientContext);
+                       SSLContext.setDefault(sslClientContext.getSslContext());
                }
 
                RestServerEndpointConfiguration serverConfig = 
RestServerEndpointConfiguration.fromConfiguration(config);

Reply via email to