NIFI-250: - Adding active thread count on reporting tasks. - Merging clustered responses for controller service and reporting task endpoints.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/6b36aefe Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/6b36aefe Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/6b36aefe Branch: refs/heads/NIFI-250 Commit: 6b36aefef942482a665182594fd3bb1044cad627 Parents: 79ddcb8 Author: Matt Gilman <[email protected]> Authored: Fri Mar 13 10:11:17 2015 -0400 Committer: Matt Gilman <[email protected]> Committed: Fri Mar 13 10:11:17 2015 -0400 ---------------------------------------------------------------------- .../nifi/web/api/dto/ReportingTaskDTO.java | 22 +- .../cluster/manager/impl/WebClusterManager.java | 359 +++++++++++++++++-- 2 files changed, 346 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6b36aefe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java index 3ec048b..b9e7ff1 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java @@ -29,11 +29,11 @@ public class ReportingTaskDTO extends NiFiComponentDTO { private String name; private String comment; private String type; - private String schedulingPeriod; private String state; - private String schedulingStrategy; private String availability; + private String schedulingPeriod; + private String schedulingStrategy; private Map<String, String> defaultSchedulingPeriod; private Map<String, String> properties; @@ -42,7 +42,8 @@ public class ReportingTaskDTO extends NiFiComponentDTO { private String annotationData; private Collection<String> validationErrors; - + private Integer activeThreadCount; + /** * The user-defined name of the reporting task * @return @@ -195,5 +196,18 @@ public class ReportingTaskDTO extends NiFiComponentDTO { public void setDefaultSchedulingPeriod(Map<String, String> defaultSchedulingPeriod) { this.defaultSchedulingPeriod = defaultSchedulingPeriod; } - + + /** + * The number of active threads for this reporting task. + * + * @return + */ + public Integer getActiveThreadCount() { + return activeThreadCount; + } + + public void setActiveThreadCount(Integer activeThreadCount) { + this.activeThreadCount = activeThreadCount; + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6b36aefe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 2f17a35..f73ce95 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -219,6 +219,15 @@ import org.xml.sax.SAXException; import org.xml.sax.SAXParseException; import com.sun.jersey.api.client.ClientResponse; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.dto.ReportingTaskDTO; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; +import org.apache.nifi.web.api.entity.ControllerServicesEntity; +import org.apache.nifi.web.api.entity.ReportingTaskEntity; +import org.apache.nifi.web.api.entity.ReportingTasksEntity; /** * Provides a cluster manager implementation. The manager federates incoming @@ -306,7 +315,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final String PROVENANCE_URI = "/nifi-api/controller/provenance"; public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}"); public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+"); - + + public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node"; + public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}"); + public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references"); + public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; + public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}"); + private final NiFiProperties properties; private final HttpRequestReplicator httpRequestReplicator; private final HttpResponseMapper httpResponseMapper; @@ -2362,13 +2377,51 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C private static boolean isProvenanceEventEndpoint(final URI uri, final String method) { return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches(); } + + private static boolean isControllerServicesEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath()); + } + + private static boolean isControllerServiceEndpoint(final URI uri, final String method) { + if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) { + return true; + } else if ("POST".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath())) { + return true; + } + + return false; + } + + private static boolean isControllerServiceReferenceEndpoint(final URI uri, final String method) { + if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) { + return true; + } + + return false; + } + + private static boolean isReportingTasksEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath()); + } + + private static boolean isReportingTaskEndpoint(final URI uri, final String method) { + if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REPORTING_TASK_URI_PATTERN.matcher(uri.getPath()).matches()) { + return true; + } else if ("POST".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath())) { + return true; + } + + return false; + } static boolean isResponseInterpreted(final URI uri, final String method) { return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method) || isRemoteProcessGroupsEndpoint(uri, method) || isRemoteProcessGroupEndpoint(uri, method) || isProcessGroupEndpoint(uri, method) || isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method) - || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method); + || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method) + || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method) + || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method); } private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) { @@ -2378,37 +2431,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final NodeIdentifier nodeId = nodeEntry.getKey(); final ProcessorDTO nodeProcessor = nodeEntry.getValue(); - // get the processor's validation errors and put them into a map - // where the key is the validation error and the value is the set of all - // nodes that reported that validation error. - final Collection<String> nodeValidationErrors = nodeProcessor.getValidationErrors(); - if (nodeValidationErrors != null) { - for (final String nodeValidationError : nodeValidationErrors) { - Set<NodeIdentifier> nodeSet = validationErrorMap.get(nodeValidationError); - if (nodeSet == null) { - nodeSet = new HashSet<>(); - validationErrorMap.put(nodeValidationError, nodeSet); - } - nodeSet.add(nodeId); - } - } - } - - final Set<String> normalizedValidationErrors = new HashSet<>(); - for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : validationErrorMap.entrySet()) { - final String msg = validationEntry.getKey(); - final Set<NodeIdentifier> nodeIds = validationEntry.getValue(); - - if (nodeIds.size() == processorMap.size()) { - normalizedValidationErrors.add(msg); - } else { - for (final NodeIdentifier nodeId : nodeIds) { - normalizedValidationErrors.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg); - } - } + // merge the validation errors + mergeValidationErrors(validationErrorMap, nodeId, nodeProcessor.getValidationErrors()); } - processor.setValidationErrors(normalizedValidationErrors); + // set the merged the validation errors + processor.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, processorMap.size())); } private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final Set<NodeResponse> problematicResponses) { @@ -2576,7 +2604,156 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C remoteProcessGroup.setAuthorizationIssues(mergedAuthorizationIssues); } } + + private void mergeControllerServiceReferences(final Set<ControllerServiceReferencingComponentDTO> referencingComponents, final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> referencingComponentMap) { + final Map<String, Integer> activeThreadCounts = new HashMap<>(); + final Map<String, String> states = new HashMap<>(); + for (final Map.Entry<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeEntry : referencingComponentMap.entrySet()) { + final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeEntry.getValue(); + + // go through all the nodes referencing components + for (final ControllerServiceReferencingComponentDTO nodeReferencingComponent : nodeReferencingComponents) { + // handle active thread counts + if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) { + final Integer current = activeThreadCounts.get(nodeReferencingComponent.getId()); + if (current == null) { + activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount()); + } else { + activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount() + current); + } + } + + // handle controller service state + final String state = states.get(nodeReferencingComponent.getId()); + if (state == null) { + if (ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState())) { + states.put(nodeReferencingComponent.getId(), ControllerServiceState.DISABLING.name()); + } else if (ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState())) { + states.put(nodeReferencingComponent.getId(), ControllerServiceState.ENABLING.name()); + } + } + } + } + + // go through each referencing components + for (final ControllerServiceReferencingComponentDTO referencingComponent : referencingComponents) { + final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId()); + if (activeThreadCount != null) { + referencingComponent.setActiveThreadCount(activeThreadCount); + } + + final String state = states.get(referencingComponent.getId()); + if (state != null) { + referencingComponent.setState(state); + } + } + } + + private void mergeControllerService(final ControllerServiceDTO controllerService, final Map<NodeIdentifier, ControllerServiceDTO> controllerServiceMap) { + final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); + final Set<ControllerServiceReferencingComponentDTO> referencingComponents = controllerService.getReferencingComponents(); + final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = new HashMap<>(); + + String state = null; + for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : controllerServiceMap.entrySet()) { + final NodeIdentifier nodeId = nodeEntry.getKey(); + final ControllerServiceDTO nodeControllerService = nodeEntry.getValue(); + + if (state == null) { + if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) { + state = ControllerServiceState.DISABLING.name(); + } else if (ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState())) { + state = ControllerServiceState.ENABLING.name(); + } + } + + for (final ControllerServiceReferencingComponentDTO nodeReferencingComponents : nodeControllerService.getReferencingComponents()) { + nodeReferencingComponentsMap.put(nodeId, nodeReferencingComponents.getReferencingComponents()); + } + + // merge the validation errors + mergeValidationErrors(validationErrorMap, nodeId, nodeControllerService.getValidationErrors()); + } + + // merge the referencing components + mergeControllerServiceReferences(referencingComponents, nodeReferencingComponentsMap); + + // store the 'transition' state is applicable + if (state != null) { + controllerService.setState(state); + } + + // set the merged the validation errors + controllerService.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, controllerServiceMap.size())); + } + + private void mergeReportingTask(final ReportingTaskDTO reportingTask, final Map<NodeIdentifier, ReportingTaskDTO> reportingTaskMap) { + final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); + + int activeThreadCount = 0; + for (final Map.Entry<NodeIdentifier, ReportingTaskDTO> nodeEntry : reportingTaskMap.entrySet()) { + final NodeIdentifier nodeId = nodeEntry.getKey(); + final ReportingTaskDTO nodeReportingTask = nodeEntry.getValue(); + + if (nodeReportingTask.getActiveThreadCount() != null) { + activeThreadCount += nodeReportingTask.getActiveThreadCount(); + } + + // merge the validation errors + mergeValidationErrors(validationErrorMap, nodeId, nodeReportingTask.getValidationErrors()); + } + + // set the merged active thread counts + reportingTask.setActiveThreadCount(activeThreadCount); + + // set the merged the validation errors + reportingTask.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, reportingTaskMap.size())); + } + /** + * Merges the validation errors into the specified map, recording the corresponding node identifier. + * + * @param validationErrorMap + * @param nodeId + * @param nodeValidationErrors + */ + public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final Collection<String> nodeValidationErrors) { + if (nodeValidationErrors != null) { + for (final String nodeValidationError : nodeValidationErrors) { + Set<NodeIdentifier> nodeSet = validationErrorMap.get(nodeValidationError); + if (nodeSet == null) { + nodeSet = new HashSet<>(); + validationErrorMap.put(nodeValidationError, nodeSet); + } + nodeSet.add(nodeId); + } + } + } + + /** + * Normalizes the validation errors by prepending the corresponding nodes when the error does not exist across all nodes. + * + * @param validationErrorMap + * @param totalNodes + * @return + */ + public Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) { + final Set<String> normalizedValidationErrors = new HashSet<>(); + for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : validationErrorMap.entrySet()) { + final String msg = validationEntry.getKey(); + final Set<NodeIdentifier> nodeIds = validationEntry.getValue(); + + if (nodeIds.size() == totalNodes) { + normalizedValidationErrors.add(msg); + } else { + for (final NodeIdentifier nodeId : nodeIds) { + normalizedValidationErrors.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg); + } + } + } + return normalizedValidationErrors; + } + // requires write lock to be already acquired unless request is not mutable private NodeResponse mergeResponses(final URI uri, final String method, final Set<NodeResponse> nodeResponses, final boolean mutableRequest) { // holds the one response of all the node responses to return to the client @@ -2866,6 +3043,126 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C event.setClusterNodeAddress(nodeId.getApiAddress() + ":" + nodeId.getApiPort()); clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isControllerServiceEndpoint(uri, method)) { + final ControllerServiceEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceEntity.class); + final ControllerServiceDTO controllerService = responseEntity.getControllerService(); + + final Map<NodeIdentifier, ControllerServiceDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ControllerServiceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class); + final ControllerServiceDTO nodeControllerService = nodeResponseEntity.getControllerService(); + + resultsMap.put(nodeResponse.getNodeId(), nodeControllerService); + } + mergeControllerService(controllerService, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isControllerServicesEndpoint(uri, method)) { + final ControllerServicesEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServicesEntity.class); + final Set<ControllerServiceDTO> controllerServices = responseEntity.getControllerServices(); + + final Map<String, Map<NodeIdentifier, ControllerServiceDTO>> controllerServiceMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ControllerServicesEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class); + final Set<ControllerServiceDTO> nodeControllerServices = nodeResponseEntity.getControllerServices(); + + for (final ControllerServiceDTO nodeControllerService : nodeControllerServices) { + Map<NodeIdentifier, ControllerServiceDTO> innerMap = controllerServiceMap.get(nodeControllerService.getId()); + if (innerMap == null) { + innerMap = new HashMap<>(); + controllerServiceMap.put(nodeControllerService.getId(), innerMap); + } + + innerMap.put(nodeResponse.getNodeId(), nodeControllerService); + } + } + + for (final ControllerServiceDTO controllerService : controllerServices) { + final String procId = controllerService.getId(); + final Map<NodeIdentifier, ControllerServiceDTO> mergeMap = controllerServiceMap.get(procId); + + mergeControllerService(controllerService, mergeMap); + } + + // create a new client response + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isControllerServiceReferenceEndpoint(uri, method)) { + final ControllerServiceReferencingComponentsEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class); + final Set<ControllerServiceReferencingComponentDTO> referencingComponents = responseEntity.getControllerServiceReferencingComponents(); + + final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ControllerServiceReferencingComponentsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class); + final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents(); + + resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents); + } + mergeControllerServiceReferences(referencingComponents, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isReportingTaskEndpoint(uri, method)) { + final ReportingTaskEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTaskEntity.class); + final ReportingTaskDTO reportingTask = responseEntity.getReportingTask(); + + final Map<NodeIdentifier, ReportingTaskDTO> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ReportingTaskEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class); + final ReportingTaskDTO nodeReportingTask = nodeResponseEntity.getReportingTask(); + + resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask); + } + mergeReportingTask(reportingTask, resultsMap); + + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && isReportingTasksEndpoint(uri, method)) { + final ReportingTasksEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTasksEntity.class); + final Set<ReportingTaskDTO> reportingTaskSet = responseEntity.getReportingTasks(); + + final Map<String, Map<NodeIdentifier, ReportingTaskDTO>> reportingTaskMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ReportingTasksEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class); + final Set<ReportingTaskDTO> nodeReportingTasks = nodeResponseEntity.getReportingTasks(); + + for (final ReportingTaskDTO nodeReportingTask : nodeReportingTasks) { + Map<NodeIdentifier, ReportingTaskDTO> innerMap = reportingTaskMap.get(nodeReportingTask.getId()); + if (innerMap == null) { + innerMap = new HashMap<>(); + reportingTaskMap.put(nodeReportingTask.getId(), innerMap); + } + + innerMap.put(nodeResponse.getNodeId(), nodeReportingTask); + } + } + + for (final ReportingTaskDTO reportingTask : reportingTaskSet) { + final String procId = reportingTask.getId(); + final Map<NodeIdentifier, ReportingTaskDTO> mergeMap = reportingTaskMap.get(procId); + + mergeReportingTask(reportingTask, mergeMap); + } + + // create a new client response + clientResponse = new NodeResponse(clientResponse, responseEntity); } else { if (!nodeResponsesToDrain.isEmpty()) { drainResponses(nodeResponsesToDrain);
