Repository: nifi Updated Branches: refs/heads/master 925138b6c -> b7f7e6ed8
NIFI-1436 This closes #189. Combining stop() and close() into a single method to simplify, and adding checks on the stopped flag in the run() method of SocketChannelDispatcher and DatagramChannelDispatcher to ensure that run() exits as soon as possible once close() is called. NIFI-1436 Adding synchronization on the selector's key set in the close() method, based on the Selector JavaDoc. Signed-off-by: joewitt <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b7f7e6ed Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b7f7e6ed Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b7f7e6ed Branch: refs/heads/master Commit: b7f7e6ed80ce1aa257cb6bcbd9add89c84549e01 Parents: 925138b Author: Bryan Bende <[email protected]> Authored: Mon Jan 25 09:44:24 2016 -0500 Committer: joewitt <[email protected]> Committed: Mon Jan 25 22:50:25 2016 -0500 ---------------------------------------------------------------------- .../listen/AbstractListenEventProcessor.java | 1 - .../listen/dispatcher/ChannelDispatcher.java | 5 --- .../dispatcher/DatagramChannelDispatcher.java | 18 ++++---- .../dispatcher/SocketChannelDispatcher.java | 47 ++++++++++++-------- .../nifi/processors/standard/ListenSyslog.java | 1 - .../handler/TestRELPSocketChannelHandler.java | 1 - 6 files changed, 38 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b7f7e6ed/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java index e994d81..029f6db 100644 
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java @@ -212,7 +212,6 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst @OnUnscheduled public void onUnscheduled() { if (dispatcher != null) { - dispatcher.stop(); dispatcher.close(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b7f7e6ed/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java index 001ee9b..cff230e 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java @@ -39,11 +39,6 @@ public interface ChannelDispatcher extends Runnable { int getPort(); /** - * Stops the main dispatcher thread. - */ - void stop(); - - /** * Closes all listeners and stops all handler threads. 
*/ void close(); http://git-wip-us.apache.org/repos/asf/nifi/blob/b7f7e6ed/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java index a00a39f..e362e15 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java @@ -65,8 +65,10 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme @Override public void open(final int port, int maxBufferSize) throws IOException { + stopped = false; datagramChannel = DatagramChannel.open(); datagramChannel.configureBlocking(false); + if (maxBufferSize > 0) { datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF); @@ -87,9 +89,11 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme while (!stopped) { try { int selected = selector.select(); - if (selected > 0){ + // if stopped the selector could already be closed which would result in a ClosedSelectorException + if (selected > 0 && !stopped) { Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); - while (selectorKeys.hasNext()) { + // if stopped we don't want to modify the keys because close() may still be in progress + while (selectorKeys.hasNext() && !stopped) { SelectionKey key = selectorKeys.next(); selectorKeys.remove(); if (!key.isValid()) { @@ 
-141,13 +145,11 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme } @Override - public void stop() { - selector.wakeup(); - stopped = true; - } - - @Override public void close() { + stopped = true; + if (selector != null) { + selector.wakeup(); + } IOUtils.closeQuietly(selector); IOUtils.closeQuietly(datagramChannel); } http://git-wip-us.apache.org/repos/asf/nifi/blob/b7f7e6ed/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java index 6dd6345..da5c414 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java @@ -91,7 +91,8 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements @Override public void open(final int port, int maxBufferSize) throws IOException { - this.executor = Executors.newFixedThreadPool(maxConnections); + stopped = false; + executor = Executors.newFixedThreadPool(maxConnections); final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); @@ -114,9 +115,11 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements while (!stopped) { try { int selected = selector.select(); - if (selected > 0){ + // if stopped the selector could already be closed which would result in a ClosedSelectorException + if (selected > 0 && !stopped){ Iterator<SelectionKey> selectorKeys = 
selector.selectedKeys().iterator(); - while (selectorKeys.hasNext()){ + // if stopped we don't want to modify the keys because close() may still be in progress + while (selectorKeys.hasNext() && !stopped) { SelectionKey key = selectorKeys.next(); selectorKeys.remove(); if (!key.isValid()){ @@ -197,27 +200,33 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements } @Override - public void stop() { + public void close() { stopped = true; - selector.wakeup(); - } + if (selector != null) { + selector.wakeup(); + } - @Override - public void close() { - executor.shutdown(); - try { - // Wait a while for existing tasks to terminate - if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) { + if (executor != null) { + executor.shutdown(); + try { + // Wait a while for existing tasks to terminate + if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted executor.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - executor.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); } - for(SelectionKey key : selector.keys()){ - IOUtils.closeQuietly(key.channel()); + + if (selector != null) { + synchronized (selector.keys()) { + for (SelectionKey key : selector.keys()) { + IOUtils.closeQuietly(key.channel()); + } + } } IOUtils.closeQuietly(selector); } http://git-wip-us.apache.org/repos/asf/nifi/blob/b7f7e6ed/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 2e439b4..8ca7eb1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -309,7 +309,6 @@ public class ListenSyslog extends AbstractSyslogProcessor { @OnUnscheduled public void onUnscheduled() { if (channelDispatcher != null) { - channelDispatcher.stop(); channelDispatcher.close(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b7f7e6ed/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java index 2a51ad6..e351670 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java @@ -165,7 +165,6 @@ public class TestRELPSocketChannelHandler { } finally { // stop the dispatcher thread and ensure we shut down handler threads - dispatcher.stop(); dispatcher.close(); } }
