NIFI-250:
- Adding active thread count on reporting tasks.
- Merging clustered responses for controller service and reporting task 
endpoints.

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/6b36aefe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/6b36aefe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/6b36aefe

Branch: refs/heads/NIFI-250
Commit: 6b36aefef942482a665182594fd3bb1044cad627
Parents: 79ddcb8
Author: Matt Gilman <[email protected]>
Authored: Fri Mar 13 10:11:17 2015 -0400
Committer: Matt Gilman <[email protected]>
Committed: Fri Mar 13 10:11:17 2015 -0400

----------------------------------------------------------------------
 .../nifi/web/api/dto/ReportingTaskDTO.java      |  22 +-
 .../cluster/manager/impl/WebClusterManager.java | 359 +++++++++++++++++--
 2 files changed, 346 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6b36aefe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java
index 3ec048b..b9e7ff1 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java
@@ -29,11 +29,11 @@ public class ReportingTaskDTO extends NiFiComponentDTO {
        private String name;
        private String comment;
        private String type;
-       private String schedulingPeriod;
        private String state;
-       private String schedulingStrategy;
        private String availability;
        
+       private String schedulingPeriod;
+       private String schedulingStrategy;
     private Map<String, String> defaultSchedulingPeriod;
     
        private Map<String, String> properties;
@@ -42,7 +42,8 @@ public class ReportingTaskDTO extends NiFiComponentDTO {
     private String annotationData;
     
     private Collection<String> validationErrors;
-
+    private Integer activeThreadCount;
+    
     /**
      * The user-defined name of the reporting task
      * @return
@@ -195,5 +196,18 @@ public class ReportingTaskDTO extends NiFiComponentDTO {
     public void setDefaultSchedulingPeriod(Map<String, String> 
defaultSchedulingPeriod) {
         this.defaultSchedulingPeriod = defaultSchedulingPeriod;
     }
-    
+
+    /**
+     * The number of active threads for this reporting task.
+     * 
+     * @return 
+     */
+    public Integer getActiveThreadCount() {
+        return activeThreadCount;
+    }
+
+    public void setActiveThreadCount(Integer activeThreadCount) {
+        this.activeThreadCount = activeThreadCount;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6b36aefe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 2f17a35..f73ce95 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -219,6 +219,15 @@ import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
 import com.sun.jersey.api.client.ClientResponse;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import 
org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
+import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import org.apache.nifi.web.api.entity.ReportingTasksEntity;
 
 /**
  * Provides a cluster manager implementation. The manager federates incoming
@@ -306,7 +315,13 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     public static final String PROVENANCE_URI = 
"/nifi-api/controller/provenance";
     public static final Pattern PROVENANCE_QUERY_URI = 
Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
     public static final Pattern PROVENANCE_EVENT_URI = 
Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
-
+    
+    public static final String CONTROLLER_SERVICES_URI = 
"/nifi-api/controller/controller-services/node";
+    public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
+    public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
+    public static final String REPORTING_TASKS_URI = 
"/nifi-api/controller/reporting-tasks/node";
+    public static final Pattern REPORTING_TASK_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
+    
     private final NiFiProperties properties;
     private final HttpRequestReplicator httpRequestReplicator;
     private final HttpResponseMapper httpResponseMapper;
@@ -2362,13 +2377,51 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     private static boolean isProvenanceEventEndpoint(final URI uri, final 
String method) {
         return "GET".equalsIgnoreCase(method) && 
PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
     }
+    
+    private static boolean isControllerServicesEndpoint(final URI uri, final 
String method) {
+        return "GET".equalsIgnoreCase(method) && 
CONTROLLER_SERVICES_URI.equals(uri.getPath());
+    }
+    
+    private static boolean isControllerServiceEndpoint(final URI uri, final 
String method) {
+        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if ("POST".equalsIgnoreCase(method) && 
CONTROLLER_SERVICES_URI.equals(uri.getPath())) {
+            return true;
+        }
+
+        return false;
+    }
+    
+    private static boolean isControllerServiceReferenceEndpoint(final URI uri, 
final String method) {
+        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        }
+        
+        return false;
+    }
+    
+    private static boolean isReportingTasksEndpoint(final URI uri, final 
String method) {
+        return "GET".equalsIgnoreCase(method) && 
REPORTING_TASKS_URI.equals(uri.getPath());
+    }
+    
+    private static boolean isReportingTaskEndpoint(final URI uri, final String 
method) {
+        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) 
&& REPORTING_TASK_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if ("POST".equalsIgnoreCase(method) && 
REPORTING_TASKS_URI.equals(uri.getPath())) {
+            return true;
+        }
+
+        return false;
+    }
 
     static boolean isResponseInterpreted(final URI uri, final String method) {
         return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, 
method)
                 || isRemoteProcessGroupsEndpoint(uri, method) || 
isRemoteProcessGroupEndpoint(uri, method)
                 || isProcessGroupEndpoint(uri, method)
                 || isTemplateEndpoint(uri, method) || 
isFlowSnippetEndpoint(uri, method)
-                || isProvenanceQueryEndpoint(uri, method) || 
isProvenanceEventEndpoint(uri, method);
+                || isProvenanceQueryEndpoint(uri, method) || 
isProvenanceEventEndpoint(uri, method)
+                || isControllerServicesEndpoint(uri, method) || 
isControllerServiceEndpoint(uri, method) || 
isControllerServiceReferenceEndpoint(uri, method)
+                || isReportingTasksEndpoint(uri, method) || 
isReportingTaskEndpoint(uri, method);
     }
 
     private void mergeProcessorValidationErrors(final ProcessorDTO processor, 
Map<NodeIdentifier, ProcessorDTO> processorMap) {
@@ -2378,37 +2431,12 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             final NodeIdentifier nodeId = nodeEntry.getKey();
             final ProcessorDTO nodeProcessor = nodeEntry.getValue();
 
-            // get the processor's validation errors and put them into a map
-            // where the key is the validation error and the value is the set 
of all
-            // nodes that reported that validation error.
-            final Collection<String> nodeValidationErrors = 
nodeProcessor.getValidationErrors();
-            if (nodeValidationErrors != null) {
-                for (final String nodeValidationError : nodeValidationErrors) {
-                    Set<NodeIdentifier> nodeSet = 
validationErrorMap.get(nodeValidationError);
-                    if (nodeSet == null) {
-                        nodeSet = new HashSet<>();
-                        validationErrorMap.put(nodeValidationError, nodeSet);
-                    }
-                    nodeSet.add(nodeId);
-                }
-            }
-        }
-
-        final Set<String> normalizedValidationErrors = new HashSet<>();
-        for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : 
validationErrorMap.entrySet()) {
-            final String msg = validationEntry.getKey();
-            final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
-
-            if (nodeIds.size() == processorMap.size()) {
-                normalizedValidationErrors.add(msg);
-            } else {
-                for (final NodeIdentifier nodeId : nodeIds) {
-                    normalizedValidationErrors.add(nodeId.getApiAddress() + 
":" + nodeId.getApiPort() + " -- " + msg);
-                }
-            }
+            // merge the validation errors
+            mergeValidationErrors(validationErrorMap, nodeId, 
nodeProcessor.getValidationErrors());
         }
 
-        processor.setValidationErrors(normalizedValidationErrors);
+        // set the merged validation errors
+        
processor.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap,
 processorMap.size()));
     }
 
     private void mergeProvenanceQueryResults(final ProvenanceDTO 
provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final 
Set<NodeResponse> problematicResponses) {
@@ -2576,7 +2604,156 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             
remoteProcessGroup.setAuthorizationIssues(mergedAuthorizationIssues);
         }
     }
+    
+    private void mergeControllerServiceReferences(final 
Set<ControllerServiceReferencingComponentDTO> referencingComponents, final 
Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> 
referencingComponentMap) {
+        final Map<String, Integer> activeThreadCounts = new HashMap<>();
+        final Map<String, String> states = new HashMap<>();
+        for (final Map.Entry<NodeIdentifier, 
Set<ControllerServiceReferencingComponentDTO>> nodeEntry : 
referencingComponentMap.entrySet()) {
+            final Set<ControllerServiceReferencingComponentDTO> 
nodeReferencingComponents = nodeEntry.getValue();
+
+            // go through all of the node's referencing components
+            for (final ControllerServiceReferencingComponentDTO 
nodeReferencingComponent : nodeReferencingComponents) {
+                // handle active thread counts
+                if (nodeReferencingComponent.getActiveThreadCount() != null && 
nodeReferencingComponent.getActiveThreadCount() > 0) {
+                    final Integer current = 
activeThreadCounts.get(nodeReferencingComponent.getId());
+                    if (current == null) {
+                        
activeThreadCounts.put(nodeReferencingComponent.getId(), 
nodeReferencingComponent.getActiveThreadCount());
+                    } else {
+                        
activeThreadCounts.put(nodeReferencingComponent.getId(), 
nodeReferencingComponent.getActiveThreadCount() + current);
+                    }
+                }
+                
+                // handle controller service state
+                final String state = 
states.get(nodeReferencingComponent.getId());
+                if (state == null) {
+                    if 
(ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState()))
 {
+                        states.put(nodeReferencingComponent.getId(), 
ControllerServiceState.DISABLING.name());
+                    } else if 
(ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState()))
 {
+                        states.put(nodeReferencingComponent.getId(), 
ControllerServiceState.ENABLING.name());
+                    }
+                }
+            }
+        }            
+
+        // go through each referencing component
+        for (final ControllerServiceReferencingComponentDTO 
referencingComponent : referencingComponents) {
+            final Integer activeThreadCount = 
activeThreadCounts.get(referencingComponent.getId());
+            if (activeThreadCount != null) {
+                referencingComponent.setActiveThreadCount(activeThreadCount);
+            }
+            
+            final String state = states.get(referencingComponent.getId());
+            if (state != null) {
+                referencingComponent.setState(state);
+            }
+        }
+    }
+    
+    private void mergeControllerService(final ControllerServiceDTO 
controllerService, final Map<NodeIdentifier, ControllerServiceDTO> 
controllerServiceMap) {
+        final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
+        final Set<ControllerServiceReferencingComponentDTO> 
referencingComponents = controllerService.getReferencingComponents();
+        final Map<NodeIdentifier, 
Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = 
new HashMap<>();
+        
+        String state = null;
+        for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : 
controllerServiceMap.entrySet()) {
+            final NodeIdentifier nodeId = nodeEntry.getKey();
+            final ControllerServiceDTO nodeControllerService = 
nodeEntry.getValue();
+            
+            if (state == null) {
+                if 
(ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState()))
 {
+                    state = ControllerServiceState.DISABLING.name();
+                } else if 
(ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState()))
 {
+                    state = ControllerServiceState.ENABLING.name();
+                }
+            }
+            
+            for (final ControllerServiceReferencingComponentDTO 
nodeReferencingComponents : nodeControllerService.getReferencingComponents()) {
+                nodeReferencingComponentsMap.put(nodeId, 
nodeReferencingComponents.getReferencingComponents());
+            }
+            
+            // merge the validation errors
+            mergeValidationErrors(validationErrorMap, nodeId, 
nodeControllerService.getValidationErrors());
+        }
+        
+        // merge the referencing components
+        mergeControllerServiceReferences(referencingComponents, 
nodeReferencingComponentsMap);
+        
+        // store the 'transition' state if applicable
+        if (state != null) {
+            controllerService.setState(state);
+        }
+        
+        // set the merged validation errors
+        
controllerService.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap,
 controllerServiceMap.size()));
+    }
+    
+    private void mergeReportingTask(final ReportingTaskDTO reportingTask, 
final Map<NodeIdentifier, ReportingTaskDTO> reportingTaskMap) {
+        final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
+
+        int activeThreadCount = 0;
+        for (final Map.Entry<NodeIdentifier, ReportingTaskDTO> nodeEntry : 
reportingTaskMap.entrySet()) {
+            final NodeIdentifier nodeId = nodeEntry.getKey();
+            final ReportingTaskDTO nodeReportingTask = nodeEntry.getValue();
+
+            if (nodeReportingTask.getActiveThreadCount() != null) {
+                activeThreadCount += nodeReportingTask.getActiveThreadCount();
+            }
+            
+            // merge the validation errors
+            mergeValidationErrors(validationErrorMap, nodeId, 
nodeReportingTask.getValidationErrors());
+        }
+
+        // set the merged active thread counts
+        reportingTask.setActiveThreadCount(activeThreadCount);
+        
+        // set the merged validation errors
+        
reportingTask.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap,
 reportingTaskMap.size()));
+    }
 
+    /**
+     * Merges the validation errors into the specified map, recording the 
corresponding node identifier.
+     * 
+     * @param validationErrorMap
+     * @param nodeId
+     * @param nodeValidationErrors 
+     */
+    public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> 
validationErrorMap, final NodeIdentifier nodeId, final Collection<String> 
nodeValidationErrors) {
+        if (nodeValidationErrors != null) {
+            for (final String nodeValidationError : nodeValidationErrors) {
+                Set<NodeIdentifier> nodeSet = 
validationErrorMap.get(nodeValidationError);
+                if (nodeSet == null) {
+                    nodeSet = new HashSet<>();
+                    validationErrorMap.put(nodeValidationError, nodeSet);
+                }
+                nodeSet.add(nodeId);
+            }
+        }
+    }
+    
+    /**
+     * Normalizes the validation errors by prefixing an error with the address 
of each reporting node when the error was not reported by all nodes.
+     * 
+     * @param validationErrorMap
+     * @param totalNodes
+     * @return 
+     */
+    public Set<String> normalizedMergedValidationErrors(final Map<String, 
Set<NodeIdentifier>> validationErrorMap, int totalNodes) {
+        final Set<String> normalizedValidationErrors = new HashSet<>();
+        for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : 
validationErrorMap.entrySet()) {
+            final String msg = validationEntry.getKey();
+            final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
+
+            if (nodeIds.size() == totalNodes) {
+                normalizedValidationErrors.add(msg);
+            } else {
+                for (final NodeIdentifier nodeId : nodeIds) {
+                    normalizedValidationErrors.add(nodeId.getApiAddress() + 
":" + nodeId.getApiPort() + " -- " + msg);
+                }
+            }
+        }
+        return normalizedValidationErrors;
+    }
+    
     // requires write lock to be already acquired unless request is not mutable
     private NodeResponse mergeResponses(final URI uri, final String method, 
final Set<NodeResponse> nodeResponses, final boolean mutableRequest) {
         // holds the one response of all the node responses to return to the 
client
@@ -2866,6 +3043,126 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             event.setClusterNodeAddress(nodeId.getApiAddress() + ":" + 
nodeId.getApiPort());
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && 
isControllerServiceEndpoint(uri, method)) {
+            final ControllerServiceEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
+            final ControllerServiceDTO controllerService = 
responseEntity.getControllerService();
+            
+            final Map<NodeIdentifier, ControllerServiceDTO> resultsMap = new 
HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ControllerServiceEntity nodeResponseEntity = 
(nodeResponse == clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
+                final ControllerServiceDTO nodeControllerService = 
nodeResponseEntity.getControllerService();
+
+                resultsMap.put(nodeResponse.getNodeId(), 
nodeControllerService);
+            }
+            mergeControllerService(controllerService, resultsMap);
+            
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && 
isControllerServicesEndpoint(uri, method)) {
+            final ControllerServicesEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
+            final Set<ControllerServiceDTO> controllerServices = 
responseEntity.getControllerServices();
+            
+            final Map<String, Map<NodeIdentifier, ControllerServiceDTO>> 
controllerServiceMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ControllerServicesEntity nodeResponseEntity = 
(nodeResponse == clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
+                final Set<ControllerServiceDTO> nodeControllerServices = 
nodeResponseEntity.getControllerServices();
+
+                for (final ControllerServiceDTO nodeControllerService : 
nodeControllerServices) {
+                    Map<NodeIdentifier, ControllerServiceDTO> innerMap = 
controllerServiceMap.get(nodeControllerService.getId());
+                    if (innerMap == null) {
+                        innerMap = new HashMap<>();
+                        
controllerServiceMap.put(nodeControllerService.getId(), innerMap);
+                    }
+
+                    innerMap.put(nodeResponse.getNodeId(), 
nodeControllerService);
+                }
+            }
+
+            for (final ControllerServiceDTO controllerService : 
controllerServices) {
+                final String procId = controllerService.getId();
+                final Map<NodeIdentifier, ControllerServiceDTO> mergeMap = 
controllerServiceMap.get(procId);
+
+                mergeControllerService(controllerService, mergeMap);
+            }
+
+            // create a new client response
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && 
isControllerServiceReferenceEndpoint(uri, method)) {
+            final ControllerServiceReferencingComponentsEntity responseEntity 
= 
clientResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+            final Set<ControllerServiceReferencingComponentDTO> 
referencingComponents = 
responseEntity.getControllerServiceReferencingComponents();
+            
+            final Map<NodeIdentifier, 
Set<ControllerServiceReferencingComponentDTO>> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ControllerServiceReferencingComponentsEntity 
nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+                final Set<ControllerServiceReferencingComponentDTO> 
nodeReferencingComponents = 
nodeResponseEntity.getControllerServiceReferencingComponents();
+
+                resultsMap.put(nodeResponse.getNodeId(), 
nodeReferencingComponents);
+            }
+            mergeControllerServiceReferences(referencingComponents, 
resultsMap);
+            
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isReportingTaskEndpoint(uri, 
method)) {
+            final ReportingTaskEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
+            final ReportingTaskDTO reportingTask = 
responseEntity.getReportingTask();
+            
+            final Map<NodeIdentifier, ReportingTaskDTO> resultsMap = new 
HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ReportingTaskEntity nodeResponseEntity = (nodeResponse 
== clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
+                final ReportingTaskDTO nodeReportingTask = 
nodeResponseEntity.getReportingTask();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask);
+            }
+            mergeReportingTask(reportingTask, resultsMap);
+            
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && 
isReportingTasksEndpoint(uri, method)) {
+            final ReportingTasksEntity responseEntity = 
clientResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+            final Set<ReportingTaskDTO> reportingTaskSet = 
responseEntity.getReportingTasks();
+            
+            final Map<String, Map<NodeIdentifier, ReportingTaskDTO>> 
reportingTaskMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ReportingTasksEntity nodeResponseEntity = (nodeResponse 
== clientResponse) ? responseEntity : 
nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+                final Set<ReportingTaskDTO> nodeReportingTasks = 
nodeResponseEntity.getReportingTasks();
+
+                for (final ReportingTaskDTO nodeReportingTask : 
nodeReportingTasks) {
+                    Map<NodeIdentifier, ReportingTaskDTO> innerMap = 
reportingTaskMap.get(nodeReportingTask.getId());
+                    if (innerMap == null) {
+                        innerMap = new HashMap<>();
+                        reportingTaskMap.put(nodeReportingTask.getId(), 
innerMap);
+                    }
+
+                    innerMap.put(nodeResponse.getNodeId(), nodeReportingTask);
+                }
+            }
+
+            for (final ReportingTaskDTO reportingTask : reportingTaskSet) {
+                final String procId = reportingTask.getId();
+                final Map<NodeIdentifier, ReportingTaskDTO> mergeMap = 
reportingTaskMap.get(procId);
+
+                mergeReportingTask(reportingTask, mergeMap);
+            }
+
+            // create a new client response
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
         } else {
             if (!nodeResponsesToDrain.isEmpty()) {
                 drainResponses(nodeResponsesToDrain);

Reply via email to