http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/AsyncClusterResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/AsyncClusterResponse.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/AsyncClusterResponse.java
new file mode 100644
index 0000000..d181407
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/AsyncClusterResponse.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * A response that is provided when an HTTP Request is made that must be 
federated to nodes in the cluster
+ */
+public interface AsyncClusterResponse {
+
+    /**
+     * @return the unique identifier of the request
+     */
+    String getRequestIdentifier();
+
+    /**
+     * @return the HTTP Method that was used for the request
+     */
+    String getMethod();
+
+    /**
+     * @return the Path of the URI that was used for the request
+     */
+    String getURIPath();
+
+    /**
+     * @return a Set that contains the Node Identifier of each node to which 
the request was replicated
+     */
+    Set<NodeIdentifier> getNodesInvolved();
+
+    /**
+     * @return a Set that contains the Node Identifier of each node for which 
the request has completed (either with
+     *         a successful response or a timeout)
+     */
+    Set<NodeIdentifier> getCompletedNodeIdentifiers();
+
+    /**
+     * @return a Set that contains the Node Response of each node for which 
the request has completed (either with a
+     *         successful response or a timeout)
+     */
+    Set<NodeResponse> getCompletedNodeResponses();
+
+    /**
+     * @return <code>true</code> if all nodes have responded (or timed out), 
<code>false</code> if still waiting on a response
+     *         from one or more nodes
+     */
+    boolean isComplete();
+
+    /**
+     * Indicates whether or not the request was created more than the 
specified amount of time ago.
+     * For example, if called via
+     * <code>isOlderThan(3, TimeUnit.SECONDS)</code>
+     * the method will return <code>true</code> if the request was created 
more than 3 seconds ago.
+     *
+     * @param time the amount of time to check
+     * @param timeUnit the associated time unit
+     * @return <code>true</code> if the request was created before (now - time)
+     */
+    boolean isOlderThan(long time, TimeUnit timeUnit);
+
+    /**
+     * @return the {@link NodeResponse} that represents the merged result from 
all nodes, or <code>null</code> if this request
+     *         is not yet complete
+     *
+     * @throws RuntimeException if the request could not be completed for some 
reason, a
+     *             RuntimeException will be thrown that indicates why the 
request failed
+     */
+    NodeResponse getMergedResponse();
+
+    /**
+     * Blocks until the request has completed and then returns the merged 
response
+     *
+     * @return the NodeResponse that represents the merged result from all 
nodes
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting
+     * @throws RuntimeException if the request could not be completed for some 
reason, a
+     *             RuntimeException will be thrown that indicates why the 
request failed
+     */
+    NodeResponse awaitMergedResponse() throws InterruptedException;
+
+    /**
+     * Blocks until the request has completed or until the given timeout 
elapses. At that point, if the request has completed, then
+     * the merged response is returned. If the timeout elapses before the 
request completes, then <code>null</code> will be returned.
+     *
+     * @return the NodeResponse that represents the merged result from all 
nodes, or <code>null</code> if the given timeout elapses
+     *         before the request completes
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting
+     * @throws RuntimeException if the request could not be completed for some 
reason, a
+     *             RuntimeException will be thrown that indicates why the 
request failed
+     *
+     */
+    NodeResponse awaitMergedResponse(long timeout, TimeUnit timeUnit) throws 
InterruptedException;
+
+    /**
+     * Returns the NodeResponse that represents the individual response from 
the node with the given identifier
+     *
+     * @param nodeId the ID of the node whose response is to be returned
+     * @return the NodeResponse from the node with the given identifier, or 
<code>null</code> if there is no response yet from the given node
+     */
+    NodeResponse getNodeResponse(NodeIdentifier nodeId);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/CompletionCallback.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/CompletionCallback.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/CompletionCallback.java
new file mode 100644
index 0000000..314a805
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/CompletionCallback.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication;
+
+public interface CompletionCallback {
+    void onCompletion(AsyncClusterResponse response);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestCompletionCallback.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestCompletionCallback.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestCompletionCallback.java
new file mode 100644
index 0000000..7e79da4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestCompletionCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import java.util.Set;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+
+/**
+ * A callback that can be registered to be called after an HTTP Request is 
replicated and all nodes'
+ * responses have been accounted for (either successfully or not)
+ */
+public interface RequestCompletionCallback {
+
+    /**
+     * Called after all NodeResponse objects have been gathered for the request
+     *
+     * @param uriPath the path of the request URI
+     * @param method the HTTP method of the request
+     * @param nodeResponses the NodeResponse for each node that the request 
was replicated to
+     */
+    void afterRequest(String uriPath, String method, Set<NodeResponse> 
nodeResponses);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
new file mode 100644
index 0000000..6c9f18c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+public interface RequestReplicator {
+    public static final String REQUEST_TRANSACTION_ID = 
"X-RequestTransactionId";
+
+    /**
+     * Starts the instance for replicating requests. Calling this method on an 
already started instance has no effect.
+     */
+    void start();
+
+    /**
+     * Stops the instance from replicating requests. Calling this method on a 
stopped instance has no effect.
+     */
+    void stop();
+
+    /**
+     * @return true if the instance is started; false otherwise.
+     */
+    boolean isRunning();
+
+    /**
+     * Requests are sent to each node in the given set of Node Identifiers. 
The returned AsyncClusterResponse object will contain
+     * the results that are immediately available, as well as an identifier 
for obtaining an updated result later.
+     *
+     * HTTP DELETE, GET, HEAD, and OPTIONS methods will throw an 
IllegalArgumentException if used.
+     *
+     * @param nodeIds the node identifiers
+     * @param method the HTTP method (e.g., POST, PUT)
+     * @param uri the base request URI (up to, but not including, the query 
string)
+     * @param entity an entity
+     * @param headers any HTTP headers
+     *
+     * @return an AsyncClusterResponse that indicates the current status of 
the request and provides an identifier for obtaining an updated response later
+     */
+    AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, 
URI uri, Object entity, Map<String, String> headers);
+
+    /**
+     * <p>
+     * Returns an AsyncClusterResponse that provides the most up-to-date 
status of the request with the given identifier.
+     * If the request is finished, meaning that all nodes in the cluster have 
reported back their status or have timed out,
+     * then the response will be removed and any subsequent calls to obtain 
the response with the same identifier will return
+     * <code>null</code>. If the response is not complete, the method may be 
called again at some point in the future in order
+     * to check again if the request has completed.
+     * </p>
+     *
+     * @param requestIdentifier the identifier of the request to obtain a 
response for
+     * @return an AsyncClusterResponse that provides the most up-to-date 
status of the request with the given identifier, or <code>null</code> if
+     *         no request exists with the given identifier
+     */
+    AsyncClusterResponse getClusterResponse(String requestIdentifier);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java
new file mode 100644
index 0000000..8435c60
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+public class ResponseUtils {
+
+    /**
+     * Finds the Node Identifier for all nodes that had a 'slow' response 
time. A 'slow' response time
+     * is defined as being more than X standard deviations from the mean 
response time, where X is the
+     * given Standard Deviation Multiple
+     *
+     * @param response the AsyncClusterResponse to base the calculations off of
+     * @param stdDeviationMultiple the number of standard deviations that a 
response time must be away from the mean in order
+     *            to be considered 'slow'
+     *
+     * @return a Set of all Node Identifiers that took a long time to respond
+     */
+    public static Set<NodeIdentifier> findLongResponseTimes(final 
AsyncClusterResponse response, final double stdDeviationMultiple) {
+        final Set<NodeIdentifier> slowResponses = new HashSet<>();
+
+        if (response.isOlderThan(2, TimeUnit.SECONDS)) {
+            // If the response is older than 2 seconds, determines if any node 
took a long time to respond.
+            final Set<NodeIdentifier> completedIds = 
response.getCompletedNodeIdentifiers();
+
+            if (completedIds.size() < 2) {
+                return slowResponses;
+            }
+
+            long requestMillisSum = 0L;
+            int numNodes = 0;
+            for (final NodeIdentifier nodeId : completedIds) {
+                final long requestMillis = 
response.getNodeResponse(nodeId).getRequestDuration(TimeUnit.NANOSECONDS);
+                if (requestMillis < 0) {
+                    continue;
+                }
+
+                requestMillisSum += requestMillis;
+                numNodes++;
+            }
+
+            if (numNodes < 2) {
+                return slowResponses;
+            }
+
+            final double mean = requestMillisSum / numNodes;
+            double differenceSquaredSum = 0D;
+            for (final NodeIdentifier nodeId : completedIds) {
+                final long requestMillis = 
response.getNodeResponse(nodeId).getRequestDuration(TimeUnit.NANOSECONDS);
+                final double differenceSquared = Math.pow(mean - 
requestMillis, 2);
+                differenceSquaredSum += differenceSquared;
+            }
+
+            final double meanOfDifferenceSquared = differenceSquaredSum / 
numNodes;
+            final double stdDev = Math.pow(meanOfDifferenceSquared, 0.5D);
+            final double longTimeThreshold = mean + stdDev * 
stdDeviationMultiple;
+
+            for (final NodeIdentifier nodeId : completedIds) {
+                final long requestMillis = 
response.getNodeResponse(nodeId).getRequestDuration(TimeUnit.NANOSECONDS);
+                if (requestMillis > longTimeThreshold) {
+                    slowResponses.add(nodeId);
+                }
+            }
+        }
+
+        return slowResponses;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
new file mode 100644
index 0000000..52bf805
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StandardAsyncClusterResponse implements AsyncClusterResponse {
+    private static final Logger logger = 
LoggerFactory.getLogger(StandardAsyncClusterResponse.class);
+
+    private final String id;
+    private final Set<NodeIdentifier> nodeIds;
+    private final URI uri;
+    private final String method;
+    private final HttpResponseMerger responseMerger;
+    private final CompletionCallback completionCallback;
+    private final Runnable completedResultFetchedCallback;
+    private final long creationTimeNanos;
+
+    private final Map<NodeIdentifier, ResponseHolder> responseMap = new 
HashMap<>();
+    private final AtomicInteger requestsCompleted = new AtomicInteger(0);
+
+    private NodeResponse mergedResponse; // guarded by synchronizing on this
+    private RuntimeException failure; // guarded by synchronizing on this
+
+    public StandardAsyncClusterResponse(final String id, final URI uri, final 
String method, final Set<NodeIdentifier> nodeIds,
+        final HttpResponseMerger responseMerger, final CompletionCallback 
completionCallback, final Runnable completedResultFetchedCallback) {
+        this.id = id;
+        this.nodeIds = Collections.unmodifiableSet(new HashSet<>(nodeIds));
+        this.uri = uri;
+        this.method = method;
+
+        creationTimeNanos = System.nanoTime();
+        for (final NodeIdentifier nodeId : nodeIds) {
+            responseMap.put(nodeId, new ResponseHolder(creationTimeNanos));
+        }
+
+        this.responseMerger = responseMerger;
+        this.completionCallback = completionCallback;
+        this.completedResultFetchedCallback = completedResultFetchedCallback;
+    }
+
+    @Override
+    public String getRequestIdentifier() {
+        return id;
+    }
+
+    @Override
+    public Set<NodeIdentifier> getNodesInvolved() {
+        return nodeIds;
+    }
+
+    @Override
+    public Set<NodeIdentifier> getCompletedNodeIdentifiers() {
+        return responseMap.entrySet().stream()
+            .filter(entry -> entry.getValue().isComplete())
+            .map(entry -> entry.getKey())
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public Set<NodeResponse> getCompletedNodeResponses() {
+        return responseMap.values().stream()
+            .filter(responseHolder -> responseHolder.isComplete())
+            .map(responseHolder -> responseHolder.getResponse())
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public boolean isOlderThan(final long time, final TimeUnit timeUnit) {
+        final long nanos = timeUnit.toNanos(time);
+        final long threshold = System.nanoTime() - nanos;
+        return creationTimeNanos < threshold;
+    }
+
+    @Override
+    public boolean isComplete() {
+        return getMergedResponse() != null;
+    }
+
+    @Override
+    public String getMethod() {
+        return method;
+    }
+
+    @Override
+    public String getURIPath() {
+        return uri.getPath();
+    }
+
+    @Override
+    public NodeResponse getMergedResponse() {
+        return getMergedResponse(true);
+    }
+
+    public synchronized NodeResponse getMergedResponse(final boolean 
triggerCallback) {
+        if (failure != null) {
+            throw failure;
+        }
+
+        if (mergedResponse != null) {
+            if (triggerCallback && completedResultFetchedCallback != null) {
+                completedResultFetchedCallback.run();
+            }
+
+            return mergedResponse;
+        }
+
+        if (requestsCompleted.get() < responseMap.size()) {
+            return null;
+        }
+
+        final Set<NodeResponse> nodeResponses = 
responseMap.values().stream().map(p -> 
p.getResponse()).collect(Collectors.toSet());
+        mergedResponse = responseMerger.mergeResponses(uri, method, 
nodeResponses);
+
+        logger.debug("Notifying all that merged response is complete for {}", 
id);
+        this.notifyAll();
+
+        if (triggerCallback && completedResultFetchedCallback != null) {
+            completedResultFetchedCallback.run();
+        }
+
+        return mergedResponse;
+    }
+
+    @Override
+    public NodeResponse awaitMergedResponse() throws InterruptedException {
+        synchronized (this) {
+            while (getMergedResponse(false) == null) {
+                logger.debug("Waiting indefinitely for merged response to be 
complete for {}", id);
+                this.wait();
+            }
+        }
+
+        return getMergedResponse(true);
+    }
+
+    @Override
+    public NodeResponse awaitMergedResponse(final long timeout, final TimeUnit 
timeUnit) throws InterruptedException {
+        if (timeout < 0) {
+            throw new IllegalArgumentException();
+        }
+
+        final long maxTime = System.nanoTime() + timeUnit.toNanos(timeout);
+
+        synchronized (this) {
+            while (getMergedResponse(false) == null) {
+                final long nanosToWait = maxTime - System.nanoTime();
+                if (nanosToWait < 0) {
+                    return getMergedResponse(true);
+                }
+
+                final long millis = TimeUnit.NANOSECONDS.toMillis(nanosToWait);
+                final int nanos = (int) (nanosToWait - 
TimeUnit.MILLISECONDS.toNanos(millis));
+
+                logger.debug("Waiting {} millis and {} nanos for merged 
response to be complete for {}", millis, nanos, id);
+                this.wait(millis, nanos);
+            }
+        }
+
+        return getMergedResponse(true);
+    }
+
+    @Override
+    public NodeResponse getNodeResponse(final NodeIdentifier nodeId) {
+        final ResponseHolder request = responseMap.get(nodeId);
+        return request == null ? null : request.getResponse();
+    }
+
+    void add(final NodeResponse nodeResponse) {
+        final ResponseHolder responseHolder = 
responseMap.get(nodeResponse.getNodeId());
+        if (responseHolder == null) {
+            throw new IllegalStateException("Node " + nodeResponse.getNodeId() 
+ " is not known for this request");
+        }
+
+        responseHolder.setResponse(nodeResponse);
+        final int completedCount = requestsCompleted.incrementAndGet();
+
+        logger.debug("Received response {} out of {} for {} from {}", 
completedCount, responseMap.size(), id, nodeResponse.getNodeId());
+
+        if (completedCount == responseMap.size()) {
+            logger.debug("Notifying all that merged response is ready for {}", 
id);
+            synchronized (this) {
+                this.notifyAll();
+            }
+
+            if (completionCallback != null) {
+                completionCallback.onCompletion(this);
+            }
+        }
+    }
+
+    synchronized void setFailure(final RuntimeException failure) {
+        this.failure = failure;
+
+        notifyAll();
+        if (completionCallback != null) {
+            completionCallback.onCompletion(this);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "StandardAsyncClusterResponse[id=" + id + ", uri=" + uri + ", 
method=" + method + ", failure=" + (failure != null)
+            + ", responses=" + getCompletedNodeIdentifiers().size() + "/" + 
responseMap.size() + "]";
+    }
+
+    private static class ResponseHolder {
+        private final long nanoStart;
+        private long requestNanos;
+        private NodeResponse response;
+
+        public ResponseHolder(final long startNanos) {
+            this.nanoStart = startNanos;
+        }
+
+        public synchronized void setResponse(final NodeResponse response) {
+            this.response = response;
+            this.requestNanos = System.nanoTime() - nanoStart;
+        }
+
+        public synchronized NodeResponse getResponse() {
+            return response;
+        }
+
+        public synchronized boolean isComplete() {
+            return response != null;
+        }
+
+        @SuppressWarnings("unused")
+        public long getRequestDuration(final TimeUnit timeUnit) {
+            return timeUnit.toNanos(requestNanos);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
new file mode 100644
index 0000000..3b71654
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.LongSummaryStatistics;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.apache.nifi.cluster.context.ClusterContext;
+import org.apache.nifi.cluster.context.ClusterContextImpl;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
+import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.flow.DataFlowManagementService;
+import org.apache.nifi.cluster.flow.PersistedFlowState;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import 
org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
+import 
org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
+import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+import org.apache.nifi.cluster.manager.exception.UriConstructionException;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.web.OptimisticLockingManager;
+import org.apache.nifi.web.util.WebUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+public class ThreadPoolRequestReplicator implements RequestReplicator {
+    /**
+     * The HTTP header to store a cluster context. An example of what may be 
stored in the context is a node's
+     * auditable actions in response to a cluster request. The cluster context 
is serialized
+     * using Java's serialization mechanism and hex encoded.
+     */
+    static final String CLUSTER_CONTEXT_HTTP_HEADER = "X-ClusterContext";
+
+    /**
+     * The HTTP header that the NCM specifies to ask a node if they are able 
to process a given request. The value
+     * is always 150-NodeContinue. The node will respond with 150 CONTINUE if 
it is able to
+     * process the request, 417 EXPECTATION_FAILED otherwise.
+     */
+    static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects";
+    static final String NODE_CONTINUE = "150-NodeContinue";
+    static final int NODE_CONTINUE_STATUS_CODE = 150;
+
+    private static final Logger logger = new 
NiFiLog(LoggerFactory.getLogger(ThreadPoolRequestReplicator.class));
+    private static final int MAX_CONCURRENT_REQUESTS = 100;
+
+    private final Client client; // the client to use for issuing requests
+    private final int numThreads; // number of threads to use for request 
replication
+    private final int connectionTimeoutMs; // connection timeout per node 
request
+    private final int readTimeoutMs; // read timeout per node request
+    private final HttpResponseMerger responseMerger;
+    private final EventReporter eventReporter;
+    private final RequestCompletionCallback callback;
+    private final ClusterCoordinator clusterCoordinator;
+    private final OptimisticLockingManager lockingManager;
+    private final DataFlowManagementService dfmService;
+
+    private ExecutorService executorService;
+    private ScheduledExecutorService maintenanceExecutor;
+
+    private final ConcurrentMap<String, StandardAsyncClusterResponse> 
responseMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<NodeIdentifier, AtomicInteger> 
sequentialLongRequestCounts = new ConcurrentHashMap<>();
+
+    /**
+     * Creates an instance using a connection timeout and read timeout of 3 
seconds
+     *
+     * @param numThreads the number of threads to use when parallelizing 
requests
+     * @param client a client for making requests
+     * @param clusterCoordinator the cluster coordinator to use for 
interacting with node statuses
+     * @param callback a callback that will be called whenever all of the 
responses have been gathered for a request. May be null.
+     * @param eventReporter an EventReporter that can be used to notify users 
of interesting events. May be null.
+     */
+    public ThreadPoolRequestReplicator(final int numThreads, final Client 
client, final ClusterCoordinator clusterCoordinator,
+        final RequestCompletionCallback callback, final EventReporter 
eventReporter, final OptimisticLockingManager lockingManager, final 
DataFlowManagementService dfmService) {
+        this(numThreads, client, clusterCoordinator, "3 sec", "3 sec", 
callback, eventReporter, null, lockingManager, dfmService);
+    }
+
+    /**
+     * Creates an instance.
+     *
+     * @param numThreads the number of threads to use when parallelizing 
requests
+     * @param client a client for making requests
+     * @param clusterCoordinator the cluster coordinator to use for 
interacting with node statuses
+     * @param connectionTimeout the connection timeout specified in 
milliseconds
+     * @param readTimeout the read timeout specified in milliseconds
+     * @param callback a callback that will be called whenever all of the 
responses have been gathered for a request. May be null.
+     * @param eventReporter an EventReporter that can be used to notify users 
of interesting events. May be null.
+     */
+    public ThreadPoolRequestReplicator(final int numThreads, final Client 
client, final ClusterCoordinator clusterCoordinator,
+        final String connectionTimeout, final String readTimeout, final 
RequestCompletionCallback callback, final EventReporter eventReporter,
+        final WebClusterManager clusterManager, final OptimisticLockingManager 
lockingManager, final DataFlowManagementService dfmService) {
+        if (numThreads <= 0) {
+            throw new IllegalArgumentException("The number of threads must be 
greater than zero.");
+        } else if (client == null) {
+            throw new IllegalArgumentException("Client may not be null.");
+        }
+
+        this.numThreads = numThreads;
+        this.client = client;
+        this.clusterCoordinator = clusterCoordinator;
+        this.connectionTimeoutMs = (int) 
FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
+        this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, 
TimeUnit.MILLISECONDS);
+        this.responseMerger = new StandardHttpResponseMerger(clusterManager);
+        this.eventReporter = eventReporter;
+        this.callback = callback;
+        this.lockingManager = lockingManager;
+        this.dfmService = dfmService;
+
+        client.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, 
connectionTimeoutMs);
+        client.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, 
readTimeoutMs);
+        client.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, 
Boolean.TRUE);
+    }
+
+    @Override
+    public void start() {
+        if (isRunning()) {
+            return;
+        }
+
+        executorService = Executors.newFixedThreadPool(numThreads);
+        maintenanceExecutor = Executors.newScheduledThreadPool(1, new 
ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread t = Executors.defaultThreadFactory().newThread(r);
+                t.setDaemon(true);
+                t.setName(ThreadPoolRequestReplicator.class.getSimpleName() + 
" Maintenance Thread");
+                return t;
+            }
+        });
+
+        maintenanceExecutor.scheduleWithFixedDelay(new 
PurgeExpiredRequestsTask(), 3, 3, TimeUnit.SECONDS);
+    }
+
+    @Override
+    public boolean isRunning() {
+        return executorService != null && !executorService.isShutdown();
+    }
+
+    @Override
+    public void stop() {
+        if (!isRunning()) {
+            return;
+        }
+
+        executorService.shutdown();
+        maintenanceExecutor.shutdown();
+    }
+
+    @Override
+    public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String 
method, URI uri, Object entity, Map<String, String> headers) {
+        return replicate(nodeIds, method, uri, entity, headers, true, null);
+    }
+
+    /**
+     * Replicates the request to all nodes in the given set of node identifiers
+     *
+     * @param nodeIds the NodeIdentifiers that identify which nodes to send 
the request to
+     * @param method the HTTP method to use
+     * @param uri the URI to send the request to
+     * @param entity the entity to use
+     * @param headers the HTTP Headers
+     * @param performVerification whether or not to use 2-phase commit to 
verify that all nodes can handle the request. Ignored if request is not mutable.
+     * @param response the response to update with the results
+     *
+     * @return an AsyncClusterResponse that can be used to obtain the result
+     */
+    private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String 
method, URI uri, Object entity, Map<String, String> headers, boolean 
performVerification,
+        StandardAsyncClusterResponse response) {
+
+        // state validation
+        Objects.requireNonNull(nodeIds);
+        Objects.requireNonNull(method);
+        Objects.requireNonNull(uri);
+        Objects.requireNonNull(entity);
+        Objects.requireNonNull(headers);
+
+        if (nodeIds.isEmpty()) {
+            throw new IllegalArgumentException("Cannot replicate request to 0 
nodes");
+        }
+
+        if (performVerification) {
+            verifyState(method, uri.getPath());
+        }
+
+        final int numRequests = responseMap.size();
+        if (numRequests >= MAX_CONCURRENT_REQUESTS) {
+            throw new IllegalStateException("There are too many outstanding 
HTTP requests with a total " + numRequests + " outstanding requests");
+        }
+
+        // create the request objects and replicate to all nodes
+        final String requestId = UUID.randomUUID().toString();
+        final CompletionCallback completionCallback = clusterResponse -> 
onCompletedResponse(requestId);
+        final Runnable responseConsumedCallback = () -> 
onResponseConsumed(requestId);
+
+        // create a response object if one was not already passed to us
+        if (response == null) {
+            response = new StandardAsyncClusterResponse(requestId, uri, 
method, nodeIds,
+                responseMerger, completionCallback, responseConsumedCallback);
+            responseMap.put(requestId, response);
+        }
+
+        // Update headers to indicate the current revision so that we can
+        // prevent multiple users changing the flow at the same time
+        final Map<String, String> updatedHeaders = new HashMap<>(headers);
+        updatedHeaders.put(REQUEST_TRANSACTION_ID, 
UUID.randomUUID().toString());
+        // setRevision(updatedHeaders);
+
+        // if mutable request, we have to do a two-phase commit where we ask 
each node to verify
+        // that the request can take place and then, if all nodes agree that 
it can, we can actually
+        // issue the request. This is all handled by calling 
performVerification, which will replicate
+        // the 'vote' request to all nodes and then if successful will call 
back into this method to
+        // replicate the actual request.
+        final boolean mutableRequest = isMutableRequest(method, uri.getPath());
+        if (mutableRequest && performVerification) {
+            performVerification(nodeIds, method, uri, entity, updatedHeaders, 
response);
+            return response;
+        }
+
+        // Callback function for generating a NodeHttpRequestCallable that can 
be used to perform the work
+        final StandardAsyncClusterResponse finalResponse = response;
+        NodeRequestCompletionCallback nodeCompletionCallback = nodeResponse -> 
{
+            logger.debug("Received response from {} for {} {}", 
nodeResponse.getNodeId(), method, uri.getPath());
+            finalResponse.add(nodeResponse);
+        };
+
+
+        // replicate the request to all nodes
+        final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
+            nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, 
nodeId), entity, updatedHeaders, nodeCompletionCallback);
+        replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), 
requestFactory, updatedHeaders);
+
+
+        // TODO: Must handle revisions!!
+
+        return response;
+    }
+
+
+    private void setRevision(final Map<String, String> headers) {
+        final ClusterContext clusterCtx = new ClusterContextImpl();
+        clusterCtx.setRequestSentByClusterManager(true); // indicate request 
is sent from cluster manager
+        
clusterCtx.setRevision(lockingManager.getLastModification().getRevision());
+
+        // serialize cluster context and add to request header
+        final String serializedClusterCtx = 
WebUtils.serializeObjectToHex(clusterCtx);
+        headers.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx);
+    }
+
+
+    private void performVerification(Set<NodeIdentifier> nodeIds, String 
method, URI uri, Object entity, Map<String, String> headers, 
StandardAsyncClusterResponse clusterResponse) {
+        logger.debug("Verifying that mutable request {} {} can be made", 
method, uri.getPath());
+
+        final Map<String, String> updatedHeaders = new HashMap<>(headers);
+        updatedHeaders.put(NCM_EXPECTS_HTTP_HEADER, NODE_CONTINUE);
+
+        final int numNodes = nodeIds.size();
+        final NodeRequestCompletionCallback completionCallback = new 
NodeRequestCompletionCallback() {
+            final Set<NodeResponse> nodeResponses = 
Collections.synchronizedSet(new HashSet<>());
+
+            @Override
+            public void onCompletion(final NodeResponse nodeResponse) {
+                // Add the node response to our collection.
+                nodeResponses.add(nodeResponse);
+
+                try {
+                    // If we have all of the node responses, then we can 
verify the responses
+                    // and if good replicate the original request to all of 
the nodes.
+                    if (nodeResponses.size() == numNodes) {
+                        // Check if we have any requests that do not have a 
150-Continue status code.
+                        final long dissentingCount = 
nodeResponses.stream().filter(p -> p.getStatus() != 
NODE_CONTINUE_STATUS_CODE).count();
+
+                        // If all nodes responded with 150-Continue, then we 
can replicate the original request
+                        // to all nodes and we are finished.
+                        if (dissentingCount == 0) {
+                            logger.debug("Received verification from all {} 
nodes that mutable request {} {} can be made", numNodes, method, uri.getPath());
+                            replicate(nodeIds, method, uri, entity, headers, 
false, clusterResponse);
+                            return;
+                        }
+
+                        // Add a NodeResponse for each node to the Cluster 
Response
+                        // Check that all nodes responded successfully.
+                        for (final NodeResponse response : nodeResponses) {
+                            if (response.getStatus() != 
NODE_CONTINUE_STATUS_CODE) {
+                                final ClientResponse clientResponse = 
response.getClientResponse();
+
+                                final RuntimeException failure;
+                                if (clientResponse == null) {
+                                    failure = new 
IllegalClusterStateException("Node " + response.getNodeId()
+                                        + " is unable to fulfill this request 
due to: Unexpected Response Code " + response.getStatus());
+
+                                    logger.info("Received a status of {} from 
{} for request {} {} when performing first stage of two-stage commit. "
+                                        + "Will respond with CONFLICT response 
and action will not occur",
+                                        response.getStatus(), 
response.getNodeId(), method, uri.getPath());
+                                } else {
+                                    final String nodeExplanation = 
clientResponse.getEntity(String.class);
+                                    failure = new 
IllegalClusterStateException("Node " + response.getNodeId() + " is unable to 
fulfill this request due to: "
+                                        + nodeExplanation, 
response.getThrowable());
+
+                                    logger.info("Received a status of {} from 
{} for request {} {} when performing first stage of two-stage commit. "
+                                        + "Will respond with CONFLICT response 
and action will not occur. Node explanation: {}",
+                                        response.getStatus(), 
response.getNodeId(), method, uri.getPath(), nodeExplanation);
+                                }
+
+                                clusterResponse.setFailure(failure);
+                                break;
+                            }
+                        }
+                    }
+                } catch (final Exception e) {
+                    clusterResponse.add(new 
NodeResponse(nodeResponse.getNodeId(), method, uri, e));
+
+                    // If there was a problem, we need to ensure that we add 
all of the other nodes' responses
+                    // to the Cluster Response so that the Cluster Response is 
complete.
+                    for (final NodeResponse otherResponse : nodeResponses) {
+                        if 
(otherResponse.getNodeId().equals(nodeResponse.getNodeId())) {
+                            continue;
+                        }
+
+                        clusterResponse.add(otherResponse);
+                    }
+                }
+            }
+        };
+
+        // notify dataflow management service that flow state is not known
+        dfmService.setPersistedFlowState(PersistedFlowState.UNKNOWN);
+
+        // Callback function for generating a NodeHttpRequestCallable that can 
be used to perform the work
+        final Function<NodeIdentifier, NodeHttpRequest> requestFactory = 
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, 
updatedHeaders, completionCallback);
+
+        // replicate the 'verification request' to all nodes
+        replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), 
requestFactory, updatedHeaders);
+    }
+
+
+    @Override
+    public AsyncClusterResponse getClusterResponse(final String identifier) {
+        final AsyncClusterResponse response = responseMap.get(identifier);
+        if (response == null) {
+            return null;
+        }
+
+        return response;
+    }
+
+    // Visible for testing - overriding this method makes it easy to verify 
behavior without actually making any web requests
+    protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, final URI 
uri, final String requestId) {
+        final ClientResponse clientResponse;
+        final long startNanos = System.nanoTime();
+
+        switch (method.toUpperCase()) {
+            case HttpMethod.DELETE:
+                clientResponse = resourceBuilder.delete(ClientResponse.class);
+                break;
+            case HttpMethod.GET:
+                clientResponse = resourceBuilder.get(ClientResponse.class);
+                break;
+            case HttpMethod.HEAD:
+                clientResponse = resourceBuilder.head();
+                break;
+            case HttpMethod.OPTIONS:
+                clientResponse = resourceBuilder.options(ClientResponse.class);
+                break;
+            case HttpMethod.POST:
+                clientResponse = resourceBuilder.post(ClientResponse.class);
+                break;
+            case HttpMethod.PUT:
+                clientResponse = resourceBuilder.put(ClientResponse.class);
+                break;
+            default:
+                throw new IllegalArgumentException("HTTP Method '" + method + 
"' not supported for request replication.");
+        }
+
+        return new NodeResponse(nodeId, method, uri, clientResponse, 
System.nanoTime() - startNanos, requestId);
+    }
+
+    private boolean isMutableRequest(final String method, final String 
uriPath) {
+        switch (method.toUpperCase()) {
+            case HttpMethod.GET:
+            case HttpMethod.HEAD:
+            case HttpMethod.OPTIONS:
+                return false;
+            default:
+                return true;
+        }
+    }
+
+    /**
+     * Verifies that the cluster is in a state that will allow requests to be 
made using the given HTTP Method and URI path
+     *
+     * @param httpMethod the HTTP Method
+     * @param uriPath the URI Path
+     *
+     * @throw IllegalClusterStateException if the cluster is not in a state 
that allows a request to made to the given URI Path using the given HTTP Method
+     */
+    private void verifyState(final String httpMethod, final String uriPath) {
+        final boolean mutableRequest = HttpMethod.DELETE.equals(httpMethod) || 
HttpMethod.POST.equals(httpMethod) || HttpMethod.PUT.equals(httpMethod);
+
+        // check that the request can be applied
+        if (mutableRequest) {
+            final Map<NodeConnectionState, List<NodeIdentifier>> 
connectionStates = clusterCoordinator.getConnectionStates();
+            if (connectionStates.containsKey(NodeConnectionState.DISCONNECTED) 
|| connectionStates.containsKey(NodeConnectionState.DISCONNECTING)) {
+                throw new DisconnectedNodeMutableRequestException("Received a 
mutable request [" + httpMethod + " " + uriPath + "] while a node is 
disconnected from the cluster");
+            }
+
+            if (connectionStates.containsKey(NodeConnectionState.CONNECTING)) {
+                // if any node is connecting and a request can change the 
flow, then we throw an exception
+                throw new ConnectingNodeMutableRequestException("Received a 
mutable request [" + httpMethod + " " + uriPath + "] while a node is trying to 
connect to the cluster");
+            }
+        }
+    }
+
+    private void onResponseConsumed(final String requestId) {
+        final AsyncClusterResponse response = responseMap.remove(requestId);
+
+        if (response != null && logger.isDebugEnabled()) {
+            logTimingInfo(response);
+        }
+    }
+
+    private void onCompletedResponse(final String requestId) {
+        final AsyncClusterResponse response = responseMap.get(requestId);
+
+        if (response != null) {
+            if (isMutableRequest(response.getMethod(), response.getURIPath())) 
{
+                dfmService.setPersistedFlowState(PersistedFlowState.STALE);
+            }
+        }
+
+        if (response != null && callback != null) {
+            try {
+                callback.afterRequest(response.getURIPath(), 
response.getMethod(), response.getCompletedNodeResponses());
+            } catch (final Exception e) {
+                logger.warn("Completed request {} {} but failed to properly 
handle the Request Completion Callback due to {}",
+                    response.getMethod(), response.getURIPath(), e.toString());
+                logger.warn("", e);
+            }
+        }
+
+        // If we have any nodes that are slow to respond, keep track of this. 
If the same node is slow 3 times in
+        // a row, log a warning to indicate that the node is responding slowly.
+        final Set<NodeIdentifier> slowResponseNodes = 
ResponseUtils.findLongResponseTimes(response, 1.5D);
+        for (final NodeIdentifier nodeId : response.getNodesInvolved()) {
+            final AtomicInteger counter = 
sequentialLongRequestCounts.computeIfAbsent(nodeId, id -> new AtomicInteger(0));
+            if (slowResponseNodes.contains(nodeId)) {
+                final int sequentialLongRequests = counter.incrementAndGet();
+                if (sequentialLongRequests >= 3) {
+                    final String message = "Response time from " + nodeId + " 
was slow for each of the last 3 requests made. "
+                        + "To see more information about timing, enable DEBUG 
logging for " + logger.getName();
+
+                    logger.warn(message);
+                    if (eventReporter != null) {
+                        eventReporter.reportEvent(Severity.WARNING, "Node 
Response Time", message);
+                    }
+
+                    counter.set(0);
+                }
+            } else {
+                counter.set(0);
+            }
+        }
+    }
+
+    private void logTimingInfo(final AsyncClusterResponse response) {
+        // Calculate min, max, mean for the requests
+        final LongSummaryStatistics stats = 
response.getNodesInvolved().stream()
+            .map(p -> 
response.getNodeResponse(p).getRequestDuration(TimeUnit.MILLISECONDS))
+            .collect(Collectors.summarizingLong(Long::longValue));
+
+        final StringBuilder sb = new StringBuilder();
+        sb.append("Node Responses for ").append(response.getMethod()).append(" 
").append(response.getURIPath()).append(" (Request ID 
").append(response.getRequestIdentifier()).append("):\n");
+        for (final NodeIdentifier node : response.getNodesInvolved()) {
+            sb.append(node).append(": 
").append(response.getNodeResponse(node).getRequestDuration(TimeUnit.MILLISECONDS)).append("
 millis\n");
+        }
+
+        logger.debug("For {} {} (Request ID {}), minimum response time = {}, 
max = {}, average = {} ms",
+            response.getMethod(), response.getURIPath(), 
response.getRequestIdentifier(), stats.getMin(), stats.getMax(), 
stats.getAverage());
+        logger.debug(sb.toString());
+    }
+
+
+    private void replicateRequest(final Set<NodeIdentifier> nodeIds, final 
String scheme,
+        final String path, final Function<NodeIdentifier, NodeHttpRequest> 
callableFactory, final Map<String, String> headers) {
+
+        if (nodeIds.isEmpty()) {
+            return; // return quickly for trivial case
+        }
+
+        // submit the requests to the nodes
+        final String requestId = UUID.randomUUID().toString();
+        headers.put(WebClusterManager.REQUEST_ID_HEADER, requestId);
+        for (final NodeIdentifier nodeId : nodeIds) {
+            final NodeHttpRequest callable = callableFactory.apply(nodeId);
+            executorService.submit(callable);
+        }
+    }
+
+
+    private URI createURI(final URI exampleUri, final NodeIdentifier nodeId) {
+        return createURI(exampleUri.getScheme(), nodeId.getApiAddress(), 
nodeId.getApiPort(), exampleUri.getPath());
+    }
+
+    private URI createURI(final String scheme, final String nodeApiAddress, 
final int nodeApiPort, final String path) {
+        try {
+            return new URI(scheme, null, nodeApiAddress, nodeApiPort, path, 
null, null);
+        } catch (final URISyntaxException e) {
+            throw new UriConstructionException(e);
+        }
+    }
+
+
+    /**
+     * A Callable for making an HTTP request to a single node and returning 
its response.
+     */
+    private class NodeHttpRequest implements Runnable {
+        private final NodeIdentifier nodeId;
+        private final String method;
+        private final URI uri;
+        private final Object entity;
+        private final Map<String, String> headers = new HashMap<>();
+        private final NodeRequestCompletionCallback callback;
+
+        private NodeHttpRequest(final NodeIdentifier nodeId, final String 
method,
+            final URI uri, final Object entity, final Map<String, String> 
headers, final NodeRequestCompletionCallback callback) {
+            this.nodeId = nodeId;
+            this.method = method;
+            this.uri = uri;
+            this.entity = entity;
+            this.headers.putAll(headers);
+            this.callback = callback;
+        }
+
+
+        @Override
+        public void run() {
+            NodeResponse nodeResponse;
+
+            try {
+                // create and send the request
+                final WebResource.Builder resourceBuilder = 
createResourceBuilder();
+                final String requestId = headers.get("x-nifi-request-id");
+
+                logger.debug("Replicating request {} {} to {}", method, 
uri.getPath(), nodeId);
+                nodeResponse = replicateRequest(resourceBuilder, nodeId, 
method, uri, requestId);
+            } catch (final Exception e) {
+                nodeResponse = new NodeResponse(nodeId, method, uri, e);
+                logger.warn("Failed to replicate request {} {} to {} due to 
{}", method, uri.getPath(), nodeId, e);
+                logger.warn("", e);
+            }
+
+            if (callback != null) {
+                logger.debug("Request {} {} completed for {}", method, 
uri.getPath(), nodeId);
+                callback.onCompletion(nodeResponse);
+            }
+        }
+
+
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        private WebResource.Builder createResourceBuilder() {
+            // convert parameters to a more convenient data structure
+            final MultivaluedMap<String, String> map = new 
MultivaluedMapImpl();
+
+            if (entity instanceof MultivaluedMap) {
+                map.putAll((Map) entity);
+            }
+
+            // create the resource
+            WebResource resource = client.resource(uri);
+
+            if (responseMerger.isResponseInterpreted(uri, method)) {
+                resource.addFilter(new GZIPContentEncodingFilter(false));
+            }
+
+            // set the parameters as either query parameters or as request body
+            final WebResource.Builder builder;
+            if (HttpMethod.DELETE.equalsIgnoreCase(method) || 
HttpMethod.HEAD.equalsIgnoreCase(method) || 
HttpMethod.GET.equalsIgnoreCase(method) || 
HttpMethod.OPTIONS.equalsIgnoreCase(method)) {
+                resource = resource.queryParams(map);
+                builder = resource.getRequestBuilder();
+            } else {
+                if (entity == null) {
+                    builder = resource.entity(map);
+                } else {
+                    builder = resource.entity(entity);
+                }
+            }
+
+            // set headers
+            boolean foundContentType = false;
+            for (final Map.Entry<String, String> entry : headers.entrySet()) {
+                builder.header(entry.getKey(), entry.getValue());
+                if (entry.getKey().equalsIgnoreCase("content-type")) {
+                    foundContentType = true;
+                }
+            }
+
+            // set default content type
+            if (!foundContentType) {
+                // set default content type
+                builder.type(MediaType.APPLICATION_FORM_URLENCODED);
+            }
+
+            return builder;
+        }
+    }
+
+    private static interface NodeRequestCompletionCallback {
+        void onCompletion(NodeResponse nodeResponse);
+    }
+
+    private class PurgeExpiredRequestsTask implements Runnable {
+        @Override
+        public void run() {
+            final Set<String> expiredRequestIds = 
ThreadPoolRequestReplicator.this.responseMap.entrySet().stream()
+                .filter(entry -> entry.getValue().isOlderThan(30, 
TimeUnit.SECONDS)) // older than 30 seconds
+                .filter(entry -> entry.getValue().isComplete()) // is complete
+                .map(entry -> entry.getKey()) // get the request id
+                .collect(Collectors.toSet());
+
+            expiredRequestIds.forEach(id -> onResponseConsumed(id));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
index 66ad494..7fc764a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
@@ -62,15 +62,9 @@ public class StatusMerger {
             return;
         }
 
-        target.setActiveRemotePortCount(target.getActiveRemotePortCount() + 
toMerge.getActiveRemotePortCount());
         target.setActiveThreadCount(target.getActiveThreadCount() + 
toMerge.getActiveThreadCount());
         target.setBytesQueued(target.getBytesQueued() + 
toMerge.getBytesQueued());
-        target.setDisabledCount(target.getDisabledCount() + 
toMerge.getDisabledCount());
         target.setFlowFilesQueued(target.getFlowFilesQueued() + 
toMerge.getFlowFilesQueued());
-        target.setInactiveRemotePortCount(target.getInactiveRemotePortCount() 
+ toMerge.getInactiveRemotePortCount());
-        target.setInvalidCount(target.getInvalidCount() + 
toMerge.getInvalidCount());
-        target.setRunningCount(target.getRunningCount() + 
toMerge.getRunningCount());
-        target.setStoppedCount(target.getStoppedCount() + 
toMerge.getStoppedCount());
 
         target.setBulletins(mergeBulletins(target.getBulletins(), 
toMerge.getBulletins()));
         
target.setControllerServiceBulletins(mergeBulletins(target.getControllerServiceBulletins(),
 toMerge.getControllerServiceBulletins()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
index f86c290..e5f171d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
@@ -40,6 +40,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
 import org.apache.nifi.cluster.manager.HttpRequestReplicator;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.UriConstructionException;
@@ -475,7 +476,7 @@ public class HttpRequestReplicatorImpl implements 
HttpRequestReplicator {
             // create the resource
             WebResource resource = client.resource(uri);
 
-            if (WebClusterManager.isResponseInterpreted(uri, method)) {
+            if (new StandardHttpResponseMerger().isResponseInterpreted(uri, 
method)) {
                 resource.addFilter(new GZIPContentEncodingFilter(false));
             }
 

Reply via email to