Repository: nifi Updated Branches: refs/heads/master f65286be8 -> 8f4d13eea
NIFI-5581: Fix replicate request timeout This closes #3044 - Revert 87cf474e542ef16601a86cc66c624fb8902c9fc2 to enable connection pooling - Changes the expected HTTP status code for the 1st request of a two-phase commit transaction from 150 (NiFi custom) to 202 Accepted - Corrected RevisionManager Javadoc about revision varidation protocol Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8f4d13ee Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8f4d13ee Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8f4d13ee Branch: refs/heads/master Commit: 8f4d13eeacc0bb1bd159ecb8fedf71eda5ea15a9 Parents: f65286b Author: Koji Kawamura <[email protected]> Authored: Thu Oct 4 13:48:26 2018 +0900 Committer: Matt Gilman <[email protected]> Committed: Thu Oct 4 10:25:42 2018 -0400 ---------------------------------------------------------------------- .../http/StandardHttpResponseMapper.java | 4 +- .../http/replication/RequestReplicator.java | 8 +-- .../ThreadPoolRequestReplicator.java | 63 ++++++++++---------- .../okhttp/OkHttpReplicationClient.java | 3 +- .../TestThreadPoolRequestReplicator.java | 4 +- .../nifi/web/revision/RevisionManager.java | 15 +++-- .../nifi/web/api/ApplicationResource.java | 4 +- 7 files changed, 50 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java index 96f2592..0cd550b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java @@ -166,7 +166,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper { // If we have a response that is a 3xx, 4xx, or 5xx, then we want to choose that. // Otherwise, it doesn't matter which one we choose. We do this because if we replicate // a mutable request, it's possible that one node will respond with a 409, for instance, while - // others respond with a 150-Continue. We do not want to pick the 150-Continue; instead, we want + // others respond with a 202-Accepted. We do not want to pick the 202-Accepted; instead, we want // the failed response. final NodeResponse clientResponse = nodeResponses.stream().filter(p -> p.getStatus() > 299).findAny().orElse(nodeResponses.iterator().next()); @@ -236,7 +236,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper { responses.stream() .parallel() // "parallelize" the draining of the responses, since we have multiple streams to consume .filter(response -> response != exclude) // don't include the explicitly excluded node - .filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses because they contain no content + .filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any continue responses because they contain no content .forEach(response -> drainResponse(response)); // drain all node responses that didn't get filtered out } http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java index a7177d4..8a98ed7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java @@ -30,13 +30,13 @@ public interface RequestReplicator { public static final String CLUSTER_ID_GENERATION_SEED_HEADER = "X-Cluster-Id-Generation-Seed"; /** - * The HTTP header that the requestor specifies to ask a node if they are able to process a given request. The value - * is always 150-NodeContinue. The node will respond with 150 CONTINUE if it is able to + * The HTTP header that the requestor specifies to ask a node if they are able to process a given request. + * The value is always 202-Accepted. The node will respond with 202 ACCEPTED if it is able to * process the request, 417 EXPECTATION_FAILED otherwise. */ public static final String REQUEST_VALIDATION_HTTP_HEADER = "X-Validation-Expects"; - public static final String NODE_CONTINUE = "150-NodeContinue"; - public static final int NODE_CONTINUE_STATUS_CODE = 150; + public static final String NODE_CONTINUE = "202-Accepted"; + public static final int NODE_CONTINUE_STATUS_CODE = 202; /** * Indicates that the request is intended to cancel a transaction that was previously created without performing the action http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index 93804be..85618de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -17,6 +17,35 @@ package org.apache.nifi.cluster.coordination.http.replication; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; +import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException; +import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; +import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; +import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; +import org.apache.nifi.cluster.manager.exception.UnknownNodeException; +import org.apache.nifi.cluster.manager.exception.UriConstructionException; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.util.ComponentIdGenerator; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.security.ProxiedEntitiesUtils; +import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -45,36 +74,6 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.authorization.AccessDeniedException; -import org.apache.nifi.authorization.user.NiFiUser; -import org.apache.nifi.authorization.user.NiFiUserUtils; -import org.apache.nifi.cluster.coordination.ClusterCoordinator; -import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; -import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper; -import org.apache.nifi.cluster.coordination.node.NodeConnectionState; -import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; -import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException; -import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; -import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; -import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; -import org.apache.nifi.cluster.manager.exception.UnknownNodeException; -import org.apache.nifi.cluster.manager.exception.UriConstructionException; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.reporting.Severity; -import org.apache.nifi.util.ComponentIdGenerator; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.security.ProxiedEntitiesUtils; -import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class ThreadPoolRequestReplicator implements RequestReplicator { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class); @@ -503,10 +502,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { if (allNodesResponded) { clusterResponse.addTiming("Verification Completed", "All Nodes", nanos); - // Check if we have any requests that do not have a 150-Continue status code. + // Check if we have any requests that do not have a 202-Accepted status code. final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != NODE_CONTINUE_STATUS_CODE).count(); - // If all nodes responded with 150-Continue, then we can replicate the original request + // If all nodes responded with 202-Accepted, then we can replicate the original request // to all nodes and we are finished. if (dissentingCount == 0) { logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", numNodes, method, uri.getPath()); http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java index ec8a2b0..81229de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java @@ -308,7 +308,8 @@ public class OkHttpReplicationClient implements HttpReplicationClient { okHttpClientBuilder.connectTimeout(connectionTimeoutMs, TimeUnit.MILLISECONDS); okHttpClientBuilder.readTimeout(readTimeoutMs, TimeUnit.MILLISECONDS); okHttpClientBuilder.followRedirects(true); - okHttpClientBuilder.connectionPool(new ConnectionPool(0, 5, TimeUnit.MINUTES)); + final int connectionPoolSize = properties.getClusterNodeMaxConcurrentRequests(); + okHttpClientBuilder.connectionPool(new ConnectionPool(connectionPoolSize, 5, TimeUnit.MINUTES)); final Tuple<SSLSocketFactory, X509TrustManager> tuple = createSslSocketFactory(properties); if (tuple != null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 15b7774..70579ea 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -248,7 +248,7 @@ public class TestThreadPoolRequestReplicator { final int statusCode; if (requestCount.incrementAndGet() == 1) { assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, expectsHeader); - statusCode = 150; + statusCode = Status.ACCEPTED.getStatusCode(); } else { assertNull(expectsHeader); statusCode = Status.OK.getStatusCode(); @@ -390,7 +390,7 @@ public class TestThreadPoolRequestReplicator { if (requestIndex == 1) { final Response clientResponse = mock(Response.class); - when(clientResponse.getStatus()).thenReturn(150); + when(clientResponse.getStatus()).thenReturn(202); return new NodeResponse(nodeId, request.getMethod(), uri, clientResponse, -1L, requestId); } else { final IllegalClusterStateException explanation = new IllegalClusterStateException("Intentional Exception for Unit Testing"); http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java index 54ccd56..357f56a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java @@ -33,20 +33,19 @@ import org.apache.nifi.web.Revision; * * <p> * Clients that will modify a resource must do so using a two-phase commit. First, - * the client will issue a request that includes an HTTP Header of "X-NcmExpects". + * the client will issue a request that includes an HTTP Header of "X-Validation-Expects". * This indicates that the request will not actually be performed but rather that the * node should validate that the request could in fact be performed. If all nodes respond - * with a 150-Continue response, then the second phase will commence. The second phase - * will consist of replicating the same request but without the "X-NcmExpects" header. + * with a 202-Accepted response, then the second phase will commence. The second phase + * will consist of replicating the same request but without the "X-Validation-Expects" header. * </p> * * <p> * When the first phase of the two-phase commit is processed, the Revision Manager should - * be used to verify that the client-provided Revisions are current by calling the - * {@link #verifyRevisions(Collection)} - * method. If the revisions are up-to-date, the method will return successfully and the - * request validation may continue. Otherwise, the request should fail and the second phase - * should not be performed. + * be used to retrieve the current revision by calling the {@link #getRevision(String)} method + * to verify that the client-provided Revisions are current. + * If the revisions are up-to-date, the request validation may continue. + * Otherwise, the request should fail and the second phase should not be performed. * </p> * * <p> http://git-wip-us.apache.org/repos/asf/nifi/blob/8f4d13ee/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 2e9e318..d99fb96 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -272,9 +272,9 @@ public abstract class ApplicationResource { } /** - * Generates a 150 Node Continue response to be used within the cluster request handshake. + * Generates a 202 Accepted (Node Continue) response to be used within the cluster request handshake. * - * @return a 150 Node Continue response to be used within the cluster request handshake + * @return a 202 Accepted (Node Continue) response to be used within the cluster request handshake */ protected ResponseBuilder generateContinueResponse() { return Response.status(RequestReplicator.NODE_CONTINUE_STATUS_CODE);
