http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
new file mode 100644
index 0000000..7dc598e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.core.StreamingOutput;
+
+import 
org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpiontMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.DropRequestEndpiontMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.FlowSnippetEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.GroupStatusEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceEventEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger;
+import 
org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.StatusMerger;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node.Status;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
+import org.apache.nifi.web.api.entity.ControllerStatusEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StandardHttpResponseMerger implements HttpResponseMerger {
+    private Logger logger = 
LoggerFactory.getLogger(StandardHttpResponseMerger.class);
+
+    private static final int NODE_CONTINUE_STATUS_CODE = 150;
+    private final WebClusterManager clusterManager;
+
+    private static final List<EndpointResponseMerger> endpointMergers = new 
ArrayList<>();
+    static {
+        endpointMergers.add(new ControllerStatusEndpointMerger());
+        endpointMergers.add(new GroupStatusEndpointMerger());
+        endpointMergers.add(new ProcessorStatusEndpointMerger());
+        endpointMergers.add(new ConnectionStatusEndpiontMerger());
+        endpointMergers.add(new PortStatusEndpointMerger());
+        endpointMergers.add(new RemoteProcessGroupStatusEndpointMerger());
+        endpointMergers.add(new ProcessorEndpointMerger());
+        endpointMergers.add(new ProcessorsEndpointMerger());
+        endpointMergers.add(new RemoteProcessGroupEndpointMerger());
+        endpointMergers.add(new RemoteProcessGroupsEndpointMerger());
+        endpointMergers.add(new ProcessGroupEndpointMerger());
+        endpointMergers.add(new FlowSnippetEndpointMerger());
+        endpointMergers.add(new ProvenanceQueryEndpointMerger());
+        endpointMergers.add(new ProvenanceEventEndpointMerger());
+        endpointMergers.add(new ControllerServiceEndpointMerger());
+        endpointMergers.add(new ControllerServicesEndpointMerger());
+        endpointMergers.add(new ControllerServiceReferenceEndpointMerger());
+        endpointMergers.add(new ReportingTaskEndpointMerger());
+        endpointMergers.add(new ReportingTasksEndpointMerger());
+        endpointMergers.add(new DropRequestEndpiontMerger());
+        endpointMergers.add(new ListFlowFilesEndpointMerger());
+        endpointMergers.add(new ComponentStateEndpointMerger());
+        endpointMergers.add(new BulletinBoardEndpointMerger());
+        endpointMergers.add(new StatusHistoryEndpointMerger());
+        endpointMergers.add(new SystemDiagnosticsEndpointMerger());
+        endpointMergers.add(new CountersEndpointMerger());
+    }
+
+    public StandardHttpResponseMerger() {
+        this(null);
+    }
+
+    public StandardHttpResponseMerger(final WebClusterManager clusterManager) {
+        this.clusterManager = clusterManager;
+    }
+
+    @Override
+    public NodeResponse mergeResponses(final URI uri, final String httpMethod, 
final Set<NodeResponse> nodeResponses) {
+        final boolean hasSuccess = hasSuccessfulResponse(nodeResponses);
+        if (!hasSuccess) {
+            // If we have a response that is a 3xx, 4xx, or 5xx, then we want 
to choose that.
+            // Otherwise, it doesn't matter which one we choose. We do this 
because if we replicate
+            // a mutable request, it's possible that one node will respond 
with a 409, for instance, while
+            // others respond with a 150-Continue. We do not want to pick the 
150-Continue; instead, we want
+            // the failed response.
+            final NodeResponse clientResponse = 
nodeResponses.stream().filter(p -> p.getStatus() > 
299).findAny().orElse(nodeResponses.iterator().next());
+
+            // Drain the response from all nodes except for the 'chosen one'. 
This ensures that we don't
+            // leave data lingering on the socket and ensures that we don't 
consume the content of the response
+            // that we intend to respond with
+            drainResponses(nodeResponses, clientResponse);
+            return clientResponse;
+        }
+
+        // Determine which responses are successful
+        final Set<NodeResponse> successResponses = 
nodeResponses.stream().filter(p -> p.is2xx()).collect(Collectors.toSet());
+        final Set<NodeResponse> problematicResponses = 
nodeResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet());
+
+        // Choose any of the successful responses to be the 'chosen one'.
+        final NodeResponse clientResponse = successResponses.iterator().next();
+
+        final EndpointResponseMerger merger = getEndpointResponseMerger(uri, 
httpMethod);
+        if (merger == null) {
+            return clientResponse;
+        }
+
+        final NodeResponse response = merger.merge(uri, httpMethod, 
successResponses, problematicResponses, clientResponse);
+        if (clusterManager != null) {
+            mergeNCMBulletins(response, uri, httpMethod);
+        }
+
+        return response;
+    }
+
+    /**
+     * This method merges bulletins from the NCM. Eventually, the NCM will go 
away entirely, and
+     * at that point, we will completely remove this and the WebClusterManager 
as a member variable.
+     * However, until then, the bulletins from the NCM are important to 
include, since there is no other
+     * node that can include them.
+     *
+     * @param clientResponse the Node Response that will be returned to the 
client
+     * @param uri the URI
+     * @param method the HTTP Method
+     *
+     * @deprecated this method exists only until we can remove the Cluster 
Manager from the picture all together. It will then be removed.
+     */
+    @Deprecated
+    private void mergeNCMBulletins(final NodeResponse clientResponse, final 
URI uri, final String method) {
+        // determine if we have at least one response
+        final boolean hasClientResponse = clientResponse != null;
+        final boolean hasSuccessfulClientResponse = hasClientResponse && 
clientResponse.is2xx();
+
+        if (hasSuccessfulClientResponse && 
clusterManager.isControllerStatusEndpoint(uri, method)) {
+            // for now, we need to merge the NCM's bulletins too.
+            final ControllerStatusEntity responseEntity = 
(ControllerStatusEntity) clientResponse.getUpdatedEntity();
+            final ControllerStatusDTO mergedStatus = 
responseEntity.getControllerStatus();
+
+            final int totalNodeCount = clusterManager.getNodeIds().size();
+            final int connectedNodeCount = 
clusterManager.getNodeIds(Status.CONNECTED).size();
+
+            final List<Bulletin> ncmControllerBulletins = 
clusterManager.getBulletinRepository().findBulletinsForController();
+            
mergedStatus.setBulletins(clusterManager.mergeNCMBulletins(mergedStatus.getBulletins(),
 ncmControllerBulletins));
+
+            // get the controller service bulletins
+            final BulletinQuery controllerServiceQuery = new 
BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
+            final List<Bulletin> ncmServiceBulletins = 
clusterManager.getBulletinRepository().findBulletins(controllerServiceQuery);
+            
mergedStatus.setControllerServiceBulletins(clusterManager.mergeNCMBulletins(mergedStatus.getControllerServiceBulletins(),
 ncmServiceBulletins));
+
+            // get the reporting task bulletins
+            final BulletinQuery reportingTaskQuery = new 
BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
+            final List<Bulletin> ncmReportingTaskBulletins = 
clusterManager.getBulletinRepository().findBulletins(reportingTaskQuery);
+            
mergedStatus.setReportingTaskBulletins(clusterManager.mergeNCMBulletins(mergedStatus.getReportingTaskBulletins(),
 ncmReportingTaskBulletins));
+
+            mergedStatus.setConnectedNodeCount(connectedNodeCount);
+            mergedStatus.setTotalNodeCount(totalNodeCount);
+            StatusMerger.updatePrettyPrintedFields(mergedStatus);
+        }
+    }
+
+
+    @Override
+    public Set<NodeResponse> getProblematicNodeResponses(final 
Set<NodeResponse> allResponses) {
+        // Check if there are any 2xx responses
+        final boolean containsSuccessfulResponse = 
hasSuccessfulResponse(allResponses);
+
+        if (containsSuccessfulResponse) {
+            // If there is a 2xx response, we consider a response to be 
problematic if it is not 2xx
+            return allResponses.stream().filter(p -> 
!p.is2xx()).collect(Collectors.toSet());
+        } else {
+            // If no node is successful, we consider a problematic response to 
be only those that are 5xx
+            return allResponses.stream().filter(p -> 
p.is5xx()).collect(Collectors.toSet());
+        }
+    }
+
+    @Override
+    public boolean isResponseInterpreted(final URI uri, final String 
httpMethod) {
+        return getEndpointResponseMerger(uri, httpMethod) != null;
+    }
+
+    private static EndpointResponseMerger getEndpointResponseMerger(final URI 
uri, final String httpMethod) {
+        return endpointMergers.stream().filter(p -> p.canHandle(uri, 
httpMethod)).findFirst().orElse(null);
+    }
+
+    private boolean hasSuccessfulResponse(final Set<NodeResponse> 
allResponses) {
+        return allResponses.stream().anyMatch(p -> p.is2xx());
+    }
+
+
+    private void drainResponses(final Set<NodeResponse> responses, final 
NodeResponse exclude) {
+        responses.stream()
+            .parallel() // parallelize the draining of the responses, since we 
have multiple streams to consume
+            .filter(response -> response != exclude) // don't include the 
explicitly excluded node
+            .filter(response -> response.getStatus() != 
NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses 
because they contain no content
+            .forEach(response -> drainResponse(response)); // drain all node 
responses that didn't get filtered out
+    }
+
+    private void drainResponse(final NodeResponse response) {
+        if (response.hasThrowable()) {
+            return;
+        }
+
+        try {
+            ((StreamingOutput) response.getResponse().getEntity()).write(new 
NullOutputStream());
+        } catch (final IOException ioe) {
+            logger.info("Failed clearing out non-client response buffer from " 
+ response.getNodeId() + " due to: " + ioe, ioe);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractMultiEntityEndpoint.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractMultiEntityEndpoint.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractMultiEntityEndpoint.java
new file mode 100644
index 0000000..d72814b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractMultiEntityEndpoint.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.Entity;
+
+public abstract class AbstractMultiEntityEndpoint<EntityType extends Entity, 
DtoType> implements EndpointResponseMerger {
+
+    @Override
+    public final NodeResponse merge(final URI uri, final String method, final 
Set<NodeResponse> successfulResponses, final Set<NodeResponse> 
problematicResponses, final NodeResponse clientResponse) {
+        if (!canHandle(uri, method)) {
+            throw new IllegalArgumentException("Cannot use Endpoint Mapper of 
type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", 
HTTP Method " + method);
+        }
+
+        final EntityType responseEntity = 
clientResponse.getClientResponse().getEntity(getEntityClass());
+        final Set<DtoType> dtos = getDtos(responseEntity);
+
+        final Map<String, Map<NodeIdentifier, DtoType>> dtoMap = new 
HashMap<>();
+        for (final NodeResponse nodeResponse : successfulResponses) {
+            final EntityType nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(getEntityClass());
+            final Set<DtoType> nodeDtos = getDtos(nodeResponseEntity);
+
+            for (final DtoType nodeDto : nodeDtos) {
+                final NodeIdentifier nodeId = nodeResponse.getNodeId();
+                Map<NodeIdentifier, DtoType> innerMap = dtoMap.get(nodeId);
+                if (innerMap == null) {
+                    innerMap = new HashMap<>();
+                    dtoMap.put(getComponentId(nodeDto), innerMap);
+                }
+
+                innerMap.put(nodeResponse.getNodeId(), nodeDto);
+            }
+        }
+
+        for (final DtoType dto : dtos) {
+            final String componentId = getComponentId(dto);
+            final Map<NodeIdentifier, DtoType> mergeMap = 
dtoMap.get(componentId);
+
+            mergeResponses(dto, mergeMap, successfulResponses, 
problematicResponses);
+        }
+
+        // create a new client response
+        return new NodeResponse(clientResponse, responseEntity);
+    }
+
+
+    /**
+     * @return the class that represents the type of Entity that is expected 
by this response mapper
+     */
+    protected abstract Class<EntityType> getEntityClass();
+
+    /**
+     * Extracts the DTOs from the given entity
+     *
+     * @param entity the entity to extract the DTOs from
+     * @return the DTOs from the given entity
+     */
+    protected abstract Set<DtoType> getDtos(EntityType entity);
+
+    /**
+     * Extracts the ID of the component that the DTO refers to
+     * @param dto the DTO to extract the ID from
+     * @return the ID of the component that the DTO refers to
+     */
+    protected abstract String getComponentId(DtoType dto);
+
+    /**
+     * Merges the responses from all nodes in the given map into the single 
given DTO
+     *
+     * @param clientDto the DTO to merge responses into
+     * @param dtoMap the responses from all nodes
+     * @param successfulResponses the responses from nodes that completed the 
request successfully
+     * @param problematicResponses the responses from nodes that did not 
complete the request successfully
+     */
+    protected abstract void mergeResponses(final DtoType clientDto, 
Map<NodeIdentifier, DtoType> dtoMap, Set<NodeResponse> successfulResponses, 
Set<NodeResponse> problematicResponses);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractNodeStatusEndpoint.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractNodeStatusEndpoint.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractNodeStatusEndpoint.java
new file mode 100644
index 0000000..cc73e34
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractNodeStatusEndpoint.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.Entity;
+
+public abstract class AbstractNodeStatusEndpoint<EntityType extends Entity, 
DtoType> extends AbstractSingleEntityEndpoint<EntityType, DtoType> {
+
+    @Override
+    protected final void mergeResponses(DtoType clientDto, Map<NodeIdentifier, 
DtoType> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> 
problematicResponses) {
+        final NodeIdentifier selectedNodeId = 
dtoMap.entrySet().stream().filter(e -> e.getValue() == clientDto).map(e -> 
e.getKey()).findFirst().orElse(null);
+        if (selectedNodeId == null) {
+            throw new IllegalArgumentException("Attempted to merge Status 
request but could not find the appropriate Node Identifier");
+        }
+
+        mergeResponses(clientDto, dtoMap, selectedNodeId);
+    }
+
+    protected abstract void mergeResponses(DtoType clientDto, 
Map<NodeIdentifier, DtoType> dtoMap, NodeIdentifier selectedNodeId);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractSingleEntityEndpoint.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractSingleEntityEndpoint.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractSingleEntityEndpoint.java
new file mode 100644
index 0000000..66db949
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractSingleEntityEndpoint.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.Entity;
+
+public abstract class AbstractSingleEntityEndpoint<EntityType extends Entity, 
DtoType> implements EndpointResponseMerger {
+
+    @Override
+    public final NodeResponse merge(final URI uri, final String method, final 
Set<NodeResponse> successfulResponses, final Set<NodeResponse> 
problematicResponses, final NodeResponse clientResponse) {
+        if (!canHandle(uri, method)) {
+            throw new IllegalArgumentException("Cannot use Endpoint Mapper of 
type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", 
HTTP Method " + method);
+        }
+
+        final EntityType responseEntity = 
clientResponse.getClientResponse().getEntity(getEntityClass());
+        final DtoType dto = getDto(responseEntity);
+
+        final Map<NodeIdentifier, DtoType> dtoMap = new HashMap<>();
+        for (final NodeResponse nodeResponse : successfulResponses) {
+            final EntityType nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(getEntityClass());
+            final DtoType nodeDto = getDto(nodeResponseEntity);
+            dtoMap.put(nodeResponse.getNodeId(), nodeDto);
+        }
+
+        mergeResponses(dto, dtoMap, successfulResponses, problematicResponses);
+        return new NodeResponse(clientResponse, responseEntity);
+    }
+
+
+    /**
+     * Normalizes the validation errors by prepending the corresponding nodes 
when the error does not exist across all nodes.
+     *
+     * @param validationErrorMap map
+     * @param totalNodes total
+     * @return normalized errors
+     */
+    protected Set<String> normalizedMergedValidationErrors(final Map<String, 
Set<NodeIdentifier>> validationErrorMap, int totalNodes) {
+        final Set<String> normalizedValidationErrors = new HashSet<>();
+        for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : 
validationErrorMap.entrySet()) {
+            final String msg = validationEntry.getKey();
+            final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
+
+            if (nodeIds.size() == totalNodes) {
+                normalizedValidationErrors.add(msg);
+            } else {
+                nodeIds.forEach(id -> 
normalizedValidationErrors.add(id.getApiAddress() + ":" + id.getApiPort() + " 
-- " + msg));
+            }
+        }
+        return normalizedValidationErrors;
+    }
+
+    /**
+     * Merges the validation errors into the specified map, recording the 
corresponding node identifier.
+     *
+     * @param validationErrorMap map
+     * @param nodeId id
+     * @param nodeValidationErrors errors
+     */
+    protected void mergeValidationErrors(final Map<String, 
Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final 
Collection<String> nodeValidationErrors) {
+        if (nodeValidationErrors != null) {
+            nodeValidationErrors.stream().forEach(
+                err -> validationErrorMap.computeIfAbsent(err, k -> new 
HashSet<NodeIdentifier>())
+                    .add(nodeId));
+        }
+    }
+
+    /**
+     * @return the class that represents the type of Entity that is expected 
by this response mapper
+     */
+    protected abstract Class<EntityType> getEntityClass();
+
+    /**
+     * Extracts the DTO from the given entity
+     *
+     * @param entity the entity to extract the DTO from
+     * @return the DTO from the given entity
+     */
+    protected abstract DtoType getDto(EntityType entity);
+
+    /**
+     * Merges the responses from all nodes in the given map into the single 
given DTO
+     *
+     * @param clientDto the DTO to merge responses into
+     * @param dtoMap the responses from all nodes
+     * @param successfulResponses the responses from nodes that completed the 
request successfully
+     * @param problematicResponses the responses from nodes that did not 
complete the request successfully
+     */
+    protected abstract void mergeResponses(DtoType clientDto, 
Map<NodeIdentifier, DtoType> dtoMap, Set<NodeResponse> successfulResponses, 
Set<NodeResponse> problematicResponses);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java
new file mode 100644
index 0000000..799d279
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.BulletinBoardDTO;
+import org.apache.nifi.web.api.dto.BulletinDTO;
+import org.apache.nifi.web.api.entity.BulletinBoardEntity;
+
+public class BulletinBoardEndpointMerger extends 
AbstractSingleEntityEndpoint<BulletinBoardEntity, BulletinBoardDTO> {
+    public static final Pattern BULLETIN_BOARD_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/bulletin-board");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
BULLETIN_BOARD_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    protected Class<BulletinBoardEntity> getEntityClass() {
+        return BulletinBoardEntity.class;
+    }
+
+    @Override
+    protected BulletinBoardDTO getDto(BulletinBoardEntity entity) {
+        return entity.getBulletinBoard();
+    }
+
+    @Override
+    protected void mergeResponses(BulletinBoardDTO clientDto, 
Map<NodeIdentifier, BulletinBoardDTO> dtoMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
+        final List<BulletinDTO> bulletinDtos = new ArrayList<>();
+        for (final Map.Entry<NodeIdentifier, BulletinBoardDTO> entry : 
dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final BulletinBoardDTO boardDto = entry.getValue();
+            final String nodeAddress = nodeId.getApiAddress() + ":" + 
nodeId.getApiPort();
+
+            for (final BulletinDTO bulletin : boardDto.getBulletins()) {
+                bulletin.setNodeAddress(nodeAddress);
+                bulletinDtos.add(bulletin);
+            }
+        }
+
+        Collections.sort(bulletinDtos, new Comparator<BulletinDTO>() {
+            @Override
+            public int compare(final BulletinDTO o1, final BulletinDTO o2) {
+                final int timeComparison = 
o1.getTimestamp().compareTo(o2.getTimestamp());
+                if (timeComparison != 0) {
+                    return timeComparison;
+                }
+
+                return o1.getNodeAddress().compareTo(o2.getNodeAddress());
+            }
+        });
+
+        clientDto.setBulletins(bulletinDtos);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
new file mode 100644
index 0000000..0598259
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.state.SortedStateUtils;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
+import org.apache.nifi.web.api.dto.StateEntryDTO;
+import org.apache.nifi.web.api.dto.StateMapDTO;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
+
+public class ComponentStateEndpointMerger extends 
AbstractSingleEntityEndpoint<ComponentStateEntity, ComponentStateDTO> {
+    public static final Pattern PROCESSOR_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}/state");
+    public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller-services/node/[a-f0-9\\-]{36}/state");
+    public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/reporting-tasks/node/[a-f0-9\\-]{36}/state");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        if (!"GET".equalsIgnoreCase(method)) {
+            return false;
+        }
+
+        return PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches()
+            || 
CONTROLLER_SERVICE_STATE_URI_PATTERN.matcher(uri.getPath()).matches()
+            || 
REPORTING_TASK_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    protected Class<ComponentStateEntity> getEntityClass() {
+        return ComponentStateEntity.class;
+    }
+
+    @Override
+    protected ComponentStateDTO getDto(ComponentStateEntity entity) {
+        return entity.getComponentState();
+    }
+
+    @Override
+    protected void mergeResponses(ComponentStateDTO clientDto, 
Map<NodeIdentifier, ComponentStateDTO> dtoMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
+        List<StateEntryDTO> localStateEntries = new ArrayList<>();
+
+        int totalStateEntries = 0;
+        for (final Map.Entry<NodeIdentifier, ComponentStateDTO> nodeEntry : 
dtoMap.entrySet()) {
+            final ComponentStateDTO nodeComponentState = nodeEntry.getValue();
+            final NodeIdentifier nodeId = nodeEntry.getKey();
+            final String nodeAddress = nodeId.getApiAddress() + ":" + 
nodeId.getApiPort();
+
+            final StateMapDTO nodeLocalStateMap = 
nodeComponentState.getLocalState();
+            if (nodeLocalStateMap.getState() != null) {
+                totalStateEntries += nodeLocalStateMap.getTotalEntryCount();
+
+                for (final StateEntryDTO nodeStateEntry : 
nodeLocalStateMap.getState()) {
+                    nodeStateEntry.setClusterNodeId(nodeId.getId());
+                    nodeStateEntry.setClusterNodeAddress(nodeAddress);
+                    localStateEntries.add(nodeStateEntry);
+                }
+            }
+        }
+
+        // ensure appropriate sort
+        Collections.sort(localStateEntries, 
SortedStateUtils.getEntryDtoComparator());
+
+        // sublist if necessary
+        if (localStateEntries.size() > 
SortedStateUtils.MAX_COMPONENT_STATE_ENTRIES) {
+            localStateEntries = localStateEntries.subList(0, 
SortedStateUtils.MAX_COMPONENT_STATE_ENTRIES);
+        }
+
+        // add all the local state entries
+        clientDto.getLocalState().setTotalEntryCount(totalStateEntries);
+        clientDto.getLocalState().setState(localStateEntries);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpiontMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpiontMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpiontMerger.java
new file mode 100644
index 0000000..05bf0f5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpiontMerger.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.StatusMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
+import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
+
+public class ConnectionStatusEndpiontMerger extends 
AbstractNodeStatusEndpoint<ConnectionStatusEntity, ConnectionStatusDTO> {
+    public static final Pattern CONNECTION_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/connections/[a-f0-9\\-]{36}/status");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
CONNECTION_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    protected Class<ConnectionStatusEntity> getEntityClass() {
+        return ConnectionStatusEntity.class;
+    }
+
+    @Override
+    protected ConnectionStatusDTO getDto(ConnectionStatusEntity entity) {
+        return entity.getConnectionStatus();
+    }
+
+    @Override
+    protected void mergeResponses(ConnectionStatusDTO clientDto, 
Map<NodeIdentifier, ConnectionStatusDTO> dtoMap, NodeIdentifier selectedNodeId) 
{
+        final ConnectionStatusDTO mergedConnectionStatus = clientDto;
+        mergedConnectionStatus.setNodeSnapshots(new 
ArrayList<NodeConnectionStatusSnapshotDTO>());
+
+        final NodeConnectionStatusSnapshotDTO selectedNodeSnapshot = new 
NodeConnectionStatusSnapshotDTO();
+        
selectedNodeSnapshot.setStatusSnapshot(clientDto.getAggregateSnapshot().clone());
+        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
+        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
+        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
+
+        mergedConnectionStatus.getNodeSnapshots().add(selectedNodeSnapshot);
+
+        // merge the other nodes
+        for (final Map.Entry<NodeIdentifier, ConnectionStatusDTO> entry : 
dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final ConnectionStatusDTO nodeConnectionStatus = entry.getValue();
+            if (nodeConnectionStatus == clientDto) {
+                continue;
+            }
+
+            StatusMerger.merge(mergedConnectionStatus, nodeConnectionStatus, 
nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceEndpointMerger.java
new file mode 100644
index 0000000..1591604
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceEndpointMerger.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import 
org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity;
+
+public class ControllerServiceEndpointMerger extends 
AbstractSingleEntityEndpoint<ControllerServiceEntity, ControllerServiceDTO> {
+    public static final String CONTROLLER_SERVICES_URI = 
"/nifi-api/controller-services/node";
+    public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller-services/node/[a-f0-9\\-]{36}");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if ("POST".equalsIgnoreCase(method) && 
CONTROLLER_SERVICES_URI.equals(uri.getPath())) {
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    protected Class<ControllerServiceEntity> getEntityClass() {
+        return ControllerServiceEntity.class;
+    }
+
+    @Override
+    protected ControllerServiceDTO getDto(ControllerServiceEntity entity) {
+        return entity.getControllerService();
+    }
+
+    @Override
+    protected void mergeResponses(ControllerServiceDTO clientDto, 
Map<NodeIdentifier, ControllerServiceDTO> dtoMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
+        final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
+        final Set<ControllerServiceReferencingComponentEntity> 
referencingComponents = clientDto.getReferencingComponents();
+        final Map<NodeIdentifier, 
Set<ControllerServiceReferencingComponentEntity>> nodeReferencingComponentsMap 
= new HashMap<>();
+
+        String state = null;
+        for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : 
dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = nodeEntry.getKey();
+            final ControllerServiceDTO nodeControllerService = 
nodeEntry.getValue();
+
+            if (state == null) {
+                if 
(ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState()))
 {
+                    state = ControllerServiceState.DISABLING.name();
+                } else if 
(ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState()))
 {
+                    state = ControllerServiceState.ENABLING.name();
+                }
+            }
+
+            nodeReferencingComponentsMap.put(nodeId, 
nodeControllerService.getReferencingComponents());
+
+            // merge the validation errors
+            mergeValidationErrors(validationErrorMap, nodeId, 
nodeControllerService.getValidationErrors());
+        }
+
+        // merge the referencing components
+        mergeControllerServiceReferences(referencingComponents, 
nodeReferencingComponentsMap);
+
+        // store the 'transition' state is applicable
+        if (state != null) {
+            clientDto.setState(state);
+        }
+
+        // set the merged the validation errors
+        
clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap,
 dtoMap.size()));
+    }
+
+    public static void 
mergeControllerServiceReferences(Set<ControllerServiceReferencingComponentEntity>
 referencingComponents,
+        Map<NodeIdentifier, Set<ControllerServiceReferencingComponentEntity>> 
referencingComponentMap) {
+
+        final Map<String, Integer> activeThreadCounts = new HashMap<>();
+        final Map<String, String> states = new HashMap<>();
+        for (final Map.Entry<NodeIdentifier, 
Set<ControllerServiceReferencingComponentEntity>> nodeEntry : 
referencingComponentMap.entrySet()) {
+            final Set<ControllerServiceReferencingComponentEntity> 
nodeReferencingComponents = nodeEntry.getValue();
+
+            // go through all the nodes referencing components
+            if (nodeReferencingComponents != null) {
+                for (final ControllerServiceReferencingComponentEntity 
nodeReferencingComponentEntity : nodeReferencingComponents) {
+                    final ControllerServiceReferencingComponentDTO 
nodeReferencingComponent = 
nodeReferencingComponentEntity.getControllerServiceReferencingComponent();
+
+                    // handle active thread counts
+                    if (nodeReferencingComponent.getActiveThreadCount() != 
null && nodeReferencingComponent.getActiveThreadCount() > 0) {
+                        final Integer current = 
activeThreadCounts.get(nodeReferencingComponent.getId());
+                        if (current == null) {
+                            
activeThreadCounts.put(nodeReferencingComponent.getId(), 
nodeReferencingComponent.getActiveThreadCount());
+                        } else {
+                            
activeThreadCounts.put(nodeReferencingComponent.getId(), 
nodeReferencingComponent.getActiveThreadCount() + current);
+                        }
+                    }
+
+                    // handle controller service state
+                    final String state = 
states.get(nodeReferencingComponent.getId());
+                    if (state == null) {
+                        if 
(ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState()))
 {
+                            states.put(nodeReferencingComponent.getId(), 
ControllerServiceState.DISABLING.name());
+                        } else if 
(ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState()))
 {
+                            states.put(nodeReferencingComponent.getId(), 
ControllerServiceState.ENABLING.name());
+                        }
+                    }
+                }
+            }
+        }
+
+        // go through each referencing components
+        for (final ControllerServiceReferencingComponentEntity 
referencingComponent : referencingComponents) {
+            final Integer activeThreadCount = 
activeThreadCounts.get(referencingComponent.getId());
+            if (activeThreadCount != null) {
+                
referencingComponent.getControllerServiceReferencingComponent().setActiveThreadCount(activeThreadCount);
+            }
+
+            final String state = states.get(referencingComponent.getId());
+            if (state != null) {
+                
referencingComponent.getControllerServiceReferencingComponent().setState(state);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceReferenceEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceReferenceEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceReferenceEndpointMerger.java
new file mode 100644
index 0000000..1fad628
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceReferenceEndpointMerger.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import 
org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity;
+import 
org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
+
+public class ControllerServiceReferenceEndpointMerger implements 
EndpointResponseMerger {
+    public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = 
Pattern.compile("/nifi-api/controller-services/node/[a-f0-9\\-]{36}/references");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    public NodeResponse merge(URI uri, String method, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses, NodeResponse 
clientResponse) {
+        if (!canHandle(uri, method)) {
+            throw new IllegalArgumentException("Cannot use Endpoint Mapper of 
type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", 
HTTP Method " + method);
+        }
+
+        final ControllerServiceReferencingComponentsEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+        final Set<ControllerServiceReferencingComponentEntity> 
referencingComponents = 
responseEntity.getControllerServiceReferencingComponents();
+
+        final Map<NodeIdentifier, 
Set<ControllerServiceReferencingComponentEntity>> resultsMap = new HashMap<>();
+        for (final NodeResponse nodeResponse : successfulResponses) {
+            final ControllerServiceReferencingComponentsEntity 
nodeResponseEntity = nodeResponse == clientResponse ? responseEntity
+                : 
nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+            final Set<ControllerServiceReferencingComponentEntity> 
nodeReferencingComponents = 
nodeResponseEntity.getControllerServiceReferencingComponents();
+
+            resultsMap.put(nodeResponse.getNodeId(), 
nodeReferencingComponents);
+        }
+
+        
ControllerServiceEndpointMerger.mergeControllerServiceReferences(referencingComponents,
 resultsMap);
+
+        return new NodeResponse(clientResponse, responseEntity);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServicesEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServicesEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServicesEndpointMerger.java
new file mode 100644
index 0000000..097c974
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServicesEndpointMerger.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+
+public class ControllerServicesEndpointMerger extends 
AbstractMultiEntityEndpoint<ControllerServicesEntity, ControllerServiceDTO> {
+    public static final String CONTROLLER_SERVICES_URI = 
"/nifi-api/controller-services/node";
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
CONTROLLER_SERVICES_URI.equals(uri.getPath());
+    }
+
+    @Override
+    protected Class<ControllerServicesEntity> getEntityClass() {
+        return ControllerServicesEntity.class;
+    }
+
+    @Override
+    protected Set<ControllerServiceDTO> getDtos(ControllerServicesEntity 
entity) {
+        return entity.getControllerServices();
+    }
+
+    @Override
+    protected String getComponentId(ControllerServiceDTO dto) {
+        return dto.getId();
+    }
+
+    @Override
+    protected void mergeResponses(ControllerServiceDTO clientDto, 
Map<NodeIdentifier, ControllerServiceDTO> dtoMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
+        new ControllerServiceEndpointMerger().mergeResponses(clientDto, 
dtoMap, successfulResponses, problematicResponses);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerStatusEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerStatusEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerStatusEndpointMerger.java
new file mode 100644
index 0000000..50514f8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerStatusEndpointMerger.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.StatusMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.BulletinDTO;
+import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
+import org.apache.nifi.web.api.entity.ControllerStatusEntity;
+
+public class ControllerStatusEndpointMerger extends 
AbstractSingleEntityEndpoint<ControllerStatusEntity, ControllerStatusDTO> {
+    public static final Pattern CONTROLLER_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/status");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
CONTROLLER_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    protected Class<ControllerStatusEntity> getEntityClass() {
+        return ControllerStatusEntity.class;
+    }
+
+    @Override
+    protected ControllerStatusDTO getDto(ControllerStatusEntity entity) {
+        return entity.getControllerStatus();
+    }
+
+    @Override
+    protected void mergeResponses(ControllerStatusDTO clientDto, 
Map<NodeIdentifier, ControllerStatusDTO> dtoMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
+        ControllerStatusDTO mergedStatus = clientDto;
+        for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> entry : 
dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final ControllerStatusDTO nodeStatus = entry.getValue();
+
+            final String nodeAddress = nodeId.getApiAddress() + ":" + 
nodeId.getApiPort();
+            for (final BulletinDTO bulletin : nodeStatus.getBulletins()) {
+                bulletin.setNodeAddress(nodeAddress);
+            }
+            for (final BulletinDTO bulletin : 
nodeStatus.getControllerServiceBulletins()) {
+                bulletin.setNodeAddress(nodeAddress);
+            }
+            for (final BulletinDTO bulletin : 
nodeStatus.getReportingTaskBulletins()) {
+                bulletin.setNodeAddress(nodeAddress);
+            }
+
+            if (nodeStatus == mergedStatus) {
+                continue;
+            }
+
+            StatusMerger.merge(mergedStatus, nodeStatus);
+        }
+
+        final int totalNodeCount = successfulResponses.size() + 
problematicResponses.size();
+        final int connectedNodeCount = successfulResponses.size(); // all 
nodes that responded successfully must be connected. Those that did not will be 
disconnected.
+
+        mergedStatus.setConnectedNodeCount(connectedNodeCount);
+        mergedStatus.setTotalNodeCount(totalNodeCount);
+        StatusMerger.updatePrettyPrintedFields(mergedStatus);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/CountersEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/CountersEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/CountersEndpointMerger.java
new file mode 100644
index 0000000..22b82b3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/CountersEndpointMerger.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.StatusMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.CountersDTO;
+import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO;
+import org.apache.nifi.web.api.entity.CountersEntity;
+
+public class CountersEndpointMerger extends 
AbstractNodeStatusEndpoint<CountersEntity, CountersDTO> {
+    public static final Pattern COUNTERS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/counters");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
COUNTERS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    protected Class<CountersEntity> getEntityClass() {
+        return CountersEntity.class;
+    }
+
+    @Override
+    protected CountersDTO getDto(CountersEntity entity) {
+        return entity.getCounters();
+    }
+
+    @Override
+    protected void mergeResponses(CountersDTO clientDto, Map<NodeIdentifier, 
CountersDTO> dtoMap, NodeIdentifier selectedNodeId) {
+        final CountersDTO mergedCounters = clientDto;
+        mergedCounters.setNodeSnapshots(new 
ArrayList<NodeCountersSnapshotDTO>());
+
+        final NodeCountersSnapshotDTO selectedNodeSnapshot = new 
NodeCountersSnapshotDTO();
+        
selectedNodeSnapshot.setSnapshot(clientDto.getAggregateSnapshot().clone());
+        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
+        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
+        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
+
+        mergedCounters.getNodeSnapshots().add(selectedNodeSnapshot);
+
+        for (final Map.Entry<NodeIdentifier, CountersDTO> entry : 
dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final CountersDTO toMerge = entry.getValue();
+            if (toMerge == clientDto) {
+                continue;
+            }
+
+            StatusMerger.merge(mergedCounters, toMerge, nodeId.getId(), 
nodeId.getApiAddress(), nodeId.getApiPort());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropRequestEndpiontMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropRequestEndpiontMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropRequestEndpiontMerger.java
new file mode 100644
index 0000000..f6025b8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropRequestEndpiontMerger.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
+
+public class DropRequestEndpiontMerger extends 
AbstractSingleEntityEndpoint<DropRequestEntity, DropRequestDTO> {
+    public static final Pattern DROP_REQUESTS_URI = 
Pattern.compile("/nifi-api/flowfile-queues/[a-f0-9\\-]{36}/drop-requests");
+    public static final Pattern DROP_REQUEST_URI = 
Pattern.compile("/nifi-api/flowfile-queues/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        if (("GET".equalsIgnoreCase(method) || 
"DELETE".equalsIgnoreCase(method)) && 
DROP_REQUEST_URI.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if (("POST".equalsIgnoreCase(method) && 
DROP_REQUESTS_URI.matcher(uri.getPath()).matches())) {
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    protected Class<DropRequestEntity> getEntityClass() {
+        return DropRequestEntity.class;
+    }
+
+    @Override
+    protected DropRequestDTO getDto(DropRequestEntity entity) {
+        return entity.getDropRequest();
+    }
+
+    @Override
+    protected void mergeResponses(DropRequestDTO clientDto, 
Map<NodeIdentifier, DropRequestDTO> dtoMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
+        boolean nodeWaiting = false;
+        int originalCount = 0;
+        long originalSize = 0;
+        int currentCount = 0;
+        long currentSize = 0;
+        int droppedCount = 0;
+        long droppedSize = 0;
+
+        DropFlowFileState state = null;
+        boolean allFinished = true;
+        String failureReason = null;
+        for (final Map.Entry<NodeIdentifier, DropRequestDTO> nodeEntry : 
dtoMap.entrySet()) {
+            final DropRequestDTO nodeDropRequest = nodeEntry.getValue();
+
+            if (!nodeDropRequest.isFinished()) {
+                allFinished = false;
+            }
+            if (nodeDropRequest.getFailureReason() != null) {
+                failureReason = nodeDropRequest.getFailureReason();
+            }
+
+            currentCount += nodeDropRequest.getCurrentCount();
+            currentSize += nodeDropRequest.getCurrentSize();
+            droppedCount += nodeDropRequest.getDroppedCount();
+            droppedSize += nodeDropRequest.getDroppedSize();
+
+            if (nodeDropRequest.getOriginalCount() == null) {
+                nodeWaiting = true;
+            } else {
+                originalCount += nodeDropRequest.getOriginalCount();
+                originalSize += nodeDropRequest.getOriginalSize();
+            }
+
+            final DropFlowFileState nodeState = 
DropFlowFileState.valueOfDescription(nodeDropRequest.getState());
+            if (state == null || state.ordinal() > nodeState.ordinal()) {
+                state = nodeState;
+            }
+        }
+
+        clientDto.setCurrentCount(currentCount);
+        clientDto.setCurrentSize(currentSize);
+        clientDto.setCurrent(FormatUtils.formatCount(currentCount) + " / " + 
FormatUtils.formatDataSize(currentSize));
+
+        clientDto.setDroppedCount(droppedCount);
+        clientDto.setDroppedSize(droppedSize);
+        clientDto.setDropped(FormatUtils.formatCount(droppedCount) + " / " + 
FormatUtils.formatDataSize(droppedSize));
+
+        clientDto.setFinished(allFinished);
+        clientDto.setFailureReason(failureReason);
+        if (originalCount == 0) {
+            clientDto.setPercentCompleted(allFinished ? 100 : 0);
+        } else {
+            clientDto.setPercentCompleted((int) ((double) droppedCount / 
(double) originalCount * 100D));
+        }
+
+        if (!nodeWaiting) {
+            clientDto.setOriginalCount(originalCount);
+            clientDto.setOriginalSize(originalSize);
+            clientDto.setOriginal(FormatUtils.formatCount(originalCount) + " / 
" + FormatUtils.formatDataSize(originalSize));
+        }
+
+        if (state != null) {
+            clientDto.setState(state.toString());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java
new file mode 100644
index 0000000..d7f6948
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+import org.apache.nifi.web.api.entity.FlowSnippetEntity;
+
+public class FlowSnippetEndpointMerger implements EndpointResponseMerger {
+    public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance");
+    public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance");
+
+    @Override
+    public boolean canHandle(final URI uri, final String method) {
+        return "POST".equalsIgnoreCase(method) && 
(TEMPLATE_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches()
+            || 
FLOW_SNIPPET_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches());
+    }
+
+    @Override
+    public NodeResponse merge(final URI uri, final String method, 
Set<NodeResponse> successfulResponses, final Set<NodeResponse> 
problematicResponses, final NodeResponse clientResponse) {
+        final FlowSnippetEntity responseEntity = 
clientResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
+        final FlowSnippetDTO contents = responseEntity.getContents();
+
+        if (contents == null) {
+            return clientResponse;
+        } else {
+            final Map<String, Map<NodeIdentifier, ProcessorDTO>> processorMap 
= new HashMap<>();
+            final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> 
remoteProcessGroupMap = new HashMap<>();
+
+            for (final NodeResponse nodeResponse : successfulResponses) {
+                final FlowSnippetEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
+                final FlowSnippetDTO nodeContents = 
nodeResponseEntity.getContents();
+
+                for (final ProcessorDTO nodeProcessor : 
nodeContents.getProcessors()) {
+                    Map<NodeIdentifier, ProcessorDTO> innerMap = 
processorMap.get(nodeProcessor.getId());
+                    if (innerMap == null) {
+                        innerMap = new HashMap<>();
+                        processorMap.put(nodeProcessor.getId(), innerMap);
+                    }
+
+                    innerMap.put(nodeResponse.getNodeId(), nodeProcessor);
+                }
+
+                for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : 
nodeContents.getRemoteProcessGroups()) {
+                    Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = 
remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId());
+                    if (innerMap == null) {
+                        innerMap = new HashMap<>();
+                        
remoteProcessGroupMap.put(nodeRemoteProcessGroup.getId(), innerMap);
+                    }
+
+                    innerMap.put(nodeResponse.getNodeId(), 
nodeRemoteProcessGroup);
+                }
+            }
+
+            final ProcessorEndpointMerger procMerger = new 
ProcessorEndpointMerger();
+            for (final ProcessorDTO processor : contents.getProcessors()) {
+                final String procId = processor.getId();
+                final Map<NodeIdentifier, ProcessorDTO> mergeMap = 
processorMap.get(procId);
+
+                procMerger.mergeResponses(processor, mergeMap, 
successfulResponses, problematicResponses);
+            }
+
+            final RemoteProcessGroupEndpointMerger rpgMerger = new 
RemoteProcessGroupEndpointMerger();
+            for (final RemoteProcessGroupDTO remoteProcessGroup : 
contents.getRemoteProcessGroups()) {
+                if (remoteProcessGroup.getContents() != null) {
+                    final String remoteProcessGroupId = 
remoteProcessGroup.getId();
+                    final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap 
= remoteProcessGroupMap.get(remoteProcessGroupId);
+
+                    rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, 
successfulResponses, problematicResponses);
+                }
+            }
+        }
+
+        // create a new client response
+        return new NodeResponse(clientResponse, responseEntity);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/GroupStatusEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/GroupStatusEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/GroupStatusEndpointMerger.java
new file mode 100644
index 0000000..b769042
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/GroupStatusEndpointMerger.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.StatusMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
+
+public class GroupStatusEndpointMerger extends 
AbstractNodeStatusEndpoint<ProcessGroupStatusEntity, ProcessGroupStatusDTO> {
+    public static final Pattern GROUP_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    protected Class<ProcessGroupStatusEntity> getEntityClass() {
+        return ProcessGroupStatusEntity.class;
+    }
+
+    @Override
+    protected ProcessGroupStatusDTO getDto(ProcessGroupStatusEntity entity) {
+        return entity.getProcessGroupStatus();
+    }
+
+    @Override
+    protected void mergeResponses(ProcessGroupStatusDTO clientDto, 
Map<NodeIdentifier, ProcessGroupStatusDTO> dtoMap, NodeIdentifier 
selectedNodeId) {
+        final ProcessGroupStatusDTO mergedProcessGroupStatus = clientDto;
+        mergedProcessGroupStatus.setNodeSnapshots(new 
ArrayList<NodeProcessGroupStatusSnapshotDTO>());
+
+        final NodeProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new 
NodeProcessGroupStatusSnapshotDTO();
+        
selectedNodeSnapshot.setStatusSnapshot(clientDto.getAggregateSnapshot().clone());
+        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
+        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
+        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
+
+        mergedProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot);
+
+        for (final Map.Entry<NodeIdentifier, ProcessGroupStatusDTO> entry : 
dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final ProcessGroupStatusDTO nodeProcessGroupStatus = 
entry.getValue();
+            if (nodeProcessGroupStatus == mergedProcessGroupStatus) {
+                continue;
+            }
+
+            final ProcessGroupStatusSnapshotDTO nodeSnapshot = 
nodeProcessGroupStatus.getAggregateSnapshot();
+            for (final RemoteProcessGroupStatusSnapshotDTO 
remoteProcessGroupStatus : nodeSnapshot.getRemoteProcessGroupStatusSnapshots()) 
{
+                final List<String> nodeAuthorizationIssues = 
remoteProcessGroupStatus.getAuthorizationIssues();
+                if (!nodeAuthorizationIssues.isEmpty()) {
+                    for (final ListIterator<String> iter = 
nodeAuthorizationIssues.listIterator(); iter.hasNext();) {
+                        final String Issue = iter.next();
+                        iter.set("[" + nodeId.getApiAddress() + ":" + 
nodeId.getApiPort() + "] -- " + Issue);
+                    }
+                    
remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues);
+                }
+            }
+
+            StatusMerger.merge(mergedProcessGroupStatus, 
nodeProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), 
nodeId.getApiPort());
+        }
+    }
+
+}

Reply via email to