Repository: nifi
Updated Branches:
  refs/heads/master e6b166a3a -> cff81c0cd
NIFI-4153: Use a LinkedBlockingQueue instead of a SynchronousQueue for Request Replicator's thread pool so that requests will queue when all threads are active, instead of throwing an Exception.

This closes #1980

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cff81c0c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cff81c0c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cff81c0c

Branch: refs/heads/master
Commit: cff81c0cd24aa044602bbcc5ffc4c85918e9d484
Parents: e6b166a
Author: Mark Payne <[email protected]>
Authored: Wed Jul 5 16:35:04 2017 -0400
Committer: Matt Gilman <[email protected]>
Committed: Thu Jul 6 10:11:48 2017 -0400

----------------------------------------------------------------------
 .../ThreadPoolRequestReplicator.java | 27 ++++++++++++--------
 1 file changed, 17 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/cff81c0c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index e5a46f8..c38b3e9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -17,12 +17,6 @@ package org.apache.nifi.cluster.coordination.http.replication; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter; -import com.sun.jersey.core.util.MultivaluedMapImpl; import java.net.URI; import java.net.URISyntaxException; import java.util.Collections; @@ -36,10 +30,9 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -49,10 +42,12 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; + import javax.ws.rs.HttpMethod; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response.Status; + import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; @@ -79,6 +74,13 @@ import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import 
com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter; +import com.sun.jersey.core.util.MultivaluedMapImpl; + public class ThreadPoolRequestReplicator implements RequestReplicator { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class); @@ -92,7 +94,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { private final RequestCompletionCallback callback; private final ClusterCoordinator clusterCoordinator; - private ExecutorService executorService; + private ThreadPoolExecutor executorService; private ScheduledExecutorService maintenanceExecutor; private final ConcurrentMap<String, StandardAsyncClusterResponse> responseMap = new ConcurrentHashMap<>(); @@ -162,7 +164,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { return t; }; - executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); + executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); maintenanceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override @@ -444,6 +446,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { logger.debug("Notified monitor {} because request {} {} has failed with Throwable {}", monitor, method, uri, t); } + if (response != null) { + final RuntimeException failure = (t instanceof RuntimeException) ? (RuntimeException) t : new RuntimeException("Failed to submit Replication Request to background thread", t); + response.setFailure(failure, new NodeIdentifier()); + } + throw t; } }
