Repository: nifi
Updated Branches:
  refs/heads/master 925138b6c -> b7f7e6ed8


NIFI-1436 This closes #189. Combining stop() and close() into a single method 
for simplicity, and adding checks on the stopped flag in the run() method of 
SocketChannelDispatcher and DatagramChannelDispatcher to ensure that run() 
exits as soon as possible once close() has been called.

NIFI-1436 Adding synchronization on the selector's key set in the close() 
method, based on the Selector Javadoc.

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b7f7e6ed
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b7f7e6ed
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b7f7e6ed

Branch: refs/heads/master
Commit: b7f7e6ed80ce1aa257cb6bcbd9add89c84549e01
Parents: 925138b
Author: Bryan Bende <[email protected]>
Authored: Mon Jan 25 09:44:24 2016 -0500
Committer: joewitt <[email protected]>
Committed: Mon Jan 25 22:50:25 2016 -0500

----------------------------------------------------------------------
 .../listen/AbstractListenEventProcessor.java    |  1 -
 .../listen/dispatcher/ChannelDispatcher.java    |  5 ---
 .../dispatcher/DatagramChannelDispatcher.java   | 18 ++++----
 .../dispatcher/SocketChannelDispatcher.java     | 47 ++++++++++++--------
 .../nifi/processors/standard/ListenSyslog.java  |  1 -
 .../handler/TestRELPSocketChannelHandler.java   |  1 -
 6 files changed, 38 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b7f7e6ed/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
index e994d81..029f6db 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -212,7 +212,6 @@ public abstract class AbstractListenEventProcessor<E 
extends Event> extends Abst
     @OnUnscheduled
     public void onUnscheduled() {
         if (dispatcher != null) {
-            dispatcher.stop();
             dispatcher.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b7f7e6ed/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java
index 001ee9b..cff230e 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java
@@ -39,11 +39,6 @@ public interface ChannelDispatcher extends Runnable {
     int getPort();
 
     /**
-     * Stops the main dispatcher thread.
-     */
-    void stop();
-
-    /**
      * Closes all listeners and stops all handler threads.
      */
     void close();

http://git-wip-us.apache.org/repos/asf/nifi/blob/b7f7e6ed/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
index a00a39f..e362e15 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
@@ -65,8 +65,10 @@ public class DatagramChannelDispatcher<E extends 
Event<DatagramChannel>> impleme
 
     @Override
     public void open(final int port, int maxBufferSize) throws IOException {
+        stopped = false;
         datagramChannel = DatagramChannel.open();
         datagramChannel.configureBlocking(false);
+
         if (maxBufferSize > 0) {
             datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, 
maxBufferSize);
             final int actualReceiveBufSize = 
datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
@@ -87,9 +89,11 @@ public class DatagramChannelDispatcher<E extends 
Event<DatagramChannel>> impleme
         while (!stopped) {
             try {
                 int selected = selector.select();
-                if (selected > 0){
+                // if stopped the selector could already be closed which would 
result in a ClosedSelectorException
+                if (selected > 0 && !stopped) {
                     Iterator<SelectionKey> selectorKeys = 
selector.selectedKeys().iterator();
-                    while (selectorKeys.hasNext()) {
+                    // if stopped we don't want to modify the keys because 
close() may still be in progress
+                    while (selectorKeys.hasNext() && !stopped) {
                         SelectionKey key = selectorKeys.next();
                         selectorKeys.remove();
                         if (!key.isValid()) {
@@ -141,13 +145,11 @@ public class DatagramChannelDispatcher<E extends 
Event<DatagramChannel>> impleme
     }
 
     @Override
-    public void stop() {
-        selector.wakeup();
-        stopped = true;
-    }
-
-    @Override
     public void close() {
+        stopped = true;
+        if (selector != null) {
+            selector.wakeup();
+        }
         IOUtils.closeQuietly(selector);
         IOUtils.closeQuietly(datagramChannel);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b7f7e6ed/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
index 6dd6345..da5c414 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
@@ -91,7 +91,8 @@ public class SocketChannelDispatcher<E extends 
Event<SocketChannel>> implements
 
     @Override
     public void open(final int port, int maxBufferSize) throws IOException {
-        this.executor = Executors.newFixedThreadPool(maxConnections);
+        stopped = false;
+        executor = Executors.newFixedThreadPool(maxConnections);
 
         final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
         serverSocketChannel.configureBlocking(false);
@@ -114,9 +115,11 @@ public class SocketChannelDispatcher<E extends 
Event<SocketChannel>> implements
         while (!stopped) {
             try {
                 int selected = selector.select();
-                if (selected > 0){
+                // if stopped the selector could already be closed which would 
result in a ClosedSelectorException
+                if (selected > 0 && !stopped){
                     Iterator<SelectionKey> selectorKeys = 
selector.selectedKeys().iterator();
-                    while (selectorKeys.hasNext()){
+                    // if stopped we don't want to modify the keys because 
close() may still be in progress
+                    while (selectorKeys.hasNext() && !stopped) {
                         SelectionKey key = selectorKeys.next();
                         selectorKeys.remove();
                         if (!key.isValid()){
@@ -197,27 +200,33 @@ public class SocketChannelDispatcher<E extends 
Event<SocketChannel>> implements
     }
 
     @Override
-    public void stop() {
+    public void close() {
         stopped = true;
-        selector.wakeup();
-    }
+        if (selector != null) {
+            selector.wakeup();
+        }
 
-    @Override
-    public void close() {
-        executor.shutdown();
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
+        if (executor != null) {
+            executor.shutdown();
+            try {
+                // Wait a while for existing tasks to terminate
+                if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException ie) {
+                // (Re-)Cancel if current thread also interrupted
                 executor.shutdownNow();
+                // Preserve interrupt status
+                Thread.currentThread().interrupt();
             }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            executor.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
         }
-        for(SelectionKey key : selector.keys()){
-            IOUtils.closeQuietly(key.channel());
+
+        if (selector != null) {
+            synchronized (selector.keys()) {
+                for (SelectionKey key : selector.keys()) {
+                    IOUtils.closeQuietly(key.channel());
+                }
+            }
         }
         IOUtils.closeQuietly(selector);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b7f7e6ed/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 2e439b4..8ca7eb1 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -309,7 +309,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
     @OnUnscheduled
     public void onUnscheduled() {
         if (channelDispatcher != null) {
-            channelDispatcher.stop();
             channelDispatcher.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b7f7e6ed/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
index 2a51ad6..e351670 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
@@ -165,7 +165,6 @@ public class TestRELPSocketChannelHandler {
 
         } finally {
             // stop the dispatcher thread and ensure we shut down handler 
threads
-            dispatcher.stop();
             dispatcher.close();
         }
     }

Reply via email to