Repository: nifi
Updated Branches:
  refs/heads/master ba56774fa -> a3b72f1bb


NIFI-4143 - externalize MAX_CONCURRENT_REQUESTS. This closes #1962


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a3b72f1b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a3b72f1b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a3b72f1b

Branch: refs/heads/master
Commit: a3b72f1bb7435f987a618eda85008d187332ad21
Parents: ba56774
Author: Pierre Villard <[email protected]>
Authored: Thu Jun 29 20:27:55 2017 +0200
Committer: Matt Gilman <[email protected]>
Committed: Thu Jul 6 15:38:55 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/util/NiFiProperties.java    |  6 ++++++
 .../replication/ThreadPoolRequestReplicator.java     | 15 +++++++++------
 .../ThreadPoolRequestReplicatorFactoryBean.java      |  3 ++-
 .../replication/TestThreadPoolRequestReplicator.java |  8 ++++----
 .../nifi-framework/nifi-resources/pom.xml            |  1 +
 .../src/main/resources/conf/nifi.properties          |  1 +
 6 files changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a3b72f1b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index c1fc0bc..4c93205 100644
--- 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -177,6 +177,7 @@ public abstract class NiFiProperties {
     public static final String CLUSTER_NODE_PROTOCOL_MAX_THREADS = 
"nifi.cluster.node.protocol.max.threads";
     public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = 
"nifi.cluster.node.connection.timeout";
     public static final String CLUSTER_NODE_READ_TIMEOUT = 
"nifi.cluster.node.read.timeout";
+    public static final String CLUSTER_NODE_MAX_CONCURRENT_REQUESTS = 
"nifi.cluster.node.max.concurrent.requests";
     public static final String CLUSTER_FIREWALL_FILE = 
"nifi.cluster.firewall.file";
     public static final String FLOW_ELECTION_MAX_WAIT_TIME = 
"nifi.cluster.flow.election.max.wait.time";
     public static final String FLOW_ELECTION_MAX_CANDIDATES = 
"nifi.cluster.flow.election.max.candidates";
@@ -244,6 +245,7 @@ public abstract class NiFiProperties {
     public static final String 
DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec";
     public static final String DEFAULT_CLUSTER_NODE_READ_TIMEOUT = "5 sec";
     public static final String DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT = "5 
sec";
+    public static final int DEFAULT_CLUSTER_NODE_MAX_CONCURRENT_REQUESTS = 100;
 
     // cluster node defaults
     public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 10;
@@ -561,6 +563,10 @@ public abstract class NiFiProperties {
         return getIntegerProperty(WEB_THREADS, DEFAULT_WEB_THREADS);
     }
 
+    public int getClusterNodeMaxConcurrentRequests() {
+        return getIntegerProperty(CLUSTER_NODE_MAX_CONCURRENT_REQUESTS, 
DEFAULT_CLUSTER_NODE_MAX_CONCURRENT_REQUESTS);
+    }
+
     public File getWebWorkingDirectory() {
         return new File(getProperty(WEB_WORKING_DIR, DEFAULT_WEB_WORKING_DIR));
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a3b72f1b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index c38b3e9..7bdf6fa 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -84,11 +84,11 @@ import com.sun.jersey.core.util.MultivaluedMapImpl;
 public class ThreadPoolRequestReplicator implements RequestReplicator {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
-    private static final int MAX_CONCURRENT_REQUESTS = 100;
 
     private final Client client; // the client to use for issuing requests
     private final int connectionTimeoutMs; // connection timeout per node 
request
     private final int readTimeoutMs; // read timeout per node request
+    private final int maxConcurrentRequests; // maximum number of concurrent 
requests
     private final HttpResponseMapper responseMapper;
     private final EventReporter eventReporter;
     private final RequestCompletionCallback callback;
@@ -109,15 +109,16 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
      *
      * @param corePoolSize core size of the thread pool
      * @param maxPoolSize the max number of threads in the thread pool
+     * @param maxConcurrentRequests maximum number of concurrent requests
      * @param client a client for making requests
      * @param clusterCoordinator the cluster coordinator to use for 
interacting with node statuses
      * @param callback a callback that will be called whenever all of the 
responses have been gathered for a request. May be null.
      * @param eventReporter an EventReporter that can be used to notify users 
of interesting events. May be null.
      * @param nifiProperties properties
      */
-    public ThreadPoolRequestReplicator(final int corePoolSize, final int 
maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator,
+    public ThreadPoolRequestReplicator(final int corePoolSize, final int 
maxPoolSize, final int maxConcurrentRequests, final Client client, final 
ClusterCoordinator clusterCoordinator,
                                        final RequestCompletionCallback 
callback, final EventReporter eventReporter, final NiFiProperties 
nifiProperties) {
-        this(corePoolSize, maxPoolSize, client, clusterCoordinator, "5 sec", 
"5 sec", callback, eventReporter, nifiProperties);
+        this(corePoolSize, maxPoolSize, maxConcurrentRequests, client, 
clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties);
     }
 
     /**
@@ -125,6 +126,7 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
      *
      * @param corePoolSize core size of the thread pool
      * @param maxPoolSize the max number of threads in the thread pool
+     * @param maxConcurrentRequests maximum number of concurrent requests
      * @param client a client for making requests
      * @param clusterCoordinator the cluster coordinator to use for 
interacting with node statuses
      * @param connectionTimeout the connection timeout specified in 
milliseconds
@@ -133,7 +135,7 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
      * @param eventReporter an EventReporter that can be used to notify users 
of interesting events. May be null.
      * @param nifiProperties properties
      */
-    public ThreadPoolRequestReplicator(final int corePoolSize, final int 
maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator,
+    public ThreadPoolRequestReplicator(final int corePoolSize, final int 
maxPoolSize, final int maxConcurrentRequests, final Client client, final 
ClusterCoordinator clusterCoordinator,
                                        final String connectionTimeout, final 
String readTimeout, final RequestCompletionCallback callback,
                                        final EventReporter eventReporter, 
final NiFiProperties nifiProperties) {
         if (corePoolSize <= 0) {
@@ -148,6 +150,7 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
         this.clusterCoordinator = clusterCoordinator;
         this.connectionTimeoutMs = (int) 
FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
         this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, 
TimeUnit.MILLISECONDS);
+        this.maxConcurrentRequests = maxConcurrentRequests;
         this.responseMapper = new StandardHttpResponseMapper(nifiProperties);
         this.eventReporter = eventReporter;
         this.callback = callback;
@@ -361,11 +364,11 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
             }
 
             int numRequests = responseMap.size();
-            if (numRequests >= MAX_CONCURRENT_REQUESTS) {
+            if (numRequests >= maxConcurrentRequests) {
                 numRequests = purgeExpiredRequests();
             }
 
-            if (numRequests >= MAX_CONCURRENT_REQUESTS) {
+            if (numRequests >= maxConcurrentRequests) {
                 final Map<String, Long> countsByUri = 
responseMap.values().stream().collect(
                         Collectors.groupingBy(
                                 StandardAsyncClusterResponse::getURIPath,

http://git-wip-us.apache.org/repos/asf/nifi/blob/a3b72f1b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java
index 128075e..8943276 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java
@@ -46,11 +46,12 @@ public class ThreadPoolRequestReplicatorFactoryBean 
implements FactoryBean<Threa
 
             final int corePoolSize = 
nifiProperties.getClusterNodeProtocolCorePoolSize();
             final int maxPoolSize = 
nifiProperties.getClusterNodeProtocolMaxPoolSize();
+            final int maxConcurrentRequests = 
nifiProperties.getClusterNodeMaxConcurrentRequests();
             final Client jerseyClient = WebUtils.createClient(new 
DefaultClientConfig(), SslContextFactory.createSslContext(nifiProperties));
             final String connectionTimeout = 
nifiProperties.getClusterNodeConnectionTimeout();
             final String readTimeout = 
nifiProperties.getClusterNodeReadTimeout();
 
-            replicator = new ThreadPoolRequestReplicator(corePoolSize, 
maxPoolSize, jerseyClient, clusterCoordinator,
+            replicator = new ThreadPoolRequestReplicator(corePoolSize, 
maxPoolSize, maxConcurrentRequests, jerseyClient, clusterCoordinator,
                 connectionTimeout, readTimeout, requestCompletionCallback, 
eventReporter, nifiProperties);
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a3b72f1b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index b5eff63..214a509 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -233,7 +233,7 @@ public class TestThreadPoolRequestReplicator {
 
         final AtomicInteger requestCount = new AtomicInteger(0);
         final NiFiProperties props = 
NiFiProperties.createBasicNiFiProperties(null, null);
-        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", 
null, null, props) {
+        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 
sec", null, null, props) {
             @Override
             protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method,
                     final URI uri, final String requestId, Map<String, String> 
givenHeaders, final StandardAsyncClusterResponse response) {
@@ -305,7 +305,7 @@ public class TestThreadPoolRequestReplicator {
 
         Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap);
         final NiFiProperties props = 
NiFiProperties.createBasicNiFiProperties(null, null);
-        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", 
null, null, props) {
+        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 
sec", null, null, props) {
             @Override
             public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, 
String method, URI uri, Object entity, Map<String, String> headers,
                                                   boolean indicateReplicated, 
boolean verify) {
@@ -363,7 +363,7 @@ public class TestThreadPoolRequestReplicator {
         final ClusterCoordinator coordinator = createClusterCoordinator();
         final AtomicInteger requestCount = new AtomicInteger(0);
         final NiFiProperties props = 
NiFiProperties.createBasicNiFiProperties(null, null);
-        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", 
null, null, props) {
+        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 
sec", null, null, props) {
             @Override
             protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method,
                     final URI uri, final String requestId, Map<String, String> 
givenHeaders, final StandardAsyncClusterResponse response) {
@@ -573,7 +573,7 @@ public class TestThreadPoolRequestReplicator {
     private void withReplicator(final WithReplicator function, final Status 
status, final long delayMillis, final RuntimeException failure, final String 
expectedRequestChain) {
         final ClusterCoordinator coordinator = createClusterCoordinator();
         final NiFiProperties nifiProps = 
NiFiProperties.createBasicNiFiProperties(null, null);
-        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", 
null, null, nifiProps) {
+        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, 5, 100, new Client(), coordinator, "1 sec", "1 
sec", null, null, nifiProps) {
             @Override
             protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method,
                 final URI uri, final String requestId, Map<String, String> 
givenHeaders, final StandardAsyncClusterResponse response) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/a3b72f1b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 8e740a9..5cfe17c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -160,6 +160,7 @@
         
<nifi.cluster.node.event.history.size>25</nifi.cluster.node.event.history.size>
         <nifi.cluster.node.connection.timeout>5 
sec</nifi.cluster.node.connection.timeout>
         <nifi.cluster.node.read.timeout>5 sec</nifi.cluster.node.read.timeout>
+        
<nifi.cluster.node.max.concurrent.requests>100</nifi.cluster.node.max.concurrent.requests>
         <nifi.cluster.firewall.file />
         <nifi.cluster.flow.election.max.wait.time>5 
mins</nifi.cluster.flow.election.max.wait.time>
         <nifi.cluster.flow.election.max.candidates />

http://git-wip-us.apache.org/repos/asf/nifi/blob/a3b72f1b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 8167c49..f96d167 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -179,6 +179,7 @@ 
nifi.cluster.node.protocol.max.threads=${nifi.cluster.node.protocol.max.threads}
 nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size}
 nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout}
 nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout}
+nifi.cluster.node.max.concurrent.requests=${nifi.cluster.node.max.concurrent.requests}
 nifi.cluster.firewall.file=${nifi.cluster.firewall.file}
 
nifi.cluster.flow.election.max.wait.time=${nifi.cluster.flow.election.max.wait.time}
 
nifi.cluster.flow.election.max.candidates=${nifi.cluster.flow.election.max.candidates}

Reply via email to