Repository: nifi
Updated Branches:
  refs/heads/master e6b166a3a -> cff81c0cd


NIFI-4153: Use a LinkedBlockingQueue instead of a SynchronousQueue for the 
Request Replicator's thread pool so that requests are queued when all threads 
are active, instead of being rejected with an Exception. This closes #1980.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cff81c0c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cff81c0c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cff81c0c

Branch: refs/heads/master
Commit: cff81c0cd24aa044602bbcc5ffc4c85918e9d484
Parents: e6b166a
Author: Mark Payne <[email protected]>
Authored: Wed Jul 5 16:35:04 2017 -0400
Committer: Matt Gilman <[email protected]>
Committed: Thu Jul 6 10:11:48 2017 -0400

----------------------------------------------------------------------
 .../ThreadPoolRequestReplicator.java            | 27 ++++++++++++--------
 1 file changed, 17 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/cff81c0c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index e5a46f8..c38b3e9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -17,12 +17,6 @@
 
 package org.apache.nifi.cluster.coordination.http.replication;
 
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
@@ -36,10 +30,9 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -49,10 +42,12 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response.Status;
+
 import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
@@ -79,6 +74,13 @@ import 
org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
 public class ThreadPoolRequestReplicator implements RequestReplicator {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
@@ -92,7 +94,7 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
     private final RequestCompletionCallback callback;
     private final ClusterCoordinator clusterCoordinator;
 
-    private ExecutorService executorService;
+    private ThreadPoolExecutor executorService;
     private ScheduledExecutorService maintenanceExecutor;
 
     private final ConcurrentMap<String, StandardAsyncClusterResponse> 
responseMap = new ConcurrentHashMap<>();
@@ -162,7 +164,7 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
             return t;
         };
 
-        executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5, 
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
+        executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
 
         maintenanceExecutor = Executors.newScheduledThreadPool(1, new 
ThreadFactory() {
             @Override
@@ -444,6 +446,11 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
                 logger.debug("Notified monitor {} because request {} {} has 
failed with Throwable {}", monitor, method, uri, t);
             }
 
+            if (response != null) {
+                final RuntimeException failure = (t instanceof 
RuntimeException) ? (RuntimeException) t : new RuntimeException("Failed to 
submit Replication Request to background thread", t);
+                response.setFailure(failure, new NodeIdentifier());
+            }
+
             throw t;
         }
     }

Reply via email to