[
https://issues.apache.org/jira/browse/FLINK-9878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16586567#comment-16586567
]
ASF GitHub Bot commented on FLINK-9878:
---------------------------------------
NicoK closed pull request #6355: [FLINK-9878][network][ssl] add more low-level
ssl options
URL: https://github.com/apache/flink/pull/6355
This PR was merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
on merge, the diff is reproduced below for the sake of provenance:
diff --git a/docs/_includes/generated/security_configuration.html
b/docs/_includes/generated/security_configuration.html
index cd682ecaf0f..357629473cd 100644
--- a/docs/_includes/generated/security_configuration.html
+++ b/docs/_includes/generated/security_configuration.html
@@ -12,11 +12,21 @@
<td style="word-wrap:
break-word;">"TLS_RSA_WITH_AES_128_CBC_SHA"</td>
<td>The comma separated list of standard SSL algorithms to be
supported. Read more <a
href="http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites">here</a>.</td>
</tr>
+ <tr>
+ <td><h5>security.ssl.close-notify-flush-timeout</h5></td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>The timeout (in ms) for flushing the `close_notify` that was
triggered by closing a channel. If the `close_notify` was not flushed in the
given timeout the channel will be closed forcibly. (-1 = use system
default)</td>
+ </tr>
<tr>
<td><h5>security.ssl.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Turns on SSL for internal network communication. This can be
optionally overridden by flags defined in different transport modules.</td>
</tr>
+ <tr>
+ <td><h5>security.ssl.handshake-timeout</h5></td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>The timeout (in ms) during SSL handshake. (-1 = use system
default)</td>
+ </tr>
<tr>
<td><h5>security.ssl.key-password</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -37,6 +47,16 @@
<td style="word-wrap: break-word;">"TLSv1.2"</td>
<td>The SSL protocol version to be supported for the ssl
transport. Note that it doesn’t support comma separated list.</td>
</tr>
+ <tr>
+ <td><h5>security.ssl.session-cache-size</h5></td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>The size of the cache used for storing SSL session objects.
According to https://github.com/netty/netty/issues/832, you should always set
this to an appropriate number to not run into a bug with stalling IO threads
during garbage collection. (-1 = use system default).</td>
+ </tr>
+ <tr>
+ <td><h5>security.ssl.session-timeout</h5></td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>The timeout (in ms) for the cached SSL session objects. (-1 =
use system default)</td>
+ </tr>
<tr>
<td><h5>security.ssl.truststore</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/ops/security-ssl.md b/docs/ops/security-ssl.md
index c2ba7df8849..a805238ae08 100644
--- a/docs/ops/security-ssl.md
+++ b/docs/ops/security-ssl.md
@@ -33,6 +33,10 @@ SSL can be enabled for all network communication between
Flink components. SSL k
* **akka.ssl.enabled**: SSL flag for akka based control connection between the
Flink client, jobmanager and taskmanager
* **jobmanager.web.ssl.enabled**: Flag to enable https access to the
jobmanager's web frontend
+### Complete List of SSL Options
+
+{% include generated/security_configuration.html %}
+
## Deploying Keystores and Truststores
You need to have a Java Keystore generated and copied to each node in the
Flink cluster. The common name or subject alternative names in the certificate
should match the node's hostname and IP address. Keystores and truststores can
be generated using the [keytool
utility](https://docs.oracle.com/javase/8/docs/technotes/tools/unix/keytool.html).
All Flink components should have read access to the keystore and truststore
files.
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 0f25c6caf95..60a97643a4e 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -160,4 +160,41 @@
key("security.ssl.verify-hostname")
.defaultValue(true)
.withDescription("Flag to enable peer’s hostname
verification during ssl handshake.");
+
+ /**
+ * SSL session cache size.
+ */
+ public static final ConfigOption<Integer> SSL_SESSION_CACHE_SIZE =
+ key("security.ssl.session-cache-size")
+ .defaultValue(-1)
+ .withDescription("The size of the cache used for
storing SSL session objects. "
+ + "According to
https://github.com/netty/netty/issues/832, you should always set "
+ + "this to an appropriate number to not run
into a bug with stalling IO threads "
+ + "during garbage collection. (-1 = use system
default).");
+
+ /**
+ * SSL session timeout.
+ */
+ public static final ConfigOption<Integer> SSL_SESSION_TIMEOUT =
+ key("security.ssl.session-timeout")
+ .defaultValue(-1)
+ .withDescription("The timeout (in ms) for the cached
SSL session objects. (-1 = use system default)");
+
+ /**
+ * SSL session timeout during handshakes.
+ */
+ public static final ConfigOption<Integer> SSL_HANDSHAKE_TIMEOUT =
+ key("security.ssl.handshake-timeout")
+ .defaultValue(-1)
+ .withDescription("The timeout (in ms) during SSL
handshake. (-1 = use system default)");
+
+ /**
+ * SSL session timeout after flushing the <tt>close_notify</tt> message.
+ */
+ public static final ConfigOption<Integer>
SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT =
+ key("security.ssl.close-notify-flush-timeout")
+ .defaultValue(-1)
+ .withDescription("The timeout (in ms) for flushing the
`close_notify` that was triggered by closing a " +
+ "channel. If the `close_notify` was not flushed
in the given timeout the channel will be closed " +
+ "forcibly. (-1 = use system default)");
}
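
All four new options share one convention: the default of -1 means "leave the system default untouched", and only non-negative values are applied. A minimal sketch of that convention follows. `SslTuning` and `resolve` are hypothetical names used for illustration and are not part of Flink, and plain `java.util.Properties` stands in for Flink's `Configuration`.

```java
import java.util.OptionalInt;
import java.util.Properties;

/** Hypothetical helper, not part of Flink: reads an int option where -1 means "use system default". */
final class SslTuning {
    static final int USE_SYSTEM_DEFAULT = -1;

    /** Returns the configured value, or empty if the key is absent or the value is negative. */
    static OptionalInt resolve(Properties props, String key) {
        int value = Integer.parseInt(
                props.getProperty(key, String.valueOf(USE_SYSTEM_DEFAULT)));
        return value >= 0 ? OptionalInt.of(value) : OptionalInt.empty();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Option keys as introduced by this PR; the value is an arbitrary example.
        props.setProperty("security.ssl.handshake-timeout", "10000");

        // Explicitly configured: a value is present.
        System.out.println(resolve(props, "security.ssl.handshake-timeout"));
        // Not configured: falls back to -1, so nothing would be applied.
        System.out.println(resolve(props, "security.ssl.session-timeout"));
    }
}
```

Returning an empty `OptionalInt` rather than the raw -1 is only one possible design; the PR itself keeps the plain `int` and checks `>= 0` at each call site.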
diff --git
a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 57f4718816c..30c4edf0155 100644
---
a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++
b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -58,7 +58,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
import java.io.File;
@@ -104,7 +104,8 @@
private final Map<Path, URL> paths = new HashMap<>();
- private final SSLContext serverSSLContext;
+ @Nullable
+ private final SSLUtils.SSLContext serverSSLContext;
public MesosArtifactServer(String prefix, String serverHostname, int
configuredPort, Configuration config)
throws Exception {
@@ -139,7 +140,7 @@ protected void initChannel(SocketChannel ch) {
// SSL should be the first handler in the
pipeline
if (serverSSLContext != null) {
- SSLEngine sslEngine =
serverSSLContext.createSSLEngine();
+ SSLEngine sslEngine =
serverSSLContext.getSslContext().createSSLEngine();
SSLUtils.setSSLVerAndCipherSuites(sslEngine, sslConfig);
sslEngine.setUseClientMode(false);
ch.pipeline().addLast("ssl", new
SslHandler(sslEngine));
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index f27ae0067d7..4323ad01168 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -92,7 +92,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
@@ -130,7 +130,8 @@
/** Service which retrieves the currently leading JobManager and opens
a JobManagerGateway. */
private final LeaderGatewayRetriever<JobManagerGateway> retriever;
- private final SSLContext serverSSLContext;
+ @Nullable
+ private final SSLUtils.SSLContext serverSSLContext;
private final CompletableFuture<String> localRestAddress = new
CompletableFuture<>();
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 108f5c4595a..0484afdcc34 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -43,7 +43,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.FileWriter;
@@ -90,7 +90,8 @@
private final HistoryServerArchiveFetcher archiveFetcher;
- private final SSLContext serverSSLContext;
+ @Nullable
+ private final SSLUtils.SSLContext serverSSLContext;
private WebFrontendBootstrap netty;
private final Object startupShutdownLock = new Object();
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index 740beaee1cb..c3148b73878 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -40,7 +40,7 @@
import org.slf4j.Logger;
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
import java.io.File;
@@ -55,7 +55,8 @@
private final Router router;
private final Logger log;
private final File uploadDir;
- private final SSLContext serverSSLContext;
+ @Nullable
+ private final SSLUtils.SSLContext serverSSLContext;
private final ServerBootstrap bootstrap;
private final Channel serverChannel;
private final String restAddress;
@@ -64,7 +65,7 @@ public WebFrontendBootstrap(
Router router,
Logger log,
File directory,
- SSLContext sslContext,
+ @Nullable SSLUtils.SSLContext sslContext,
String configuredAddress,
int configuredPort,
final Configuration config) throws
InterruptedException, UnknownHostException {
@@ -81,7 +82,7 @@ protected void initChannel(SocketChannel ch) {
// SSL should be the first handler in the
pipeline
if (serverSSLContext != null) {
- SSLEngine sslEngine =
serverSSLContext.createSSLEngine();
+ SSLEngine sslEngine =
serverSSLContext.getSslContext().createSSLEngine();
SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
sslEngine.setUseClientMode(false);
ch.pipeline().addLast("ssl", new
SslHandler(sslEngine));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 8e6b32811c2..4bf71779058 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -31,7 +31,6 @@
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocket;
@@ -91,7 +90,7 @@ public BlobClient(InetSocketAddress serverAddress,
Configuration clientConfig) t
try {
// Check if ssl is enabled
- SSLContext clientSSLContext = null;
+ SSLUtils.SSLContext clientSSLContext = null;
if (clientConfig != null &&
clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
@@ -102,7 +101,7 @@ public BlobClient(InetSocketAddress serverAddress,
Configuration clientConfig) t
LOG.info("Using ssl connection to the blob
server");
- SSLSocket sslSocket = (SSLSocket)
clientSSLContext.getSocketFactory().createSocket(
+ SSLSocket sslSocket = (SSLSocket)
clientSSLContext.getSslContext().getSocketFactory().createSocket(
serverAddress.getAddress(),
serverAddress.getPort());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index dd0155cbbdd..1a1b0da63dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -33,7 +33,6 @@
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.FileNotFoundException;
@@ -79,7 +78,7 @@
private final ServerSocket serverSocket;
/** The SSL server context if ssl is enabled for the connections. */
- private final SSLContext serverSSLContext;
+ private final SSLUtils.SSLContext serverSSLContext;
/** Blob Server configuration. */
private final Configuration blobServiceConfiguration;
@@ -196,7 +195,7 @@ public ServerSocket createSocket(int port) throws
IOException {
return new ServerSocket(port,
finalBacklog);
} else {
LOG.info("Enabling ssl for the blob
server");
- return
serverSSLContext.getServerSocketFactory().createServerSocket(port,
finalBacklog);
+ return
serverSSLContext.getSslContext().getServerSocketFactory().createServerSocket(port,
finalBacklog);
}
}
});
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
index 5fb083d33ad..44561b0638f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.io.network.netty;
+import org.apache.flink.runtime.net.SSLUtils;
+
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -34,9 +36,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
+
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -52,7 +55,8 @@
private Bootstrap bootstrap;
- private SSLContext clientSSLContext = null;
+ @Nullable
+ private SSLUtils.SSLContext clientSSLContext = null;
NettyClient(NettyConfig config) {
this.config = config;
@@ -178,7 +182,7 @@ public void initChannel(SocketChannel channel) throws
Exception {
// SSL handler should be added first in the
pipeline
if (clientSSLContext != null) {
- SSLEngine sslEngine =
clientSSLContext.createSSLEngine(
+ SSLEngine sslEngine =
clientSSLContext.getSslContext().createSSLEngine(
serverSocketAddress.getAddress().getCanonicalHostName(),
serverSocketAddress.getPort());
sslEngine.setUseClientMode(true);
@@ -190,7 +194,14 @@ public void initChannel(SocketChannel channel) throws
Exception {
sslEngine.setSSLParameters(newSSLParameters);
}
- channel.pipeline().addLast("ssl", new
SslHandler(sslEngine));
+ SslHandler sslHandler = new
SslHandler(sslEngine);
+ if
(clientSSLContext.getHandshakeTimeoutMs() >= 0) {
+
sslHandler.setHandshakeTimeoutMillis(clientSSLContext.getHandshakeTimeoutMs());
+ }
+ if
(clientSSLContext.getCloseNotifyFlushTimeoutMs() >= 0) {
+
sslHandler.setCloseNotifyTimeoutMillis(clientSSLContext.getCloseNotifyFlushTimeoutMs());
+ }
+ channel.pipeline().addLast("ssl",
sslHandler);
}
channel.pipeline().addLast(protocol.getClientChannelHandlers());
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 18527c46a53..9b32ebbcd95 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -23,12 +23,14 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.net.SSLUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
+
import java.net.InetAddress;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -189,26 +191,13 @@ public TransportType getTransportType() {
}
}
- public SSLContext createClientSSLContext() throws Exception {
-
- // Create SSL Context from config
- SSLContext clientSSLContext = null;
- if (getSSLEnabled()) {
- clientSSLContext =
SSLUtils.createSSLClientContext(config);
- }
-
- return clientSSLContext;
+ @Nullable
+ public SSLUtils.SSLContext createClientSSLContext() throws Exception {
+ return SSLUtils.createSSLClientContext(config);
}
- public SSLContext createServerSSLContext() throws Exception {
-
- // Create SSL Context from config
- SSLContext serverSSLContext = null;
- if (getSSLEnabled()) {
- serverSSLContext =
SSLUtils.createSSLServerContext(config);
- }
-
- return serverSSLContext;
+ public SSLUtils.SSLContext createServerSSLContext() throws Exception {
+ return SSLUtils.createSSLServerContext(config);
}
public boolean getSSLEnabled() {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index c6d09d05499..f919ded9e3c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.netty;
+import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -36,7 +37,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
import java.io.IOException;
@@ -61,7 +62,8 @@
private ChannelFuture bindFuture;
- private SSLContext serverSSLContext = null;
+ @Nullable
+ private SSLUtils.SSLContext serverSSLContext = null;
private InetSocketAddress localAddress;
@@ -152,10 +154,17 @@ void init(final NettyProtocol protocol, NettyBufferPool
nettyBufferPool) throws
@Override
public void initChannel(SocketChannel channel) throws
Exception {
if (serverSSLContext != null) {
- SSLEngine sslEngine =
serverSSLContext.createSSLEngine();
+ SSLEngine sslEngine =
serverSSLContext.getSslContext().createSSLEngine();
config.setSSLVerAndCipherSuites(sslEngine);
sslEngine.setUseClientMode(false);
- channel.pipeline().addLast("ssl", new
SslHandler(sslEngine));
+ SslHandler sslHandler = new
SslHandler(sslEngine);
+ if
(serverSSLContext.getHandshakeTimeoutMs() >= 0) {
+
sslHandler.setHandshakeTimeoutMillis(serverSSLContext.getHandshakeTimeoutMs());
+ }
+ if
(serverSSLContext.getCloseNotifyFlushTimeoutMs() >= 0) {
+
sslHandler.setCloseNotifyTimeoutMillis(serverSSLContext.getCloseNotifyFlushTimeoutMs());
+ }
+ channel.pipeline().addLast("ssl",
sslHandler);
}
channel.pipeline().addLast(protocol.getServerChannelHandlers());
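
The client and server pipelines follow the same rule: a timeout is pushed to the `SslHandler` only when it is non-negative, so -1 leaves Netty's own defaults in place. A rough sketch of that rule without a Netty dependency is shown here. `FakeHandler` is a hypothetical stand-in for `SslHandler`, and its initial values are arbitrary placeholders chosen for the demo, not Netty's documented defaults.

```java
import java.util.function.LongConsumer;

/** Hypothetical illustration of "apply only if configured"; not Flink or Netty code. */
final class TimeoutApplier {

    /** Stand-in for Netty's SslHandler; the initial values are arbitrary placeholders. */
    static final class FakeHandler {
        long handshakeTimeoutMillis = 10_000L;
        long closeNotifyTimeoutMillis = 3_000L;
    }

    /** Invokes the setter only for non-negative values (-1 = keep the handler's default). */
    static void applyIfSet(int configuredMs, LongConsumer setter) {
        if (configuredMs >= 0) {
            setter.accept(configuredMs);
        }
    }

    public static void main(String[] args) {
        FakeHandler handler = new FakeHandler();
        applyIfSet(1_000, ms -> handler.handshakeTimeoutMillis = ms);   // overridden
        applyIfSet(-1, ms -> handler.closeNotifyTimeoutMillis = ms);    // left untouched
        System.out.println(handler.handshakeTimeoutMillis);
        System.out.println(handler.closeNotifyTimeoutMillis);
    }
}
```

Factoring the check into one helper would avoid repeating the `>= 0` test in both `NettyClient` and `NettyServer`; the PR inlines it instead, which keeps the diff local to each `initChannel` method.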
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index b574d30484f..69da666ab1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -113,7 +113,7 @@ private static SSLEngineFactory createSSLEngineFactory(
checkState(sslContext != null, "%s it not enabled",
SecurityOptions.SSL_ENABLED.key());
return new SSLEngineFactory(
- sslContext,
+ sslContext.getSslContext(),
getEnabledProtocols(config),
getEnabledCipherSuites(config),
clientMode);
@@ -176,39 +176,43 @@ public static void setSSLVerifyHostname(Configuration
sslConfig, SSLParameters s
public static SSLContext createSSLClientContext(Configuration
sslConfig) throws Exception {
Preconditions.checkNotNull(sslConfig);
- SSLContext clientSSLContext = null;
- if (getSSLEnabled(sslConfig)) {
- LOG.debug("Creating client SSL context from
configuration");
-
- String trustStoreFilePath =
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
- String trustStorePassword =
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
- String sslProtocolVersion =
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+ if (!getSSLEnabled(sslConfig)) {
+ return null;
+ }
- Preconditions.checkNotNull(trustStoreFilePath,
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
- Preconditions.checkNotNull(trustStorePassword,
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
+ LOG.debug("Creating client SSL context from configuration");
- KeyStore trustStore =
KeyStore.getInstance(KeyStore.getDefaultType());
+ String trustStoreFilePath =
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE);
+ String trustStorePassword =
sslConfig.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
+ String sslProtocolVersion =
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+ int sessionCacheSize =
sslConfig.getInteger(SecurityOptions.SSL_SESSION_CACHE_SIZE);
+ int sessionTimeoutMs =
sslConfig.getInteger(SecurityOptions.SSL_SESSION_TIMEOUT);
+ int handshakeTimeoutMs =
sslConfig.getInteger(SecurityOptions.SSL_HANDSHAKE_TIMEOUT);
+ int closeNotifyFlushTimeoutMs =
sslConfig.getInteger(SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT);
- FileInputStream trustStoreFile = null;
- try {
- trustStoreFile = new FileInputStream(new
File(trustStoreFilePath));
- trustStore.load(trustStoreFile,
trustStorePassword.toCharArray());
- } finally {
- if (trustStoreFile != null) {
- trustStoreFile.close();
- }
- }
+ Preconditions.checkNotNull(trustStoreFilePath,
SecurityOptions.SSL_TRUSTSTORE.key() + " was not configured.");
+ Preconditions.checkNotNull(trustStorePassword,
SecurityOptions.SSL_TRUSTSTORE_PASSWORD.key() + " was not configured.");
- TrustManagerFactory trustManagerFactory =
TrustManagerFactory.getInstance(
- TrustManagerFactory.getDefaultAlgorithm());
- trustManagerFactory.init(trustStore);
+ KeyStore trustStore =
KeyStore.getInstance(KeyStore.getDefaultType());
- clientSSLContext =
SSLContext.getInstance(sslProtocolVersion);
- clientSSLContext.init(null,
trustManagerFactory.getTrustManagers(), null);
+ try (FileInputStream trustStoreFile = new FileInputStream(new
File(trustStoreFilePath))) {
+ trustStore.load(trustStoreFile,
trustStorePassword.toCharArray());
}
- return clientSSLContext;
+ TrustManagerFactory trustManagerFactory =
TrustManagerFactory.getInstance(
+ TrustManagerFactory.getDefaultAlgorithm());
+ trustManagerFactory.init(trustStore);
+
+ javax.net.ssl.SSLContext clientSSLContext =
javax.net.ssl.SSLContext.getInstance(sslProtocolVersion);
+ clientSSLContext.init(null,
trustManagerFactory.getTrustManagers(), null);
+ if (sessionCacheSize >= 0) {
+
clientSSLContext.getClientSessionContext().setSessionCacheSize(sessionCacheSize);
+ }
+ if (sessionTimeoutMs >= 0) {
+
clientSSLContext.getClientSessionContext().setSessionTimeout(sessionTimeoutMs /
1000);
+ }
+ return new SSLContext(clientSSLContext, handshakeTimeoutMs,
closeNotifyFlushTimeoutMs);
}
/**
@@ -225,38 +229,77 @@ public static SSLContext
createSSLClientContext(Configuration sslConfig) throws
public static SSLContext createSSLServerContext(Configuration
sslConfig) throws Exception {
Preconditions.checkNotNull(sslConfig);
- SSLContext serverSSLContext = null;
- if (getSSLEnabled(sslConfig)) {
- LOG.debug("Creating server SSL context from
configuration");
+ if (!getSSLEnabled(sslConfig)) {
+ return null;
+ }
- String keystoreFilePath =
sslConfig.getString(SecurityOptions.SSL_KEYSTORE);
+ LOG.debug("Creating server SSL context from configuration");
- String keystorePassword =
sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+ String keystoreFilePath =
sslConfig.getString(SecurityOptions.SSL_KEYSTORE);
+ String keystorePassword =
sslConfig.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+ String certPassword =
sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD);
+ String sslProtocolVersion =
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+ int sessionCacheSize =
sslConfig.getInteger(SecurityOptions.SSL_SESSION_CACHE_SIZE);
+ int sessionTimeoutMs =
sslConfig.getInteger(SecurityOptions.SSL_SESSION_TIMEOUT);
+ int handshakeTimeoutMs =
sslConfig.getInteger(SecurityOptions.SSL_HANDSHAKE_TIMEOUT);
+ int closeNotifyFlushTimeoutMs =
sslConfig.getInteger(SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT);
- String certPassword =
sslConfig.getString(SecurityOptions.SSL_KEY_PASSWORD);
+ Preconditions.checkNotNull(keystoreFilePath,
SecurityOptions.SSL_KEYSTORE.key() + " was not configured.");
+ Preconditions.checkNotNull(keystorePassword,
SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured.");
+ Preconditions.checkNotNull(certPassword,
SecurityOptions.SSL_KEY_PASSWORD.key() + " was not configured.");
- String sslProtocolVersion =
sslConfig.getString(SecurityOptions.SSL_PROTOCOL);
+ KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+ try (FileInputStream keyStoreFile = new FileInputStream(new
File(keystoreFilePath))) {
+ ks.load(keyStoreFile, keystorePassword.toCharArray());
+ }
- Preconditions.checkNotNull(keystoreFilePath,
SecurityOptions.SSL_KEYSTORE.key() + " was not configured.");
- Preconditions.checkNotNull(keystorePassword,
SecurityOptions.SSL_KEYSTORE_PASSWORD.key() + " was not configured.");
- Preconditions.checkNotNull(certPassword,
SecurityOptions.SSL_KEY_PASSWORD.key() + " was not configured.");
+ // Set up key manager factory to use the server key store
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+ KeyManagerFactory.getDefaultAlgorithm());
+ kmf.init(ks, certPassword.toCharArray());
- KeyStore ks =
KeyStore.getInstance(KeyStore.getDefaultType());
- try (FileInputStream keyStoreFile = new
FileInputStream(new File(keystoreFilePath))) {
- ks.load(keyStoreFile,
keystorePassword.toCharArray());
- }
+ // Initialize the SSLContext
+ javax.net.ssl.SSLContext serverSSLContext =
javax.net.ssl.SSLContext.getInstance(sslProtocolVersion);
+ serverSSLContext.init(kmf.getKeyManagers(), null, null);
+ if (sessionCacheSize >= 0) {
+
serverSSLContext.getServerSessionContext().setSessionCacheSize(sessionCacheSize);
+ }
+ if (sessionTimeoutMs >= 0) {
+
serverSSLContext.getServerSessionContext().setSessionTimeout(sessionTimeoutMs /
1000);
+ }
- // Set up key manager factory to use the server key
store
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(
-
KeyManagerFactory.getDefaultAlgorithm());
- kmf.init(ks, certPassword.toCharArray());
+ return new SSLContext(serverSSLContext, handshakeTimeoutMs,
closeNotifyFlushTimeoutMs);
+ }
- // Initialize the SSLContext
- serverSSLContext =
SSLContext.getInstance(sslProtocolVersion);
- serverSSLContext.init(kmf.getKeyManagers(), null, null);
+ /**
+ * Wrapper around javax.net.ssl.SSLContext, adding SSL handshake and
close notify timeouts
+ * which cannot be set on the SSL context directly.
+ */
+ public static class SSLContext {
+ private final javax.net.ssl.SSLContext sslContext;
+ private final int handshakeTimeoutMs;
+ private final int closeNotifyFlushTimeoutMs;
+
+ public SSLContext(
+ javax.net.ssl.SSLContext sslContext,
+ int handshakeTimeoutMs,
+ int closeNotifyFlushTimeoutMs) {
+ this.sslContext = sslContext;
+ this.handshakeTimeoutMs = handshakeTimeoutMs;
+ this.closeNotifyFlushTimeoutMs =
closeNotifyFlushTimeoutMs;
}
- return serverSSLContext;
+ public javax.net.ssl.SSLContext getSslContext() {
+ return sslContext;
+ }
+
+ public int getHandshakeTimeoutMs() {
+ return handshakeTimeoutMs;
+ }
+
+ public int getCloseNotifyFlushTimeoutMs() {
+ return closeNotifyFlushTimeoutMs;
+ }
}
}
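
The session settings end up on the JDK's `SSLSessionContext`, which takes its timeout in seconds, hence the division by 1000 above. One consequence worth keeping in mind: with integer division any configured value below 1000 ms becomes 0, and the `SSLSessionContext.setSessionTimeout` Javadoc defines 0 as "no limit". The JDK-only sketch below shows both effects. The class and method names are hypothetical, and no keystore or truststore is loaded, so the context is initialised with JDK defaults.

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSessionContext;

/** Hypothetical demo, not Flink code: applies cache size and timeout the way the diff does. */
final class SessionSettingsDemo {

    /** Negative arguments mean "keep the JDK default"; timeoutMs is converted to whole seconds. */
    static SSLSessionContext configure(int cacheSize, int timeoutMs) {
        try {
            SSLContext ctx = SSLContext.getInstance("TLSv1.2");
            ctx.init(null, null, null); // default key managers, trust managers, and randomness
            // Assumed non-null here, as with the default JSSE provider.
            SSLSessionContext sessions = ctx.getClientSessionContext();
            if (cacheSize >= 0) {
                sessions.setSessionCacheSize(cacheSize);
            }
            if (timeoutMs >= 0) {
                sessions.setSessionTimeout(timeoutMs / 1000); // integer division truncates
            }
            return sessions;
        } catch (Exception e) {
            throw new IllegalStateException("could not create SSL context", e);
        }
    }

    public static void main(String[] args) {
        SSLSessionContext a = configure(1, 5_000);
        System.out.println(a.getSessionCacheSize() + " entries, " + a.getSessionTimeout() + " s");

        // 500 ms truncates to 0 seconds, which the JDK treats as "no limit".
        SSLSessionContext b = configure(-1, 500);
        System.out.println(b.getSessionTimeout() + " s");
    }
}
```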
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 33e004ebdf6..e7113ec6695 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.netty;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.util.NetUtils;
@@ -26,15 +27,27 @@
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringDecoder;
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringEncoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.junit.Assert;
import org.junit.Test;
+import javax.net.ssl.SSLSessionContext;
+
import java.net.InetAddress;
+import static
org.apache.flink.configuration.SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT;
+import static
org.apache.flink.configuration.SecurityOptions.SSL_HANDSHAKE_TIMEOUT;
+import static
org.apache.flink.configuration.SecurityOptions.SSL_SESSION_CACHE_SIZE;
+import static
org.apache.flink.configuration.SecurityOptions.SSL_SESSION_TIMEOUT;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for communication between {@link NettyServer} and {@link NettyClient}
via SSL.
+ */
public class NettyClientServerSslTest {
/**
@@ -42,52 +55,76 @@
*/
@Test
public void testValidSslConnection() throws Exception {
- NettyProtocol protocol = new NettyProtocol(null, null, true) {
- @Override
- public ChannelHandler[] getServerChannelHandlers() {
- return new ChannelHandler[0];
- }
+ testValidSslConnection(createSslConfig());
+ }
- @Override
- public ChannelHandler[] getClientChannelHandlers() {
- return new ChannelHandler[0];
- }
- };
+ /**
+ * Verify valid (advanced) ssl configuration and connection.
+ */
+ @Test
+ public void testValidSslConnectionAdvanced() throws Exception {
+ Configuration sslConfig = createSslConfig();
+ sslConfig.setInteger(SSL_SESSION_CACHE_SIZE, 1);
+ sslConfig.setInteger(SSL_SESSION_TIMEOUT, 1_000);
+ sslConfig.setInteger(SSL_HANDSHAKE_TIMEOUT, 1_000);
+ sslConfig.setInteger(SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT, 1_000);
+
+ testValidSslConnection(sslConfig);
+ }
+
+ private void testValidSslConnection(Configuration sslConfig) throws
Exception {
+ NettyProtocol protocol = getEmptyNettyProtocol();
NettyConfig nettyConfig = new NettyConfig(
InetAddress.getLoopbackAddress(),
NetUtils.getAvailablePort(),
NettyTestUtil.DEFAULT_SEGMENT_SIZE,
1,
- createSslConfig());
+ sslConfig);
NettyTestUtil.NettyServerAndClient serverAndClient =
NettyTestUtil.initServerAndClient(protocol, nettyConfig);
Channel ch = NettyTestUtil.connect(serverAndClient);
+ SslHandler sslHandler = (SslHandler) ch.pipeline().get("ssl");
+ assertEqualsOrDefault(sslConfig, SSL_HANDSHAKE_TIMEOUT,
sslHandler.getHandshakeTimeoutMillis());
+ assertEqualsOrDefault(sslConfig,
SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT, sslHandler.getCloseNotifyTimeoutMillis());
+
// should be able to send text data
ch.pipeline().addLast(new StringDecoder()).addLast(new
StringEncoder());
assertTrue(ch.writeAndFlush("test").await().isSuccess());
+ // session context is only available after a session was
set up -> this should be true after data was sent
+ SSLSessionContext sessionContext =
sslHandler.engine().getSession().getSessionContext();
+ assertNotNull("bug in unit test setup: session context not
available", sessionContext);
+ assertEqualsOrDefault(sslConfig, SSL_SESSION_CACHE_SIZE,
sessionContext.getSessionCacheSize());
+ int sessionTimeout = sslConfig.getInteger(SSL_SESSION_TIMEOUT);
+ if (sessionTimeout != -1) {
+ // session timeout config is in milliseconds but the context returns it in seconds
+ assertEquals(sessionTimeout / 1000,
sessionContext.getSessionTimeout());
+ } else {
+ assertTrue("default value (-1) should not be
propagated", sessionContext.getSessionTimeout() >= 0);
+ }
+
NettyTestUtil.shutdown(serverAndClient);
}
+ private static void assertEqualsOrDefault(Configuration sslConfig,
ConfigOption<Integer> option, long actual) {
+ long expected = sslConfig.getInteger(option);
+ if (expected != option.defaultValue()) {
+ assertEquals(expected, actual);
+ } else {
+ assertTrue("default value (" + option.defaultValue() +
") should not be propagated",
+ actual >= 0);
+ }
+ }
+
/**
* Verify failure on invalid ssl configuration.
*/
@Test
- public void testInvalidSslConfiguration() throws Exception {
- NettyProtocol protocol = new NettyProtocol(null, null, true) {
- @Override
- public ChannelHandler[] getServerChannelHandlers() {
- return new ChannelHandler[0];
- }
-
- @Override
- public ChannelHandler[] getClientChannelHandlers() {
- return new ChannelHandler[0];
- }
- };
+ public void testInvalidSslConfiguration() {
+ NettyProtocol protocol = getEmptyNettyProtocol();
Configuration config = createSslConfig();
// Modify the keystore password to an incorrect one
@@ -116,17 +153,7 @@ public void testInvalidSslConfiguration() throws Exception {
*/
@Test
public void testSslHandshakeError() throws Exception {
- NettyProtocol protocol = new NettyProtocol(null, null, true) {
- @Override
- public ChannelHandler[] getServerChannelHandlers() {
- return new ChannelHandler[0];
- }
-
- @Override
- public ChannelHandler[] getClientChannelHandlers() {
- return new ChannelHandler[0];
- }
- };
+ NettyProtocol protocol = getEmptyNettyProtocol();
Configuration config = createSslConfig();
@@ -151,8 +178,7 @@ public void testSslHandshakeError() throws Exception {
NettyTestUtil.shutdown(serverAndClient);
}
- private Configuration createSslConfig() throws Exception {
-
+ private Configuration createSslConfig() {
Configuration flinkConfig = new Configuration();
flinkConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
flinkConfig.setString(SecurityOptions.SSL_KEYSTORE,
"src/test/resources/local127.keystore");
@@ -162,4 +188,18 @@ private Configuration createSslConfig() throws Exception {
flinkConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD,
"password");
return flinkConfig;
}
+
+ private static NettyProtocol getEmptyNettyProtocol() {
+ return new NettyProtocol(null, null, true) {
+ @Override
+ public ChannelHandler[] getServerChannelHandlers() {
+ return new ChannelHandler[0];
+ }
+
+ @Override
+ public ChannelHandler[] getClientChannelHandlers() {
+ return new ChannelHandler[0];
+ }
+ };
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index 38c8ceeedf9..a5db40fd5af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -24,7 +24,6 @@
import org.junit.Assert;
import org.junit.Test;
-import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLServerSocket;
@@ -33,6 +32,7 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
/**
@@ -58,7 +58,7 @@ public void testCreateSSLClientContext() throws Exception {
clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE,
"src/test/resources/local127.truststore");
clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD,
"password");
- SSLContext clientContext =
SSLUtils.createSSLClientContext(clientConfig);
+ SSLUtils.SSLContext clientContext =
SSLUtils.createSSLClientContext(clientConfig);
Assert.assertNotNull(clientContext);
}
@@ -71,7 +71,7 @@ public void testCreateSSLClientContextWithSSLDisabled()
throws Exception {
Configuration clientConfig = new Configuration();
clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, false);
- SSLContext clientContext =
SSLUtils.createSSLClientContext(clientConfig);
+ SSLUtils.SSLContext clientContext =
SSLUtils.createSSLClientContext(clientConfig);
Assert.assertNull(clientContext);
}
@@ -87,7 +87,7 @@ public void testCreateSSLClientContextMisconfiguration() {
clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD,
"badpassword");
try {
- SSLContext clientContext =
SSLUtils.createSSLClientContext(clientConfig);
+ SSLUtils.SSLContext clientContext =
SSLUtils.createSSLClientContext(clientConfig);
Assert.fail("SSL client context created even with bad
SSL configuration ");
} catch (Exception e) {
// Exception here is valid
@@ -106,7 +106,7 @@ public void testCreateSSLServerContext() throws Exception {
serverConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD,
"password");
serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD,
"password");
- SSLContext serverContext =
SSLUtils.createSSLServerContext(serverConfig);
+ SSLUtils.SSLContext serverContext =
SSLUtils.createSSLServerContext(serverConfig);
Assert.assertNotNull(serverContext);
}
@@ -119,7 +119,7 @@ public void testCreateSSLServerContextWithSSLDisabled()
throws Exception {
Configuration serverConfig = new Configuration();
serverConfig.setBoolean(SecurityOptions.SSL_ENABLED, false);
- SSLContext serverContext =
SSLUtils.createSSLServerContext(serverConfig);
+ SSLUtils.SSLContext serverContext =
SSLUtils.createSSLServerContext(serverConfig);
Assert.assertNull(serverContext);
}
@@ -136,7 +136,7 @@ public void testCreateSSLServerContextMisconfiguration() {
serverConfig.setString(SecurityOptions.SSL_KEY_PASSWORD,
"badpassword");
try {
- SSLContext serverContext =
SSLUtils.createSSLServerContext(serverConfig);
+ SSLUtils.SSLContext serverContext =
SSLUtils.createSSLServerContext(serverConfig);
Assert.fail("SSL server context created even with bad
SSL configuration ");
} catch (Exception e) {
// Exception here is valid
@@ -157,7 +157,7 @@ public void testCreateSSLServerContextWithMultiProtocols() {
serverConfig.setString(SecurityOptions.SSL_PROTOCOL,
"TLSv1,TLSv1.2");
try {
- SSLContext serverContext =
SSLUtils.createSSLServerContext(serverConfig);
+ SSLUtils.SSLContext serverContext =
SSLUtils.createSSLServerContext(serverConfig);
Assert.fail("SSL server context created even with
multiple protocols set ");
} catch (Exception e) {
// Exception here is valid
@@ -178,10 +178,9 @@ public void
testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exceptio
serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1.1");
serverConfig.setString(SecurityOptions.SSL_ALGORITHMS,
"TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_CBC_SHA256");
- SSLContext serverContext =
SSLUtils.createSSLServerContext(serverConfig);
- ServerSocket socket = null;
- try {
- socket =
serverContext.getServerSocketFactory().createServerSocket(0);
+ SSLUtils.SSLContext serverContext =
SSLUtils.createSSLServerContext(serverConfig);
+ assertNotNull(serverContext);
+ try (ServerSocket socket =
serverContext.getSslContext().getServerSocketFactory().createServerSocket(0)) {
String[] protocols = ((SSLServerSocket)
socket).getEnabledProtocols();
String[] algorithms = ((SSLServerSocket)
socket).getEnabledCipherSuites();
@@ -198,10 +197,6 @@ public void
testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exceptio
Assert.assertEquals(2, algorithms.length);
Assert.assertTrue(algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA") ||
algorithms[0].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
Assert.assertTrue(algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA") ||
algorithms[1].equals("TLS_RSA_WITH_AES_128_CBC_SHA256"));
- } finally {
- if (socket != null) {
- socket.close();
- }
}
}
@@ -219,8 +214,9 @@ public void testSetSSLVersionAndCipherSuitesForSSLEngine()
throws Exception {
serverConfig.setString(SecurityOptions.SSL_PROTOCOL, "TLSv1");
serverConfig.setString(SecurityOptions.SSL_ALGORITHMS,
"TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256");
- SSLContext serverContext =
SSLUtils.createSSLServerContext(serverConfig);
- SSLEngine engine = serverContext.createSSLEngine();
+ SSLUtils.SSLContext serverContext =
SSLUtils.createSSLServerContext(serverConfig);
+ assertNotNull(serverContext);
+ SSLEngine engine =
serverContext.getSslContext().createSSLEngine();
String[] protocols = engine.getEnabledProtocols();
String[] algorithms = engine.getEnabledCipherSuites();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 93dbb5dc145..59db1630cce 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -164,9 +164,9 @@ public void setup() throws Exception {
config.setString(WebOptions.UPLOAD_DIR,
temporaryFolder.newFolder().getCanonicalPath());
defaultSSLContext = SSLContext.getDefault();
- final SSLContext sslClientContext =
SSLUtils.createSSLClientContext(config);
+ final SSLUtils.SSLContext sslClientContext =
SSLUtils.createSSLClientContext(config);
if (sslClientContext != null) {
- SSLContext.setDefault(sslClientContext);
+ SSLContext.setDefault(sslClientContext.getSslContext());
}
RestServerEndpointConfiguration serverConfig =
RestServerEndpointConfiguration.fromConfiguration(config);
----------------------------------------------------------------
> IO worker threads BLOCKED on SSL Session Cache while CMS full gc
> ----------------------------------------------------------------
>
> Key: FLINK-9878
> URL: https://issues.apache.org/jira/browse/FLINK-9878
> Project: Flink
> Issue Type: Bug
> Components: Network
> Affects Versions: 1.5.0, 1.5.1, 1.6.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> According to https://github.com/netty/netty/issues/832, there is a JDK issue
> during garbage collection when the SSL session cache is not limited. We
> should allow the user to configure this and further (advanced) SSL parameters
> for fine-tuning to fix this and similar issues. In particular, the following
> parameters should be configurable:
> - SSL session cache size
> - SSL session timeout
> - SSL handshake timeout
> - SSL close notify flush timeout
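
For illustration only, the first two parameters in the list above map onto the JDK's `SSLSessionContext`. The following is a minimal sketch, not the code from the pull request: the class name `SslSessionLimits` and the helper `applySessionLimits` are hypothetical, and treating a negative value as "keep the JDK default" is an assumption modelled on the "-1 = use system default" convention in the option descriptions. Note that the JDK expects the session timeout in seconds, whereas the test in the diff configures it in milliseconds.

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSessionContext;

public class SslSessionLimits {

    /**
     * Hypothetical helper: bounds the session cache of the given context.
     * Negative arguments leave the JDK default untouched (assumed convention).
     */
    static void applySessionLimits(SSLSessionContext ctx, int cacheSize, long timeoutMillis) {
        if (cacheSize >= 0) {
            // Maximum number of cached sessions; the JDK treats 0 as "no limit".
            ctx.setSessionCacheSize(cacheSize);
        }
        if (timeoutMillis >= 0) {
            // The JDK API takes seconds, so convert from milliseconds.
            ctx.setSessionTimeout((int) (timeoutMillis / 1000));
        }
    }

    public static void main(String[] args) throws Exception {
        // A context with default key and trust managers is enough to demonstrate the calls.
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(null, null, null);

        SSLSessionContext sessions = context.getClientSessionContext();
        applySessionLimits(sessions, 1, 1_000L);

        System.out.println("cache size: " + sessions.getSessionCacheSize());
        System.out.println("timeout (s): " + sessions.getSessionTimeout());
    }
}
```

The handshake and close-notify flush timeouts are not part of the JDK session context; in the test shown in the diff they are read back from Netty's `SslHandler` instead.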