Repository: nifi Updated Branches: refs/heads/master ba56774fa -> a3b72f1bb
NIFI-4143 - externalize MAX_CONCURRENT_REQUESTS. This closes #1962 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a3b72f1b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a3b72f1b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a3b72f1b Branch: refs/heads/master Commit: a3b72f1bb7435f987a618eda85008d187332ad21 Parents: ba56774 Author: Pierre Villard <[email protected]> Authored: Thu Jun 29 20:27:55 2017 +0200 Committer: Matt Gilman <[email protected]> Committed: Thu Jul 6 15:38:55 2017 -0400 ---------------------------------------------------------------------- .../java/org/apache/nifi/util/NiFiProperties.java | 6 ++++++ .../replication/ThreadPoolRequestReplicator.java | 15 +++++++++------ .../ThreadPoolRequestReplicatorFactoryBean.java | 3 ++- .../replication/TestThreadPoolRequestReplicator.java | 8 ++++---- .../nifi-framework/nifi-resources/pom.xml | 1 + .../src/main/resources/conf/nifi.properties | 1 + 6 files changed, 23 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a3b72f1b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index c1fc0bc..4c93205 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -177,6 +177,7 @@ public abstract class NiFiProperties { public static final String CLUSTER_NODE_PROTOCOL_MAX_THREADS = "nifi.cluster.node.protocol.max.threads"; public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout"; public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout"; + public static final String CLUSTER_NODE_MAX_CONCURRENT_REQUESTS = "nifi.cluster.node.max.concurrent.requests"; public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file"; public static final String FLOW_ELECTION_MAX_WAIT_TIME = "nifi.cluster.flow.election.max.wait.time"; public static final String FLOW_ELECTION_MAX_CANDIDATES = "nifi.cluster.flow.election.max.candidates"; @@ -244,6 +245,7 @@ public abstract class NiFiProperties { public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec"; public static final String DEFAULT_CLUSTER_NODE_READ_TIMEOUT = "5 sec"; public static final String DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT = "5 sec"; + public static final int DEFAULT_CLUSTER_NODE_MAX_CONCURRENT_REQUESTS = 100; // cluster node defaults public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 10; @@ -561,6 +563,10 @@ public abstract class NiFiProperties { return getIntegerProperty(WEB_THREADS, DEFAULT_WEB_THREADS); } + public int getClusterNodeMaxConcurrentRequests() { + return getIntegerProperty(CLUSTER_NODE_MAX_CONCURRENT_REQUESTS, DEFAULT_CLUSTER_NODE_MAX_CONCURRENT_REQUESTS); + } + public File getWebWorkingDirectory() { return new File(getProperty(WEB_WORKING_DIR, DEFAULT_WEB_WORKING_DIR)); } http://git-wip-us.apache.org/repos/asf/nifi/blob/a3b72f1b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index c38b3e9..7bdf6fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -84,11 +84,11 @@ import com.sun.jersey.core.util.MultivaluedMapImpl; public class ThreadPoolRequestReplicator implements RequestReplicator { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class); - private static final int MAX_CONCURRENT_REQUESTS = 100; private final Client client; // the client to use for issuing requests private final int connectionTimeoutMs; // connection timeout per node request private final int readTimeoutMs; // read timeout per node request + private final int maxConcurrentRequests; // maximum number of concurrent requests private final HttpResponseMapper responseMapper; private final EventReporter eventReporter; private final RequestCompletionCallback callback; @@ -109,15 +109,16 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * * @param corePoolSize core size of the thread pool * @param maxPoolSize the max number of threads in the thread pool + * @param maxConcurrentRequests maximum number of concurrent requests * @param client a client for making requests * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. * @param nifiProperties properties */ - public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator, + public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final int maxConcurrentRequests, final Client client, final ClusterCoordinator clusterCoordinator, final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) { - this(corePoolSize, maxPoolSize, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties); + this(corePoolSize, maxPoolSize, maxConcurrentRequests, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties); } /** @@ -125,6 +126,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * * @param corePoolSize core size of the thread pool * @param maxPoolSize the max number of threads in the thread pool + * @param maxConcurrentRequests maximum number of concurrent requests * @param client a client for making requests * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses * @param connectionTimeout the connection timeout specified in milliseconds @@ -133,7 +135,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. * @param nifiProperties properties */ - public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator, + public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final int maxConcurrentRequests, final Client client, final ClusterCoordinator clusterCoordinator, final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) { if (corePoolSize <= 0) { @@ -148,6 +150,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { this.clusterCoordinator = clusterCoordinator; this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS); this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS); + this.maxConcurrentRequests = maxConcurrentRequests; this.responseMapper = new StandardHttpResponseMapper(nifiProperties); this.eventReporter = eventReporter; this.callback = callback; @@ -361,11 +364,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } int numRequests = responseMap.size(); - if (numRequests >= MAX_CONCURRENT_REQUESTS) { + if (numRequests >= maxConcurrentRequests) { numRequests = purgeExpiredRequests(); } - if (numRequests >= MAX_CONCURRENT_REQUESTS) { + if (numRequests >= maxConcurrentRequests) { final Map<String, Long> countsByUri = responseMap.values().stream().collect( Collectors.groupingBy( StandardAsyncClusterResponse::getURIPath, http://git-wip-us.apache.org/repos/asf/nifi/blob/a3b72f1b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java index 128075e..8943276 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java @@ -46,11 +46,12 @@ public class ThreadPoolRequestReplicatorFactoryBean implements FactoryBean<Threa final int corePoolSize = nifiProperties.getClusterNodeProtocolCorePoolSize(); final int maxPoolSize = nifiProperties.getClusterNodeProtocolMaxPoolSize(); + final int maxConcurrentRequests = nifiProperties.getClusterNodeMaxConcurrentRequests(); final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(nifiProperties)); final String connectionTimeout = nifiProperties.getClusterNodeConnectionTimeout(); final String readTimeout = nifiProperties.getClusterNodeReadTimeout(); - replicator = new ThreadPoolRequestReplicator(corePoolSize, maxPoolSize, jerseyClient, clusterCoordinator, + replicator = new ThreadPoolRequestReplicator(corePoolSize, maxPoolSize, maxConcurrentRequests, jerseyClient, clusterCoordinator, connectionTimeout, readTimeout, requestCompletionCallback, eventReporter, nifiProperties); } http://git-wip-us.apache.org/repos/asf/nifi/blob/a3b72f1b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index b5eff63..214a509 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -233,7 +233,7 @@ public class TestThreadPoolRequestReplicator { final AtomicInteger requestCount = new AtomicInteger(0); final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) { @@ -305,7 +305,7 @@ public class TestThreadPoolRequestReplicator { Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap); final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { @Override public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated, boolean verify) { @@ -363,7 +363,7 @@ public class TestThreadPoolRequestReplicator { final ClusterCoordinator coordinator = createClusterCoordinator(); final AtomicInteger requestCount = new AtomicInteger(0); final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 sec", null, null, props) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) { @@ -573,7 +573,7 @@ public class TestThreadPoolRequestReplicator { private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure, final String expectedRequestChain) { final ClusterCoordinator coordinator = createClusterCoordinator(); final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, null); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) { + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) { http://git-wip-us.apache.org/repos/asf/nifi/blob/a3b72f1b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index 8e740a9..5cfe17c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -160,6 +160,7 @@ <nifi.cluster.node.event.history.size>25</nifi.cluster.node.event.history.size> <nifi.cluster.node.connection.timeout>5 sec</nifi.cluster.node.connection.timeout> <nifi.cluster.node.read.timeout>5 sec</nifi.cluster.node.read.timeout> + <nifi.cluster.node.max.concurrent.requests>100</nifi.cluster.node.max.concurrent.requests> <nifi.cluster.firewall.file /> <nifi.cluster.flow.election.max.wait.time>5 mins</nifi.cluster.flow.election.max.wait.time> <nifi.cluster.flow.election.max.candidates /> http://git-wip-us.apache.org/repos/asf/nifi/blob/a3b72f1b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 8167c49..f96d167 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -179,6 +179,7 @@ nifi.cluster.node.protocol.max.threads=${nifi.cluster.node.protocol.max.threads} nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size} nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout} nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout} +nifi.cluster.node.max.concurrent.requests=${nifi.cluster.node.max.concurrent.requests} nifi.cluster.firewall.file=${nifi.cluster.firewall.file} nifi.cluster.flow.election.max.wait.time=${nifi.cluster.flow.election.max.wait.time} nifi.cluster.flow.election.max.candidates=${nifi.cluster.flow.election.max.candidates}
