Repository: nifi
Updated Branches:
  refs/heads/master f65286be8 -> 8f4d13eea


NIFI-5581: Fix replicate request timeout

This closes #3044

- Revert 87cf474e542ef16601a86cc66c624fb8902c9fc2 to enable connection
pooling
- Changes the expected HTTP status code for the 1st request of a
two-phase commit transaction from 150 (NiFi custom) to 202 Accepted
- Corrected RevisionManager Javadoc about revision validation protocol


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8f4d13ee
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8f4d13ee
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8f4d13ee

Branch: refs/heads/master
Commit: 8f4d13eeacc0bb1bd159ecb8fedf71eda5ea15a9
Parents: f65286b
Author: Koji Kawamura <[email protected]>
Authored: Thu Oct 4 13:48:26 2018 +0900
Committer: Matt Gilman <[email protected]>
Committed: Thu Oct 4 10:25:42 2018 -0400

----------------------------------------------------------------------
 .../http/StandardHttpResponseMapper.java        |  4 +-
 .../http/replication/RequestReplicator.java     |  8 +--
 .../ThreadPoolRequestReplicator.java            | 63 ++++++++++----------
 .../okhttp/OkHttpReplicationClient.java         |  3 +-
 .../TestThreadPoolRequestReplicator.java        |  4 +-
 .../nifi/web/revision/RevisionManager.java      | 15 +++--
 .../nifi/web/api/ApplicationResource.java       |  4 +-
 7 files changed, 50 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
index 96f2592..0cd550b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
@@ -166,7 +166,7 @@ public class StandardHttpResponseMapper implements 
HttpResponseMapper {
             // If we have a response that is a 3xx, 4xx, or 5xx, then we want 
to choose that.
             // Otherwise, it doesn't matter which one we choose. We do this 
because if we replicate
             // a mutable request, it's possible that one node will respond 
with a 409, for instance, while
-            // others respond with a 150-Continue. We do not want to pick the 
150-Continue; instead, we want
+            // others respond with a 202-Accepted. We do not want to pick the 
202-Accepted; instead, we want
             // the failed response.
             final NodeResponse clientResponse = 
nodeResponses.stream().filter(p -> p.getStatus() > 
299).findAny().orElse(nodeResponses.iterator().next());
 
@@ -236,7 +236,7 @@ public class StandardHttpResponseMapper implements 
HttpResponseMapper {
         responses.stream()
                 .parallel() // "parallelize" the draining of the responses, 
since we have multiple streams to consume
                 .filter(response -> response != exclude) // don't include the 
explicitly excluded node
-                .filter(response -> response.getStatus() != 
RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any 
150-NodeContinue responses because they contain no content
+                .filter(response -> response.getStatus() != 
RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any continue 
responses because they contain no content
                 .forEach(response -> drainResponse(response)); // drain all 
node responses that didn't get filtered out
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
index a7177d4..8a98ed7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
@@ -30,13 +30,13 @@ public interface RequestReplicator {
     public static final String CLUSTER_ID_GENERATION_SEED_HEADER = 
"X-Cluster-Id-Generation-Seed";
 
     /**
-     * The HTTP header that the requestor specifies to ask a node if they are 
able to process a given request. The value
-     * is always 150-NodeContinue. The node will respond with 150 CONTINUE if 
it is able to
+     * The HTTP header that the requestor specifies to ask a node if they are 
able to process a given request.
+     * The value is always 202-Accepted. The node will respond with 202 
ACCEPTED if it is able to
      * process the request, 417 EXPECTATION_FAILED otherwise.
      */
     public static final String REQUEST_VALIDATION_HTTP_HEADER = 
"X-Validation-Expects";
-    public static final String NODE_CONTINUE = "150-NodeContinue";
-    public static final int NODE_CONTINUE_STATUS_CODE = 150;
+    public static final String NODE_CONTINUE = "202-Accepted";
+    public static final int NODE_CONTINUE_STATUS_CODE = 202;
 
     /**
      * Indicates that the request is intended to cancel a transaction that was 
previously created without performing the action

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 93804be..85618de 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -17,6 +17,35 @@
 
 package org.apache.nifi.cluster.coordination.http.replication;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
+import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import 
org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
+import 
org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
+import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
+import org.apache.nifi.cluster.manager.exception.UriConstructionException;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.ComponentIdGenerator;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.security.ProxiedEntitiesUtils;
+import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -45,36 +74,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.authorization.AccessDeniedException;
-import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.authorization.user.NiFiUserUtils;
-import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
-import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
-import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
-import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
-import org.apache.nifi.cluster.manager.NodeResponse;
-import 
org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
-import 
org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
-import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
-import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
-import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
-import org.apache.nifi.cluster.manager.exception.UriConstructionException;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.util.ComponentIdGenerator;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.security.ProxiedEntitiesUtils;
-import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class ThreadPoolRequestReplicator implements RequestReplicator {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
@@ -503,10 +502,10 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
                     if (allNodesResponded) {
                         clusterResponse.addTiming("Verification Completed", 
"All Nodes", nanos);
 
-                        // Check if we have any requests that do not have a 
150-Continue status code.
+                        // Check if we have any requests that do not have a 
202-Accepted status code.
                         final long dissentingCount = 
nodeResponses.stream().filter(p -> p.getStatus() != 
NODE_CONTINUE_STATUS_CODE).count();
 
-                        // If all nodes responded with 150-Continue, then we 
can replicate the original request
+                        // If all nodes responded with 202-Accepted, then we 
can replicate the original request
                         // to all nodes and we are finished.
                         if (dissentingCount == 0) {
                             logger.debug("Received verification from all {} 
nodes that mutable request {} {} can be made", numNodes, method, uri.getPath());

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
index ec8a2b0..81229de 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
@@ -308,7 +308,8 @@ public class OkHttpReplicationClient implements 
HttpReplicationClient {
         okHttpClientBuilder.connectTimeout(connectionTimeoutMs, 
TimeUnit.MILLISECONDS);
         okHttpClientBuilder.readTimeout(readTimeoutMs, TimeUnit.MILLISECONDS);
         okHttpClientBuilder.followRedirects(true);
-        okHttpClientBuilder.connectionPool(new ConnectionPool(0, 5, 
TimeUnit.MINUTES));
+        final int connectionPoolSize = 
properties.getClusterNodeMaxConcurrentRequests();
+        okHttpClientBuilder.connectionPool(new 
ConnectionPool(connectionPoolSize, 5, TimeUnit.MINUTES));
 
         final Tuple<SSLSocketFactory, X509TrustManager> tuple = 
createSslSocketFactory(properties);
         if (tuple != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index 15b7774..70579ea 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -248,7 +248,7 @@ public class TestThreadPoolRequestReplicator {
                 final int statusCode;
                 if (requestCount.incrementAndGet() == 1) {
                     assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, 
expectsHeader);
-                    statusCode = 150;
+                    statusCode = Status.ACCEPTED.getStatusCode();
                 } else {
                     assertNull(expectsHeader);
                     statusCode = Status.OK.getStatusCode();
@@ -390,7 +390,7 @@ public class TestThreadPoolRequestReplicator {
 
                 if (requestIndex == 1) {
                     final Response clientResponse = mock(Response.class);
-                    when(clientResponse.getStatus()).thenReturn(150);
+                    when(clientResponse.getStatus()).thenReturn(202);
                     return new NodeResponse(nodeId, request.getMethod(), uri, 
clientResponse, -1L, requestId);
                 } else {
                     final IllegalClusterStateException explanation = new 
IllegalClusterStateException("Intentional Exception for Unit Testing");

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
index 54ccd56..357f56a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
@@ -33,20 +33,19 @@ import org.apache.nifi.web.Revision;
  *
  * <p>
  * Clients that will modify a resource must do so using a two-phase commit. 
First,
- * the client will issue a request that includes an HTTP Header of 
"X-NcmExpects".
+ * the client will issue a request that includes an HTTP Header of 
"X-Validation-Expects".
  * This indicates that the request will not actually be performed but rather 
that the
  * node should validate that the request could in fact be performed. If all 
nodes respond
- * with a 150-Continue response, then the second phase will commence. The 
second phase
- * will consist of replicating the same request but without the "X-NcmExpects" 
header.
+ * with a 202-Accepted response, then the second phase will commence. The 
second phase
+ * will consist of replicating the same request but without the 
"X-Validation-Expects" header.
  * </p>
  *
  * <p>
  * When the first phase of the two-phase commit is processed, the Revision 
Manager should
- * be used to verify that the client-provided Revisions are current by calling 
the
- * {@link #verifyRevisions(Collection)}
- * method. If the revisions are up-to-date, the method will return 
successfully and the
- * request validation may continue. Otherwise, the request should fail and the 
second phase
- * should not be performed.
+ * be used to retrieve the current revision by calling the {@link 
#getRevision(String)} method
+ * to verify that the client-provided Revisions are current.
+ * If the revisions are up-to-date, the request validation may continue.
+ * Otherwise, the request should fail and the second phase should not be 
performed.
  * </p>
  *
  * <p>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 2e9e318..d99fb96 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -272,9 +272,9 @@ public abstract class ApplicationResource {
     }
 
     /**
-     * Generates a 150 Node Continue response to be used within the cluster 
request handshake.
+     * Generates a 202 Accepted (Node Continue) response to be used within the 
cluster request handshake.
      *
-     * @return a 150 Node Continue response to be used within the cluster 
request handshake
+     * @return a 202 Accepted (Node Continue) response to be used within the 
cluster request handshake
      */
     protected ResponseBuilder generateContinueResponse() {
         return Response.status(RequestReplicator.NODE_CONTINUE_STATUS_CODE);

Reply via email to