Repository: nifi Updated Branches: refs/heads/master ded396f0e -> a8de27e69
NIFI-3732 Adding connect with timeout to StandardCommsSession and SSLCommsSession to avoid blocking This closes #1842. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a8de27e6 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a8de27e6 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a8de27e6 Branch: refs/heads/master Commit: a8de27e69b8c8cf553b7e1d44699a1f68e53c5bd Parents: ded396f Author: Bryan Bende <[email protected]> Authored: Mon May 22 20:51:04 2017 -0400 Committer: Koji Kawamura <[email protected]> Committed: Tue May 23 15:43:27 2017 +0900 ---------------------------------------------------------------------- .../cache/client/DistributedMapCacheClientService.java | 12 +++++++++--- .../cache/client/DistributedSetCacheClientService.java | 12 +++++++++--- .../nifi/distributed/cache/client/SSLCommsSession.java | 10 ++++++++-- .../distributed/cache/client/StandardCommsSession.java | 6 ++++-- 4 files changed, 30 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a8de27e6/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index 81013f6..f197bac 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -31,6 +31,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; @@ -100,6 +101,11 @@ public class DistributedMapCacheClientService extends AbstractControllerService this.configContext = context; } + @OnStopped + public void onStopped() throws IOException { + close(); + } + @Override public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { return withCommsSession(new CommsAction<Boolean>() { @@ -292,14 +298,14 @@ public class DistributedMapCacheClientService extends AbstractControllerService public CommsSession createCommsSession(final ConfigurationContext context) throws IOException { final String hostname = context.getProperty(HOSTNAME).getValue(); final int port = context.getProperty(PORT).asInteger(); - final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + final int timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final CommsSession commsSession; if (sslContextService == null) { - commsSession = new StandardCommsSession(hostname, port); + commsSession = new StandardCommsSession(hostname, port, timeoutMillis); } else { - commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port); + commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port, timeoutMillis); } commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/nifi/blob/a8de27e6/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java index c1fa274..34a0a7c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java @@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; @@ -100,17 +101,22 @@ public class DistributedSetCacheClientService extends AbstractControllerService this.configContext = context; } + @OnStopped + public void onStopped() throws IOException { + close(); + } + public CommsSession createCommsSession(final ConfigurationContext context) throws IOException { final String hostname = context.getProperty(HOSTNAME).getValue(); final int port = context.getProperty(PORT).asInteger(); - final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + final int timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final CommsSession commsSession; if (sslContextService == null) { - commsSession = new StandardCommsSession(hostname, port); + commsSession = new StandardCommsSession(hostname, port, timeoutMillis); } else { - commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port); + commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port, timeoutMillis); } commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/nifi/blob/a8de27e6/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java index 7808d21..18ca571 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java @@ -19,6 +19,8 @@ package org.apache.nifi.distributed.cache.client; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; @@ -44,8 +46,12 @@ public class SSLCommsSession implements CommsSession { private int protocolVersion; - public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException { - sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, null, true); + public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port, final int timeoutMillis) throws IOException { + final SocketChannel socketChannel = SocketChannel.open(); + socketChannel.socket().connect(new InetSocketAddress(hostname, port), timeoutMillis); + socketChannel.configureBlocking(false); + + sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel,true); in = new SSLSocketChannelInputStream(sslSocketChannel); bufferedIn = new BufferedInputStream(in); http://git-wip-us.apache.org/repos/asf/nifi/blob/a8de27e6/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java index 6a8ee45..7545bef 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java @@ -47,9 +47,11 @@ public class StandardCommsSession implements CommsSession { private int protocolVersion; - public StandardCommsSession(final String hostname, final int port) throws IOException { - socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port)); + public StandardCommsSession(final String hostname, final int port, final int timeoutMillis) throws IOException { + socketChannel = SocketChannel.open(); + socketChannel.socket().connect(new InetSocketAddress(hostname, port), timeoutMillis); socketChannel.configureBlocking(false); + in = new SocketChannelInputStream(socketChannel); bufferedIn = new InterruptableInputStream(new BufferedInputStream(in));
