http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ListFlowFilesEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ListFlowFilesEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ListFlowFilesEndpointMerger.java
new file mode 100644
index 0000000..415334e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ListFlowFilesEndpointMerger.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.queue.ListFlowFileState;
+import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
+import org.apache.nifi.web.api.dto.ListingRequestDTO;
+import org.apache.nifi.web.api.dto.QueueSizeDTO;
+import org.apache.nifi.web.api.entity.ListingRequestEntity;
+
+public class ListFlowFilesEndpointMerger extends 
AbstractSingleEntityEndpoint<ListingRequestEntity, ListingRequestDTO> {
+    public static final Pattern LISTING_REQUESTS_URI = 
Pattern.compile("/nifi-api/flowfile-queues/[a-f0-9\\-]{36}/listing-requests");
+    public static final Pattern LISTING_REQUEST_URI = 
Pattern.compile("/nifi-api/flowfile-queues/[a-f0-9\\-]{36}/listing-requests/[a-f0-9\\-]{36}");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        if (("GET".equalsIgnoreCase(method) || 
"DELETE".equalsIgnoreCase(method)) && 
LISTING_REQUEST_URI.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if ("POST".equalsIgnoreCase(method) && 
LISTING_REQUESTS_URI.matcher(uri.getPath()).matches()) {
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    protected Class<ListingRequestEntity> getEntityClass() {
+        return ListingRequestEntity.class;
+    }
+
+    @Override
+    protected ListingRequestDTO getDto(ListingRequestEntity entity) {
+        return entity.getListingRequest();
+    }
+
+    @Override
+    protected void mergeResponses(ListingRequestDTO clientDto, 
Map<NodeIdentifier, ListingRequestDTO> dtoMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
+        final Comparator<FlowFileSummaryDTO> comparator = new 
Comparator<FlowFileSummaryDTO>() {
+            @Override
+            public int compare(final FlowFileSummaryDTO dto1, final 
FlowFileSummaryDTO dto2) {
+                int positionCompare = 
dto1.getPosition().compareTo(dto2.getPosition());
+                if (positionCompare != 0) {
+                    return positionCompare;
+                }
+
+                final String address1 = dto1.getClusterNodeAddress();
+                final String address2 = dto2.getClusterNodeAddress();
+                if (address1 == null && address2 == null) {
+                    return 0;
+                }
+                if (address1 == null) {
+                    return 1;
+                }
+                if (address2 == null) {
+                    return -1;
+                }
+                return address1.compareTo(address2);
+            }
+        };
+
+        final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new 
TreeSet<>(comparator);
+
+        ListFlowFileState state = null;
+        int numStepsCompleted = 0;
+        int numStepsTotal = 0;
+        int objectCount = 0;
+        long byteCount = 0;
+        boolean finished = true;
+        for (final Map.Entry<NodeIdentifier, ListingRequestDTO> entry : 
dtoMap.entrySet()) {
+            final NodeIdentifier nodeIdentifier = entry.getKey();
+            final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + 
nodeIdentifier.getApiPort();
+
+            final ListingRequestDTO nodeRequest = entry.getValue();
+
+            numStepsTotal++;
+            if (Boolean.TRUE.equals(nodeRequest.getFinished())) {
+                numStepsCompleted++;
+            }
+
+            final QueueSizeDTO nodeQueueSize = nodeRequest.getQueueSize();
+            objectCount += nodeQueueSize.getObjectCount();
+            byteCount += nodeQueueSize.getByteCount();
+
+            if (!nodeRequest.getFinished()) {
+                finished = false;
+            }
+
+            if 
(nodeRequest.getLastUpdated().after(clientDto.getLastUpdated())) {
+                clientDto.setLastUpdated(nodeRequest.getLastUpdated());
+            }
+
+            // Keep the state with the lowest ordinal value (the "least 
completed").
+            final ListFlowFileState nodeState = 
ListFlowFileState.valueOfDescription(nodeRequest.getState());
+            if (state == null || state.compareTo(nodeState) > 0) {
+                state = nodeState;
+            }
+
+            if (nodeRequest.getFlowFileSummaries() != null) {
+                for (final FlowFileSummaryDTO summaryDTO : 
nodeRequest.getFlowFileSummaries()) {
+                    summaryDTO.setClusterNodeId(nodeIdentifier.getId());
+                    summaryDTO.setClusterNodeAddress(nodeAddress);
+
+                    flowFileSummaries.add(summaryDTO);
+
+                    // Keep the set from growing beyond our max
+                    if (flowFileSummaries.size() > clientDto.getMaxResults()) {
+                        flowFileSummaries.pollLast();
+                    }
+                }
+            }
+
+            if (nodeRequest.getFailureReason() != null) {
+                clientDto.setFailureReason(nodeRequest.getFailureReason());
+            }
+        }
+
+        final List<FlowFileSummaryDTO> summaryDTOs = new 
ArrayList<>(flowFileSummaries);
+        clientDto.setFlowFileSummaries(summaryDTOs);
+        // depends on invariant if numStepsTotal is 0, so is 
numStepsCompleted, all steps being completed
+        // would be 1
+        final int percentCompleted = (numStepsTotal == 0) ? 1 : 
numStepsCompleted / numStepsTotal;
+        clientDto.setPercentCompleted(percentCompleted);
+        clientDto.setFinished(finished);
+
+        clientDto.getQueueSize().setByteCount(byteCount);
+        clientDto.getQueueSize().setObjectCount(objectCount);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PortStatusEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PortStatusEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PortStatusEndpointMerger.java
new file mode 100644
index 0000000..5c570f0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PortStatusEndpointMerger.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.StatusMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
+import org.apache.nifi.web.api.entity.PortStatusEntity;
+
+public class PortStatusEndpointMerger extends 
AbstractNodeStatusEndpoint<PortStatusEntity, PortStatusDTO> {
+    public static final Pattern INPUT_PORT_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/input-ports/[a-f0-9\\-]{36}/status");
+    public static final Pattern OUTPUT_PORT_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/output-ports/[a-f0-9\\-]{36}/status");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
(INPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches() || 
OUTPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches());
+    }
+
+    @Override
+    protected Class<PortStatusEntity> getEntityClass() {
+        return PortStatusEntity.class;
+    }
+
+    @Override
+    protected PortStatusDTO getDto(PortStatusEntity entity) {
+        return entity.getPortStatus();
+    }
+
+    @Override
+    protected void mergeResponses(PortStatusDTO clientDto, Map<NodeIdentifier, 
PortStatusDTO> dtoMap, NodeIdentifier selectedNodeId) {
+        final PortStatusDTO mergedPortStatus = clientDto;
+        mergedPortStatus.setNodeSnapshots(new 
ArrayList<NodePortStatusSnapshotDTO>());
+
+        final NodePortStatusSnapshotDTO selectedNodeSnapshot = new 
NodePortStatusSnapshotDTO();
+        
selectedNodeSnapshot.setStatusSnapshot(clientDto.getAggregateSnapshot().clone());
+        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
+        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
+        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
+
+        mergedPortStatus.getNodeSnapshots().add(selectedNodeSnapshot);
+
+        // merge the other nodes
+        for (final Map.Entry<NodeIdentifier, PortStatusDTO> entry : 
dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final PortStatusDTO nodePortStatus = entry.getValue();
+            if (nodePortStatus == clientDto) {
+                continue;
+            }
+
+            StatusMerger.merge(mergedPortStatus, nodePortStatus, 
nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java
new file mode 100644
index 0000000..bef75a0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+
+public class ProcessGroupEndpointMerger implements EndpointResponseMerger {
+    public static final Pattern PROCESS_GROUP_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))");
+
+    @Override
+    public boolean canHandle(final URI uri, final String method) {
+        return ("GET".equalsIgnoreCase(method) || 
"PUT".equalsIgnoreCase(method)) && 
PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    public NodeResponse merge(final URI uri, final String method, final 
Set<NodeResponse> successfulResponses, final Set<NodeResponse> 
problematicResponses, final NodeResponse clientResponse) {
+        if (!canHandle(uri, method)) {
+            throw new IllegalArgumentException("Cannot use Endpoint Mapper of 
type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", 
HTTP Method " + method);
+        }
+
+        final ProcessGroupEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
+        final ProcessGroupDTO responseDto = responseEntity.getComponent();
+
+        final FlowSnippetDTO contents = responseDto.getContents();
+        if (contents == null) {
+            return new NodeResponse(clientResponse, responseEntity);
+        } else {
+            final Map<String, Map<NodeIdentifier, ProcessorDTO>> processorMap 
= new HashMap<>();
+            final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> 
remoteProcessGroupMap = new HashMap<>();
+
+            for (final NodeResponse nodeResponse : successfulResponses) {
+                final ProcessGroupEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
+                final ProcessGroupDTO nodeProcessGroup = 
nodeResponseEntity.getComponent();
+
+                for (final ProcessorDTO nodeProcessor : 
nodeProcessGroup.getContents().getProcessors()) {
+                    Map<NodeIdentifier, ProcessorDTO> innerMap = 
processorMap.get(nodeProcessor.getId());
+                    if (innerMap == null) {
+                        innerMap = new HashMap<>();
+                        processorMap.put(nodeProcessor.getId(), innerMap);
+                    }
+
+                    innerMap.put(nodeResponse.getNodeId(), nodeProcessor);
+                }
+
+                for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : 
nodeProcessGroup.getContents().getRemoteProcessGroups()) {
+                    Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = 
remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId());
+                    if (innerMap == null) {
+                        innerMap = new HashMap<>();
+                        
remoteProcessGroupMap.put(nodeRemoteProcessGroup.getId(), innerMap);
+                    }
+
+                    innerMap.put(nodeResponse.getNodeId(), 
nodeRemoteProcessGroup);
+                }
+            }
+
+            final ProcessorEndpointMerger procMerger = new 
ProcessorEndpointMerger();
+            for (final ProcessorDTO processor : contents.getProcessors()) {
+                final String procId = processor.getId();
+                final Map<NodeIdentifier, ProcessorDTO> mergeMap = 
processorMap.get(procId);
+
+                procMerger.mergeResponses(processor, mergeMap, 
successfulResponses, problematicResponses);
+            }
+
+            final RemoteProcessGroupEndpointMerger rpgMerger = new 
RemoteProcessGroupEndpointMerger();
+            for (final RemoteProcessGroupDTO remoteProcessGroup : 
contents.getRemoteProcessGroups()) {
+                if (remoteProcessGroup.getContents() != null) {
+                    final String remoteProcessGroupId = 
remoteProcessGroup.getId();
+                    final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap 
= remoteProcessGroupMap.get(remoteProcessGroupId);
+
+                    rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, 
successfulResponses, problematicResponses);
+                }
+            }
+        }
+
+        // create a new client response
+        return new NodeResponse(clientResponse, responseEntity);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
new file mode 100644
index 0000000..6a040fa
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+
+public class ProcessorEndpointMerger extends 
AbstractSingleEntityEndpoint<ProcessorEntity, ProcessorDTO> implements 
EndpointResponseMerger {
+    public static final Pattern PROCESSORS_URI_PATTERN = 
Pattern.compile("/nifi-api/processors");
+    public static final Pattern PROCESSOR_URI_PATTERN = 
Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}");
+    public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = 
Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}");
+
+    @Override
+    public boolean canHandle(final URI uri, final String method) {
+        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method))
+            && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || 
CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches())) {
+            return true;
+        } else if ("POST".equalsIgnoreCase(method) && 
PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    protected Class<ProcessorEntity> getEntityClass() {
+        return ProcessorEntity.class;
+    }
+
+    @Override
+    protected ProcessorDTO getDto(final ProcessorEntity entity) {
+        return entity.getComponent();
+    }
+
+    @Override
+    protected void mergeResponses(final ProcessorDTO clientDto, final 
Map<NodeIdentifier, ProcessorDTO> dtoMap, final Set<NodeResponse> 
successfulResponses,
+        final Set<NodeResponse> problematicResponses) {
+        final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
+
+        for (final Map.Entry<NodeIdentifier, ProcessorDTO> nodeEntry : 
dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = nodeEntry.getKey();
+            final ProcessorDTO nodeProcessor = nodeEntry.getValue();
+
+            // merge the validation errors
+            mergeValidationErrors(validationErrorMap, nodeId, 
nodeProcessor.getValidationErrors());
+        }
+
+        // set the merged the validation errors
+        
clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap,
 dtoMap.size()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorStatusEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorStatusEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorStatusEndpointMerger.java
new file mode 100644
index 0000000..8fdceb1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorStatusEndpointMerger.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.StatusMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
+
+public class ProcessorStatusEndpointMerger extends 
AbstractNodeStatusEndpoint<ProcessorStatusEntity, ProcessorStatusDTO> {
+    public static final Pattern PROCESSOR_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/processors/[a-f0-9\\-]{36}/status");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
PROCESSOR_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    protected Class<ProcessorStatusEntity> getEntityClass() {
+        return ProcessorStatusEntity.class;
+    }
+
+    @Override
+    protected ProcessorStatusDTO getDto(ProcessorStatusEntity entity) {
+        return entity.getProcessorStatus();
+    }
+
+    @Override
+    protected void mergeResponses(ProcessorStatusDTO clientDto, 
Map<NodeIdentifier, ProcessorStatusDTO> dtoMap, NodeIdentifier selectedNodeId) {
+        final ProcessorStatusDTO mergedProcessorStatus = clientDto;
+        mergedProcessorStatus.setNodeSnapshots(new 
ArrayList<NodeProcessorStatusSnapshotDTO>());
+
+        final NodeProcessorStatusSnapshotDTO selectedNodeSnapshot = new 
NodeProcessorStatusSnapshotDTO();
+        
selectedNodeSnapshot.setStatusSnapshot(clientDto.getAggregateSnapshot().clone());
+        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
+        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
+        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
+
+        mergedProcessorStatus.getNodeSnapshots().add(selectedNodeSnapshot);
+
+        // merge the other nodes
+        for (final Map.Entry<NodeIdentifier, ProcessorStatusDTO> entry : 
dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final ProcessorStatusDTO nodeProcessorStatus = entry.getValue();
+            if (nodeProcessorStatus == clientDto) {
+                continue;
+            }
+
+            StatusMerger.merge(mergedProcessorStatus, nodeProcessorStatus, 
nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java
new file mode 100644
index 0000000..fa076b9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProcessorsEntity;
+
+public class ProcessorsEndpointMerger implements EndpointResponseMerger {
+    public static final Pattern PROCESSORS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
+
+    @Override
+    public boolean canHandle(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && 
PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    public final NodeResponse merge(final URI uri, final String method, final 
Set<NodeResponse> successfulResponses, final Set<NodeResponse> 
problematicResponses, final NodeResponse clientResponse) {
+        if (!canHandle(uri, method)) {
+            throw new IllegalArgumentException("Cannot use Endpoint Mapper of 
type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", 
HTTP Method " + method);
+        }
+
+        final ProcessorsEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ProcessorsEntity.class);
+        final Set<ProcessorEntity> processorEntities = 
responseEntity.getProcessors();
+
+        final Map<String, Map<NodeIdentifier, ProcessorDTO>> dtoMap = new 
HashMap<>();
+        for (final NodeResponse nodeResponse : successfulResponses) {
+            final ProcessorsEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class);
+            final Set<ProcessorEntity> nodeProcessorEntities = 
nodeResponseEntity.getProcessors();
+
+            for (final ProcessorEntity nodeProcessorEntity : 
nodeProcessorEntities) {
+                final NodeIdentifier nodeId = nodeResponse.getNodeId();
+                Map<NodeIdentifier, ProcessorDTO> innerMap = 
dtoMap.get(nodeId);
+                if (innerMap == null) {
+                    innerMap = new HashMap<>();
+                    dtoMap.put(nodeProcessorEntity.getId(), innerMap);
+                }
+
+                innerMap.put(nodeResponse.getNodeId(), 
nodeProcessorEntity.getComponent());
+            }
+        }
+
+        final ProcessorEndpointMerger procMerger = new 
ProcessorEndpointMerger();
+        for (final ProcessorEntity entity : processorEntities) {
+            final String componentId = entity.getId();
+            final Map<NodeIdentifier, ProcessorDTO> mergeMap = 
dtoMap.get(componentId);
+
+            procMerger.mergeResponses(entity.getComponent(), mergeMap, 
successfulResponses, problematicResponses);
+        }
+
+        // create a new client response
+        return new NodeResponse(clientResponse, responseEntity);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceEventEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceEventEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceEventEndpointMerger.java
new file mode 100644
index 0000000..3f895bd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceEventEndpointMerger.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
+import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
+
+public class ProvenanceEventEndpointMerger extends 
AbstractSingleEntityEndpoint<ProvenanceEventEntity, ProvenanceEventDTO> {
+    public static final Pattern PROVENANCE_EVENT_URI = 
Pattern.compile("/nifi-api/provenance/events/[0-9]+");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    protected Class<ProvenanceEventEntity> getEntityClass() {
+        return ProvenanceEventEntity.class;
+    }
+
+    @Override
+    protected ProvenanceEventDTO getDto(ProvenanceEventEntity entity) {
+        return entity.getProvenanceEvent();
+    }
+
+    @Override
+    protected void mergeResponses(ProvenanceEventDTO clientDto, 
Map<NodeIdentifier, ProvenanceEventDTO> dtoMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
+        // The request for a Provenance Event is replicated to a single Node. 
We simply update its cluster node info.
+        final NodeIdentifier nodeId = 
successfulResponses.iterator().next().getNodeId();
+        clientDto.setClusterNodeId(nodeId.getId());
+        clientDto.setClusterNodeAddress(nodeId.getApiAddress() + ":" + 
nodeId.getApiPort());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceQueryEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceQueryEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceQueryEndpointMerger.java
new file mode 100644
index 0000000..4875499
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceQueryEndpointMerger.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO;
+import org.apache.nifi.web.api.entity.ProvenanceEntity;
+
+public class ProvenanceQueryEndpointMerger implements EndpointResponseMerger {
+    public static final String PROVENANCE_URI = "/nifi-api/provenance";
+    public static final Pattern PROVENANCE_QUERY_URI = 
Pattern.compile("/nifi-api/provenance/[a-f0-9\\-]{36}");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        if ("POST".equalsIgnoreCase(method) && 
PROVENANCE_URI.equals(uri.getPath())) {
+            return true;
+        } else if ("GET".equalsIgnoreCase(method) && 
PROVENANCE_QUERY_URI.matcher(uri.getPath()).matches()) {
+            return true;
+        }
+        return false;
+    }
+
+
+    @Override
+    public NodeResponse merge(URI uri, String method, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses, NodeResponse 
clientResponse) {
+        if (!canHandle(uri, method)) {
+            throw new IllegalArgumentException("Cannot use Endpoint Mapper of 
type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", 
HTTP Method " + method);
+        }
+
+        final ProvenanceEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ProvenanceEntity.class);
+        final ProvenanceDTO dto = responseEntity.getProvenance();
+
+        final Map<NodeIdentifier, ProvenanceDTO> dtoMap = new HashMap<>();
+        for (final NodeResponse nodeResponse : successfulResponses) {
+            final ProvenanceEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class);
+            final ProvenanceDTO nodeDto = nodeResponseEntity.getProvenance();
+            dtoMap.put(nodeResponse.getNodeId(), nodeDto);
+        }
+
+        mergeResponses(dto, dtoMap, successfulResponses, problematicResponses);
+        return new NodeResponse(clientResponse, responseEntity);
+    }
+
+
+    protected void mergeResponses(ProvenanceDTO clientDto, Map<NodeIdentifier, 
ProvenanceDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> 
problematicResponses) {
+        final ProvenanceResultsDTO results = clientDto.getResults();
+        final ProvenanceRequestDTO request = clientDto.getRequest();
+        final List<ProvenanceEventDTO> allResults = new ArrayList<>(1024);
+
+        final Set<String> errors = new HashSet<>();
+        Date oldestEventDate = new Date();
+        int percentageComplete = 0;
+        boolean finished = true;
+
+        long totalRecords = 0;
+        for (final Map.Entry<NodeIdentifier, ProvenanceDTO> entry : 
dtoMap.entrySet()) {
+            final NodeIdentifier nodeIdentifier = entry.getKey();
+            final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + 
nodeIdentifier.getApiPort();
+
+            final ProvenanceDTO nodeDto = entry.getValue();
+            final ProvenanceResultsDTO nodeResultDto = nodeDto.getResults();
+            if (nodeResultDto != null && nodeResultDto.getProvenanceEvents() 
!= null) {
+                // increment the total number of records
+                totalRecords += nodeResultDto.getTotalCount();
+
+                // populate the cluster identifier
+                for (final ProvenanceEventDTO eventDto : 
nodeResultDto.getProvenanceEvents()) {
+                    eventDto.setClusterNodeId(nodeIdentifier.getId());
+                    eventDto.setClusterNodeAddress(nodeAddress);
+                    // add node identifier to the event's id so that it is 
unique across cluster
+                    eventDto.setId(nodeIdentifier.getId() + eventDto.getId());
+                    allResults.add(eventDto);
+                }
+            }
+
+            if (nodeResultDto.getOldestEvent() != null && 
nodeResultDto.getOldestEvent().before(oldestEventDate)) {
+                oldestEventDate = nodeResultDto.getOldestEvent();
+            }
+
+            if (nodeResultDto.getErrors() != null) {
+                for (final String error : nodeResultDto.getErrors()) {
+                    errors.add(nodeAddress + " -- " + error);
+                }
+            }
+
+            percentageComplete += nodeDto.getPercentCompleted();
+            if (!nodeDto.isFinished()) {
+                finished = false;
+            }
+        }
+        percentageComplete /= dtoMap.size();
+
+        // consider any problematic responses as errors
+        for (final NodeResponse problematicResponse : problematicResponses) {
+            final NodeIdentifier problemNode = problematicResponse.getNodeId();
+            final String problemNodeAddress = problemNode.getApiAddress() + 
":" + problemNode.getApiPort();
+            errors.add(String.format("%s -- Request did not complete 
successfully (Status code: %s)", problemNodeAddress, 
problematicResponse.getStatus()));
+        }
+
+        // Since we get back up to the maximum number of results from each 
node, we need to sort those values and then
+        // grab only the first X number of them. We do a sort based on time, 
such that the newest are included.
+        // If 2 events have the same timestamp, we do a secondary sort based 
on Cluster Node Identifier. If those are
+        // equal, we perform a terciary sort based on the the event id
+        Collections.sort(allResults, new Comparator<ProvenanceEventDTO>() {
+            @Override
+            public int compare(final ProvenanceEventDTO o1, final 
ProvenanceEventDTO o2) {
+                final int eventTimeComparison = 
o1.getEventTime().compareTo(o2.getEventTime());
+                if (eventTimeComparison != 0) {
+                    return -eventTimeComparison;
+                }
+
+                final String nodeId1 = o1.getClusterNodeId();
+                final String nodeId2 = o2.getClusterNodeId();
+                final int nodeIdComparison;
+                if (nodeId1 == null && nodeId2 == null) {
+                    nodeIdComparison = 0;
+                } else if (nodeId1 == null) {
+                    nodeIdComparison = 1;
+                } else if (nodeId2 == null) {
+                    nodeIdComparison = -1;
+                } else {
+                    nodeIdComparison = -nodeId1.compareTo(nodeId2);
+                }
+
+                if (nodeIdComparison != 0) {
+                    return nodeIdComparison;
+                }
+
+                return -Long.compare(o1.getEventId(), o2.getEventId());
+            }
+        });
+
+        final int maxResults = request.getMaxResults().intValue();
+        final List<ProvenanceEventDTO> selectedResults;
+        if (allResults.size() < maxResults) {
+            selectedResults = allResults;
+        } else {
+            selectedResults = allResults.subList(0, maxResults);
+        }
+
+        // include any errors
+        if (errors.size() > 0) {
+            results.setErrors(errors);
+        }
+
+        results.setTotalCount(totalRecords);
+        results.setTotal(FormatUtils.formatCount(totalRecords));
+        results.setProvenanceEvents(selectedResults);
+        results.setOldestEvent(oldestEventDate);
+        results.setGenerated(new Date());
+        clientDto.setPercentCompleted(percentageComplete);
+        clientDto.setFinished(finished);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
new file mode 100644
index 0000000..94383de
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
+
+public class RemoteProcessGroupEndpointMerger extends 
AbstractSingleEntityEndpoint<RemoteProcessGroupEntity, RemoteProcessGroupDTO> {
+    public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = 
Pattern.compile("/nifi-api/remote-process-groups");
+    public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = 
Pattern.compile("/nifi-api/remote-process-groups/[a-f0-9\\-]{36}");
+
+    @Override
+    public boolean canHandle(final URI uri, final String method) {
+        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if ("POST".equalsIgnoreCase(method) && 
REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    protected Class<RemoteProcessGroupEntity> getEntityClass() {
+        return RemoteProcessGroupEntity.class;
+    }
+
+    @Override
+    protected RemoteProcessGroupDTO getDto(final RemoteProcessGroupEntity 
entity) {
+        return entity.getComponent();
+    }
+
+    @Override
+    protected void mergeResponses(RemoteProcessGroupDTO clientDto, 
Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
+        final RemoteProcessGroupContentsDTO remoteProcessGroupContents = 
clientDto.getContents();
+
+        Boolean mergedIsTargetSecure = null;
+        final List<String> mergedAuthorizationIssues = new ArrayList<>();
+        final Set<RemoteProcessGroupPortDTO> mergedInputPorts = new 
HashSet<>();
+        final Set<RemoteProcessGroupPortDTO> mergedOutputPorts = new 
HashSet<>();
+
+        for (final Map.Entry<NodeIdentifier, RemoteProcessGroupDTO> nodeEntry 
: dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = nodeEntry.getKey();
+            final RemoteProcessGroupDTO nodeRemoteProcessGroupDto = 
nodeEntry.getValue();
+
+            // merge the issues
+            final List<String> nodeAuthorizationIssues = 
nodeRemoteProcessGroupDto.getAuthorizationIssues();
+            if (nodeAuthorizationIssues != null && 
!nodeAuthorizationIssues.isEmpty()) {
+                for (final String nodeAuthorizationIssue : 
nodeAuthorizationIssues) {
+                    mergedAuthorizationIssues.add(nodeId.getApiAddress() + ":" 
+ nodeId.getApiPort() + " -- " + nodeAuthorizationIssue);
+                }
+            }
+
+            // use the first target secure flag since they will all be the same
+            final Boolean nodeIsTargetSecure = 
nodeRemoteProcessGroupDto.isTargetSecure();
+            if (mergedIsTargetSecure == null) {
+                mergedIsTargetSecure = nodeIsTargetSecure;
+            }
+
+            // merge the ports in the contents
+            final RemoteProcessGroupContentsDTO 
nodeRemoteProcessGroupContentsDto = nodeRemoteProcessGroupDto.getContents();
+            if (remoteProcessGroupContents != null && 
nodeRemoteProcessGroupContentsDto != null) {
+                if (nodeRemoteProcessGroupContentsDto.getInputPorts() != null) 
{
+                    
mergedInputPorts.addAll(nodeRemoteProcessGroupContentsDto.getInputPorts());
+                }
+                if (nodeRemoteProcessGroupContentsDto.getOutputPorts() != 
null) {
+                    
mergedOutputPorts.addAll(nodeRemoteProcessGroupContentsDto.getOutputPorts());
+                }
+            }
+        }
+
+        if (remoteProcessGroupContents != null) {
+            if (!mergedInputPorts.isEmpty()) {
+                remoteProcessGroupContents.setInputPorts(mergedInputPorts);
+            }
+            if (!mergedOutputPorts.isEmpty()) {
+                remoteProcessGroupContents.setOutputPorts(mergedOutputPorts);
+            }
+        }
+
+        if (mergedIsTargetSecure != null) {
+            clientDto.setTargetSecure(mergedIsTargetSecure);
+        }
+
+        if (!mergedAuthorizationIssues.isEmpty()) {
+            clientDto.setAuthorizationIssues(mergedAuthorizationIssues);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupStatusEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupStatusEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupStatusEndpointMerger.java
new file mode 100644
index 0000000..d2056f7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupStatusEndpointMerger.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.StatusMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import 
org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
+
+public class RemoteProcessGroupStatusEndpointMerger extends 
AbstractNodeStatusEndpoint<RemoteProcessGroupStatusEntity, 
RemoteProcessGroupStatusDTO> {
+    public static final Pattern REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    protected Class<RemoteProcessGroupStatusEntity> getEntityClass() {
+        return RemoteProcessGroupStatusEntity.class;
+    }
+
+    @Override
+    protected RemoteProcessGroupStatusDTO 
getDto(RemoteProcessGroupStatusEntity entity) {
+        return entity.getRemoteProcessGroupStatus();
+    }
+
+    @Override
+    protected void mergeResponses(RemoteProcessGroupStatusDTO clientDto, 
Map<NodeIdentifier, RemoteProcessGroupStatusDTO> dtoMap, NodeIdentifier 
selectedNodeId) {
+        final RemoteProcessGroupStatusDTO mergedRemoteProcessGroupStatus = 
clientDto;
+        mergedRemoteProcessGroupStatus.setNodeSnapshots(new 
ArrayList<NodeRemoteProcessGroupStatusSnapshotDTO>());
+
+        final NodeRemoteProcessGroupStatusSnapshotDTO selectedNodeSnapshot = 
new NodeRemoteProcessGroupStatusSnapshotDTO();
+        
selectedNodeSnapshot.setStatusSnapshot(clientDto.getAggregateSnapshot().clone());
+        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
+        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
+        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
+
+        
mergedRemoteProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot);
+
+        // merge the other nodes
+        for (final Map.Entry<NodeIdentifier, RemoteProcessGroupStatusDTO> 
entry : dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final RemoteProcessGroupStatusDTO nodeRemoteProcessGroupStatus = 
entry.getValue();
+            if (nodeRemoteProcessGroupStatus == clientDto) {
+                continue;
+            }
+
+            StatusMerger.merge(mergedRemoteProcessGroupStatus, 
nodeRemoteProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), 
nodeId.getApiPort());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
new file mode 100644
index 0000000..c387951
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
+
+public class RemoteProcessGroupsEndpointMerger implements 
EndpointResponseMerger {
+    public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
+
+    @Override
+    public boolean canHandle(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && 
REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    public NodeResponse merge(URI uri, String method, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses, NodeResponse 
clientResponse) {
+        if (!canHandle(uri, method)) {
+            throw new IllegalArgumentException("Cannot use Endpoint Mapper of 
type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", 
HTTP Method " + method);
+        }
+
+        final RemoteProcessGroupsEntity responseEntity = 
clientResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
+        final Set<RemoteProcessGroupEntity> rpgEntities = 
responseEntity.getRemoteProcessGroups();
+
+        final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> dtoMap = 
new HashMap<>();
+        for (final NodeResponse nodeResponse : successfulResponses) {
+            final RemoteProcessGroupsEntity nodeResponseEntity = nodeResponse 
== clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
+            final Set<RemoteProcessGroupEntity> nodeRpgEntities = 
nodeResponseEntity.getRemoteProcessGroups();
+
+            for (final RemoteProcessGroupEntity nodeRpgEntity : 
nodeRpgEntities) {
+                final NodeIdentifier nodeId = nodeResponse.getNodeId();
+                Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = 
dtoMap.get(nodeId);
+                if (innerMap == null) {
+                    innerMap = new HashMap<>();
+                    dtoMap.put(nodeRpgEntity.getId(), innerMap);
+                }
+
+                innerMap.put(nodeResponse.getNodeId(), 
nodeRpgEntity.getComponent());
+            }
+        }
+
+        final RemoteProcessGroupEndpointMerger rpgMerger = new 
RemoteProcessGroupEndpointMerger();
+        for (final RemoteProcessGroupEntity entity : rpgEntities) {
+            final String componentId = entity.getId();
+            final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap = 
dtoMap.get(componentId);
+
+            rpgMerger.mergeResponses(entity.getComponent(), mergeMap, 
successfulResponses, problematicResponses);
+        }
+
+        // create a new client response
+        return new NodeResponse(clientResponse, responseEntity);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java
new file mode 100644
index 0000000..2188af6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+
+public class ReportingTaskEndpointMerger extends 
AbstractSingleEntityEndpoint<ReportingTaskEntity, ReportingTaskDTO> {
+    public static final String REPORTING_TASKS_URI = 
"/nifi-api/controller/reporting-tasks/node";
+    public static final Pattern REPORTING_TASK_URI_PATTERN = 
Pattern.compile("/nifi-api/reporting-tasks/node/[a-f0-9\\-]{36}");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& REPORTING_TASK_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if ("POST".equalsIgnoreCase(method) && 
REPORTING_TASKS_URI.equals(uri.getPath())) {
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    protected Class<ReportingTaskEntity> getEntityClass() {
+        return ReportingTaskEntity.class;
+    }
+
+    @Override
+    protected ReportingTaskDTO getDto(ReportingTaskEntity entity) {
+        return entity.getReportingTask();
+    }
+
+    @Override
+    protected void mergeResponses(ReportingTaskDTO clientDto, 
Map<NodeIdentifier, ReportingTaskDTO> dtoMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
+        final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
+
+        int activeThreadCount = 0;
+        for (final Map.Entry<NodeIdentifier, ReportingTaskDTO> nodeEntry : 
dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = nodeEntry.getKey();
+            final ReportingTaskDTO nodeReportingTask = nodeEntry.getValue();
+
+            if (nodeReportingTask.getActiveThreadCount() != null) {
+                activeThreadCount += nodeReportingTask.getActiveThreadCount();
+            }
+
+            // merge the validation errors
+            mergeValidationErrors(validationErrorMap, nodeId, 
nodeReportingTask.getValidationErrors());
+        }
+
+        // set the merged active thread counts
+        clientDto.setActiveThreadCount(activeThreadCount);
+
+        // set the merged the validation errors
+        
clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap,
 dtoMap.size()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java
new file mode 100644
index 0000000..f5b2e4d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.entity.ReportingTasksEntity;
+
+public class ReportingTasksEndpointMerger extends 
AbstractMultiEntityEndpoint<ReportingTasksEntity, ReportingTaskDTO> {
+    public static final String REPORTING_TASKS_URI = 
"/nifi-api/controller/reporting-tasks/node";
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
REPORTING_TASKS_URI.equals(uri.getPath());
+    }
+
+    @Override
+    protected Class<ReportingTasksEntity> getEntityClass() {
+        return ReportingTasksEntity.class;
+    }
+
+    @Override
+    protected Set<ReportingTaskDTO> getDtos(ReportingTasksEntity entity) {
+        return entity.getReportingTasks();
+    }
+
+    @Override
+    protected String getComponentId(ReportingTaskDTO dto) {
+        return dto.getId();
+    }
+
+    @Override
+    protected void mergeResponses(ReportingTaskDTO clientDto, 
Map<NodeIdentifier, ReportingTaskDTO> dtoMap, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses) {
+        new ReportingTaskEndpointMerger().mergeResponses(clientDto, dtoMap, 
successfulResponses, problematicResponses);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
new file mode 100644
index 0000000..6d8da5f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
+import org.apache.nifi.controller.status.history.MetricDescriptor;
+import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
+import 
org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
+import org.apache.nifi.controller.status.history.StatusHistoryUtil;
+import org.apache.nifi.controller.status.history.StatusSnapshot;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.StatusHistoryEntity;
+
+public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
+    public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/processors/[a-f0-9\\-]{36}/status/history");
+    public static final Pattern PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status/history");
+    public static final Pattern 
REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status/history");
+    public static final Pattern CONNECTION_STATUS_HISTORY_URI_PATTERN = 
Pattern.compile("/nifi-api/flow/connections/[a-f0-9\\-]{36}/status/history");
+
+    private final long componentStatusSnapshotMillis;
+
+
+    public StatusHistoryEndpointMerger() {
+        final NiFiProperties properties = NiFiProperties.getInstance();
+        final String snapshotFrequency = 
properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, 
NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
+        long snapshotMillis;
+        try {
+            snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, 
TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            snapshotMillis = 
FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY,
 TimeUnit.MILLISECONDS);
+        }
+        componentStatusSnapshotMillis = snapshotMillis;
+    }
+
+    private Map<String, MetricDescriptor<?>> getMetricDescriptors(final URI 
uri) {
+        final String path = uri.getPath();
+
+        final Map<String, MetricDescriptor<?>> metricDescriptors = new 
HashMap<>();
+
+        if (PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) {
+            for (final ProcessorStatusDescriptor descriptor : 
ProcessorStatusDescriptor.values()) {
+                metricDescriptors.put(descriptor.getField(), 
descriptor.getDescriptor());
+            }
+        } else if 
(PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) {
+            for (final ProcessGroupStatusDescriptor descriptor : 
ProcessGroupStatusDescriptor.values()) {
+                metricDescriptors.put(descriptor.getField(), 
descriptor.getDescriptor());
+            }
+        } else if 
(REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) {
+            for (final RemoteProcessGroupStatusDescriptor descriptor : 
RemoteProcessGroupStatusDescriptor.values()) {
+                metricDescriptors.put(descriptor.getField(), 
descriptor.getDescriptor());
+            }
+        } else if 
(CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) {
+            for (final ConnectionStatusDescriptor descriptor : 
ConnectionStatusDescriptor.values()) {
+                metricDescriptors.put(descriptor.getField(), 
descriptor.getDescriptor());
+            }
+        }
+
+        return metricDescriptors;
+    }
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        if (!"GET".equalsIgnoreCase(method)) {
+            return false;
+        }
+
+        final Map<String, MetricDescriptor<?>> descriptors = 
getMetricDescriptors(uri);
+        return descriptors != null && !descriptors.isEmpty();
+    }
+
+    @Override
+    public NodeResponse merge(URI uri, String method, Set<NodeResponse> 
successfulResponses, Set<NodeResponse> problematicResponses, NodeResponse 
clientResponse) {
+        final Map<String, MetricDescriptor<?>> metricDescriptors = 
getMetricDescriptors(uri);
+
+        final StatusHistoryEntity responseEntity = 
clientResponse.getClientResponse().getEntity(StatusHistoryEntity.class);
+
+        StatusHistoryDTO lastStatusHistory = null;
+        final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots = new 
ArrayList<>(successfulResponses.size());
+        for (final NodeResponse nodeResponse : successfulResponses) {
+            final StatusHistoryEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(StatusHistoryEntity.class);
+            final StatusHistoryDTO nodeStatus = 
nodeResponseEntity.getStatusHistory();
+            lastStatusHistory = nodeStatus;
+
+            final NodeIdentifier nodeId = nodeResponse.getNodeId();
+            final NodeStatusSnapshotsDTO nodeStatusSnapshot = new 
NodeStatusSnapshotsDTO();
+            nodeStatusSnapshot.setNodeId(nodeId.getId());
+            nodeStatusSnapshot.setAddress(nodeId.getApiAddress());
+            nodeStatusSnapshot.setApiPort(nodeId.getApiPort());
+            
nodeStatusSnapshot.setStatusSnapshots(nodeStatus.getAggregateSnapshots());
+            nodeStatusSnapshots.add(nodeStatusSnapshot);
+        }
+
+        final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
+        
clusterStatusHistory.setAggregateSnapshots(mergeStatusHistories(nodeStatusSnapshots,
 metricDescriptors));
+        clusterStatusHistory.setGenerated(new Date());
+        clusterStatusHistory.setNodeSnapshots(nodeStatusSnapshots);
+        if (lastStatusHistory != null) {
+            
clusterStatusHistory.setComponentDetails(lastStatusHistory.getComponentDetails());
+            
clusterStatusHistory.setFieldDescriptors(lastStatusHistory.getFieldDescriptors());
+        }
+
+        final StatusHistoryEntity clusterEntity = new StatusHistoryEntity();
+        clusterEntity.setStatusHistory(clusterStatusHistory);
+        clusterEntity.setRevision(responseEntity.getRevision());
+
+        return new NodeResponse(clientResponse, clusterEntity);
+    }
+
+    private List<StatusSnapshotDTO> mergeStatusHistories(final 
List<NodeStatusSnapshotsDTO> nodeStatusSnapshots, final Map<String, 
MetricDescriptor<?>> metricDescriptors) {
+        // We want a Map<Date, List<StatusSnapshot>>, which is a Map of 
"normalized Date" (i.e., a time range, essentially)
+        // to all Snapshots for that time. The list will contain one snapshot 
for each node. However, we can have the case
+        // where the NCM has a different value for the 
componentStatusSnapshotMillis than the nodes have. In this case,
+        // we end up with multiple entries in the List<StatusSnapshot> for the 
same node/timestamp, which skews our aggregate
+        // results. In order to avoid this, we will use only the latest 
snapshot for a node that falls into the the time range
+        // of interest.
+        // To accomplish this, we have an intermediate data structure, which 
is a Map of "normalized Date" to an inner Map
+        // of Node Identifier to StatusSnapshot. We then will flatten this Map 
and aggregate the results.
+        final Map<Date, Map<String, StatusSnapshot>> dateToNodeSnapshots = new 
TreeMap<>();
+
+        // group status snapshot's for each node by date
+        for (final NodeStatusSnapshotsDTO nodeStatusSnapshot : 
nodeStatusSnapshots) {
+            for (final StatusSnapshotDTO snapshotDto : 
nodeStatusSnapshot.getStatusSnapshots()) {
+                final StatusSnapshot snapshot = createSnapshot(snapshotDto, 
metricDescriptors);
+                final Date normalizedDate = 
normalizeStatusSnapshotDate(snapshot.getTimestamp(), 
componentStatusSnapshotMillis);
+
+                Map<String, StatusSnapshot> nodeToSnapshotMap = 
dateToNodeSnapshots.get(normalizedDate);
+                if (nodeToSnapshotMap == null) {
+                    nodeToSnapshotMap = new HashMap<>();
+                    dateToNodeSnapshots.put(normalizedDate, nodeToSnapshotMap);
+                }
+                nodeToSnapshotMap.put(nodeStatusSnapshot.getNodeId(), 
snapshot);
+            }
+        }
+
+        // aggregate the snapshots by (normalized) timestamp
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new 
TreeMap<>();
+        for (final Map.Entry<Date, Map<String, StatusSnapshot>> entry : 
dateToNodeSnapshots.entrySet()) {
+            final Date normalizedDate = entry.getKey();
+            final Map<String, StatusSnapshot> nodeToSnapshot = 
entry.getValue();
+            final List<StatusSnapshot> snapshotsForTimestamp = new 
ArrayList<>(nodeToSnapshot.values());
+            snapshotsToAggregate.put(normalizedDate, snapshotsForTimestamp);
+        }
+
+        final List<StatusSnapshotDTO> aggregatedSnapshots = 
aggregate(snapshotsToAggregate);
+        return aggregatedSnapshots;
+    }
+
+    private StatusSnapshot createSnapshot(final StatusSnapshotDTO snapshotDto, 
final Map<String, MetricDescriptor<?>> metricDescriptors) {
+        final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
+        snapshot.setTimestamp(snapshotDto.getTimestamp());
+
+        final Map<String, Long> metrics = snapshotDto.getStatusMetrics();
+        for (final Map.Entry<String, Long> entry : metrics.entrySet()) {
+            final String metricId = entry.getKey();
+            final Long value = entry.getValue();
+
+            final MetricDescriptor<?> descriptor = 
metricDescriptors.get(metricId);
+            if (descriptor != null) {
+                snapshot.addStatusMetric(descriptor, value);
+            }
+        }
+
+        return snapshot;
+    }
+
+    private List<StatusSnapshotDTO> aggregate(Map<Date, List<StatusSnapshot>> 
snapshotsToAggregate) {
+        // Aggregate the snapshots
+        final List<StatusSnapshotDTO> aggregatedSnapshotDtos = new 
ArrayList<>();
+        for (final Map.Entry<Date, List<StatusSnapshot>> entry : 
snapshotsToAggregate.entrySet()) {
+            final List<StatusSnapshot> snapshots = entry.getValue();
+            final StatusSnapshot reducedSnapshot = 
snapshots.get(0).getValueReducer().reduce(snapshots);
+
+            final StatusSnapshotDTO dto = new StatusSnapshotDTO();
+            dto.setTimestamp(reducedSnapshot.getTimestamp());
+            
dto.setStatusMetrics(StatusHistoryUtil.createStatusSnapshotDto(reducedSnapshot).getStatusMetrics());
+
+            aggregatedSnapshotDtos.add(dto);
+        }
+
+        return aggregatedSnapshotDtos;
+    }
+
+    public static Date normalizeStatusSnapshotDate(final Date toNormalize, 
final long numMillis) {
+        final long time = toNormalize.getTime();
+        return new Date(time - time % numMillis);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/SystemDiagnosticsEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/SystemDiagnosticsEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/SystemDiagnosticsEndpointMerger.java
new file mode 100644
index 0000000..6cccc8e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/SystemDiagnosticsEndpointMerger.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.manager.StatusMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
+import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
+
+public class SystemDiagnosticsEndpointMerger extends 
AbstractNodeStatusEndpoint<SystemDiagnosticsEntity, SystemDiagnosticsDTO> {
+    public static final Pattern SYSTEM_DIAGNOSTICS_URI_PATTERN = 
Pattern.compile("/nifi-api/system-diagnostics");
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && 
SYSTEM_DIAGNOSTICS_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    @Override
+    protected Class<SystemDiagnosticsEntity> getEntityClass() {
+        return SystemDiagnosticsEntity.class;
+    }
+
+    @Override
+    protected SystemDiagnosticsDTO getDto(SystemDiagnosticsEntity entity) {
+        return entity.getSystemDiagnostics();
+    }
+
+    @Override
+    protected void mergeResponses(SystemDiagnosticsDTO clientDto, 
Map<NodeIdentifier, SystemDiagnosticsDTO> dtoMap, NodeIdentifier 
selectedNodeId) {
+        final SystemDiagnosticsDTO mergedSystemDiagnostics = clientDto;
+        mergedSystemDiagnostics.setNodeSnapshots(new 
ArrayList<NodeSystemDiagnosticsSnapshotDTO>());
+
+        final NodeSystemDiagnosticsSnapshotDTO selectedNodeSnapshot = new 
NodeSystemDiagnosticsSnapshotDTO();
+        
selectedNodeSnapshot.setSnapshot(clientDto.getAggregateSnapshot().clone());
+        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
+        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
+        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
+
+        mergedSystemDiagnostics.getNodeSnapshots().add(selectedNodeSnapshot);
+
+        for (final Map.Entry<NodeIdentifier, SystemDiagnosticsDTO> entry : 
dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final SystemDiagnosticsDTO toMerge = entry.getValue();
+            if (toMerge == clientDto) {
+                continue;
+            }
+
+            StatusMerger.merge(mergedSystemDiagnostics, toMerge, 
nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
+        }
+    }
+
+}

Reply via email to