Repository: nifi Updated Branches: refs/heads/master 84f1fb395 -> 5e62b4ae7
NIFI-3668: Fix purging expired replicate requests. This closes #1646. Newly created async response is added before checking map size nor purging expired ones. If there are already 100 remaining requests, the added request will not be executed nor removed. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5e62b4ae Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5e62b4ae Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5e62b4ae Branch: refs/heads/master Commit: 5e62b4ae72c6fa8c81aa3ee817ad1bd14e350c4a Parents: 84f1fb3 Author: Koji Kawamura <[email protected]> Authored: Mon Apr 3 15:56:21 2017 +0900 Committer: Mark Payne <[email protected]> Committed: Mon Apr 3 09:50:47 2017 -0400 ---------------------------------------------------------------------- .../ThreadPoolRequestReplicator.java | 68 +++++++++++--------- 1 file changed, 36 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/5e62b4ae/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index bc2f8bb..5a19ca3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -316,36 +316,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { throw new IllegalArgumentException("Cannot replicate request to 0 nodes"); } - // Update headers to indicate the current revision so that we can - // prevent multiple users changing the flow at the same time - final Map<String, String> updatedHeaders = new HashMap<>(headers); - final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString()); - - // create a response object if one was not already passed to us - if (response == null) { - // create the request objects and replicate to all nodes. - // When the request has completed, we need to ensure that we notify the monitor, if there is one. - final CompletionCallback completionCallback = clusterResponse -> { - try { - onCompletedResponse(requestId); - } finally { - if (monitor != null) { - synchronized (monitor) { - monitor.notify(); - } - - logger.debug("Notified monitor {} because request {} {} has completed", monitor, method, uri); - } - } - }; - - final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId); - - response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds, - responseMapper, completionCallback, responseConsumedCallback, merge); - responseMap.put(requestId, response); - } - // verify all of the nodes exist and are in the proper state for (final NodeIdentifier nodeId : nodeIds) { final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); @@ -360,11 +330,16 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response); + // Update headers to indicate the current revision so that we can + // prevent multiple users changing the flow at the same time + final Map<String, String> updatedHeaders = new HashMap<>(headers); + final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString()); + + long verifyClusterStateNanos = -1; if (performVerification) { final long start = System.nanoTime(); verifyClusterState(method, uri.getPath()); - final long nanos = System.nanoTime() - start; - response.addTiming("Verify Cluster State", "All Nodes", nanos); + verifyClusterStateNanos = System.nanoTime() - start; } int numRequests = responseMap.size(); @@ -382,6 +357,35 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests"); } + // create a response object if one was not already passed to us + if (response == null) { + // create the request objects and replicate to all nodes. + // When the request has completed, we need to ensure that we notify the monitor, if there is one. + final CompletionCallback completionCallback = clusterResponse -> { + try { + onCompletedResponse(requestId); + } finally { + if (monitor != null) { + synchronized (monitor) { + monitor.notify(); + } + + logger.debug("Notified monitor {} because request {} {} has completed", monitor, method, uri); + } + } + }; + + final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId); + + response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds, + responseMapper, completionCallback, responseConsumedCallback, merge); + responseMap.put(requestId, response); + } + + if (verifyClusterStateNanos > -1) { + response.addTiming("Verify Cluster State", "All Nodes", verifyClusterStateNanos); + } + logger.debug("For Request ID {}, response object is {}", requestId, response); // if mutable request, we have to do a two-phase commit where we ask each node to verify
