Repository: nifi
Updated Branches:
  refs/heads/master 3906d4e1d -> 45f82dc85


NIFI-4111 - NiFi shutdown

Fixed thread shutdown so that NiFi can shut down gracefully

NIFI-4111 - Review - Handling SocketRemoteSiteListener (RAW S2S)

This closes #1963.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/45f82dc8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/45f82dc8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/45f82dc8

Branch: refs/heads/master
Commit: 45f82dc855177702a0c1a0e4c38af334b713c278
Parents: 3906d4e
Author: Pierre Villard <[email protected]>
Authored: Fri Jun 30 00:03:53 2017 +0200
Committer: Koji Kawamura <[email protected]>
Committed: Fri Jul 7 14:23:18 2017 +0900

----------------------------------------------------------------------
 .../nifi/remote/HttpRemoteSiteListener.java       |  8 +++++---
 .../nifi/remote/SocketRemoteSiteListener.java     | 18 +++++++++++++++---
 .../apache/nifi/http/StandardHttpContextMap.java  | 12 +++++++++++-
 3 files changed, 31 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/45f82dc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
index c9c523e..deb25a7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
@@ -54,11 +54,9 @@ public class HttpRemoteSiteListener implements 
RemoteSiteListener {
     private HttpRemoteSiteListener(final NiFiProperties nifiProperties) {
         super();
         taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() 
{
-            private final ThreadFactory defaultFactory = 
Executors.defaultThreadFactory();
-
             @Override
             public Thread newThread(final Runnable r) {
-                final Thread thread = defaultFactory.newThread(r);
+                final Thread thread = 
Executors.defaultThreadFactory().newThread(r);
                 thread.setName("Http Site-to-Site Transaction Maintenance");
                 thread.setDaemon(true);
                 return thread;
@@ -160,6 +158,10 @@ public class HttpRemoteSiteListener implements 
RemoteSiteListener {
 
     @Override
     public void stop() {
+        if(taskExecutor != null) {
+            logger.debug("Stopping Http Site-to-Site Transaction Maintenance 
task...");
+            taskExecutor.shutdown();
+        }
         if (transactionMaintenanceTask != null) {
             logger.debug("Stopping transactionMaintenanceTask...");
             transactionMaintenanceTask.cancel(true);

http://git-wip-us.apache.org/repos/asf/nifi/blob/45f82dc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index a367e9e..2fae669 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.remote;
 
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
 import org.apache.nifi.remote.cluster.NodeInformant;
 import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.exception.BadRequestException;
@@ -29,6 +30,7 @@ import 
org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSessio
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.apache.nifi.remote.protocol.ServerProtocol;
+import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,12 +48,12 @@ import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.nifi.remote.cluster.ClusterNodeInformation;
-import org.apache.nifi.util.NiFiProperties;
 
 public class SocketRemoteSiteListener implements RemoteSiteListener {
 
@@ -86,6 +88,7 @@ public class SocketRemoteSiteListener implements 
RemoteSiteListener {
     @Override
     public void start() throws IOException {
         final boolean secure = (sslContext != null);
+        final List<Thread> threads = new ArrayList<Thread>();
 
         final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
         serverSocketChannel.configureBlocking(true);
@@ -132,8 +135,9 @@ public class SocketRemoteSiteListener implements 
RemoteSiteListener {
                     LOG.trace("Got connection");
 
                     if (stopped.get()) {
-                        return;
+                        break;
                     }
+
                     final Socket socket = acceptedSocket;
                     final SocketChannel socketChannel = socket.getChannel();
                     final Thread thread = new Thread(new Runnable() {
@@ -304,6 +308,14 @@ public class SocketRemoteSiteListener implements 
RemoteSiteListener {
                     thread.setName("Site-to-Site Worker Thread-" + 
(threadCount++));
                     LOG.debug("Handing connection to {}", thread);
                     thread.start();
+                    threads.add(thread);
+                    threads.removeIf(t -> !t.isAlive());
+                }
+
+                for(Thread thread : threads) {
+                    if(thread != null) {
+                        thread.interrupt();
+                    }
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/nifi/blob/45f82dc8/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java b/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java
index 9da357d..88ce51a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import javax.servlet.AsyncContext;
@@ -35,6 +36,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -81,13 +83,21 @@ public class StandardHttpContextMap extends 
AbstractControllerService implements
     @OnEnabled
     public void onConfigured(final ConfigurationContext context) {
         maxSize = context.getProperty(MAX_OUTSTANDING_REQUESTS).asInteger();
-        executor = Executors.newSingleThreadScheduledExecutor();
+        executor = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread thread = 
Executors.defaultThreadFactory().newThread(r);
+                thread.setName("StandardHttpContextMap-" + getIdentifier());
+                return thread;
+            }
+        });
 
         maxRequestNanos = 
context.getProperty(REQUEST_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS);
         final long scheduleNanos = maxRequestNanos / 2;
         executor.scheduleWithFixedDelay(new CleanupExpiredRequests(), 
scheduleNanos, scheduleNanos, TimeUnit.NANOSECONDS);
     }
 
+    @OnShutdown
     @OnDisabled
     public void cleanup() {
         if (executor != null) {

Reply via email to