http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ListFlowFilesEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ListFlowFilesEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ListFlowFilesEndpointMerger.java new file mode 100644 index 0000000..415334e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ListFlowFilesEndpointMerger.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.queue.ListFlowFileState; +import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; +import org.apache.nifi.web.api.dto.ListingRequestDTO; +import org.apache.nifi.web.api.dto.QueueSizeDTO; +import org.apache.nifi.web.api.entity.ListingRequestEntity; + +public class ListFlowFilesEndpointMerger extends AbstractSingleEntityEndpoint<ListingRequestEntity, ListingRequestDTO> { + public static final Pattern LISTING_REQUESTS_URI = Pattern.compile("/nifi-api/flowfile-queues/[a-f0-9\\-]{36}/listing-requests"); + public static final Pattern LISTING_REQUEST_URI = Pattern.compile("/nifi-api/flowfile-queues/[a-f0-9\\-]{36}/listing-requests/[a-f0-9\\-]{36}"); + + @Override + public boolean canHandle(URI uri, String method) { + if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && LISTING_REQUEST_URI.matcher(uri.getPath()).matches()) { + return true; + } else if ("POST".equalsIgnoreCase(method) && LISTING_REQUESTS_URI.matcher(uri.getPath()).matches()) { + return true; + } + + return false; + } + + @Override + protected Class<ListingRequestEntity> getEntityClass() { + return ListingRequestEntity.class; + } + + @Override + protected ListingRequestDTO getDto(ListingRequestEntity entity) { + return entity.getListingRequest(); + } + + @Override + protected void mergeResponses(ListingRequestDTO clientDto, Map<NodeIdentifier, ListingRequestDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + final Comparator<FlowFileSummaryDTO> comparator = new Comparator<FlowFileSummaryDTO>() { + @Override + public int compare(final FlowFileSummaryDTO dto1, final FlowFileSummaryDTO dto2) { + int positionCompare = dto1.getPosition().compareTo(dto2.getPosition()); + if (positionCompare != 0) { + return positionCompare; + } + + final String address1 = dto1.getClusterNodeAddress(); + final String address2 = dto2.getClusterNodeAddress(); + if (address1 == null && address2 == null) { + return 0; + } + if (address1 == null) { + return 1; + } + if (address2 == null) { + return -1; + } + return address1.compareTo(address2); + } + }; + + final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator); + + ListFlowFileState state = null; + int numStepsCompleted = 0; + int numStepsTotal = 0; + int objectCount = 0; + long byteCount = 0; + boolean finished = true; + for (final Map.Entry<NodeIdentifier, ListingRequestDTO> entry : dtoMap.entrySet()) { + final NodeIdentifier nodeIdentifier = entry.getKey(); + final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort(); + + final ListingRequestDTO nodeRequest = entry.getValue(); + + numStepsTotal++; + if (Boolean.TRUE.equals(nodeRequest.getFinished())) { + numStepsCompleted++; + } + + final QueueSizeDTO nodeQueueSize = nodeRequest.getQueueSize(); + objectCount += nodeQueueSize.getObjectCount(); + byteCount += nodeQueueSize.getByteCount(); + + if (!nodeRequest.getFinished()) { + finished = false; + } + + if (nodeRequest.getLastUpdated().after(clientDto.getLastUpdated())) { + clientDto.setLastUpdated(nodeRequest.getLastUpdated()); + } + + // Keep the state with the lowest ordinal value (the "least completed"). + final ListFlowFileState nodeState = ListFlowFileState.valueOfDescription(nodeRequest.getState()); + if (state == null || state.compareTo(nodeState) > 0) { + state = nodeState; + } + + if (nodeRequest.getFlowFileSummaries() != null) { + for (final FlowFileSummaryDTO summaryDTO : nodeRequest.getFlowFileSummaries()) { + summaryDTO.setClusterNodeId(nodeIdentifier.getId()); + summaryDTO.setClusterNodeAddress(nodeAddress); + + flowFileSummaries.add(summaryDTO); + + // Keep the set from growing beyond our max + if (flowFileSummaries.size() > clientDto.getMaxResults()) { + flowFileSummaries.pollLast(); + } + } + } + + if (nodeRequest.getFailureReason() != null) { + clientDto.setFailureReason(nodeRequest.getFailureReason()); + } + } + + final List<FlowFileSummaryDTO> summaryDTOs = new ArrayList<>(flowFileSummaries); + clientDto.setFlowFileSummaries(summaryDTOs); + // depends on invariant if numStepsTotal is 0, so is numStepsCompleted, all steps being completed + // would be 1 + final int percentCompleted = (numStepsTotal == 0) ? 1 : numStepsCompleted / numStepsTotal; + clientDto.setPercentCompleted(percentCompleted); + clientDto.setFinished(finished); + + clientDto.getQueueSize().setByteCount(byteCount); + clientDto.getQueueSize().setObjectCount(objectCount); + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PortStatusEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PortStatusEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PortStatusEndpointMerger.java new file mode 100644 index 0000000..5c570f0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PortStatusEndpointMerger.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Map; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.StatusMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.PortStatusDTO; +import org.apache.nifi.web.api.entity.PortStatusEntity; + +public class PortStatusEndpointMerger extends AbstractNodeStatusEndpoint<PortStatusEntity, PortStatusDTO> { + public static final Pattern INPUT_PORT_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/input-ports/[a-f0-9\\-]{36}/status"); + public static final Pattern OUTPUT_PORT_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/output-ports/[a-f0-9\\-]{36}/status"); + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && (INPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches() || OUTPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches()); + } + + @Override + protected Class<PortStatusEntity> getEntityClass() { + return PortStatusEntity.class; + } + + @Override + protected PortStatusDTO getDto(PortStatusEntity entity) { + return entity.getPortStatus(); + } + + @Override + protected void mergeResponses(PortStatusDTO clientDto, Map<NodeIdentifier, PortStatusDTO> dtoMap, NodeIdentifier selectedNodeId) { + final PortStatusDTO mergedPortStatus = clientDto; + mergedPortStatus.setNodeSnapshots(new ArrayList<NodePortStatusSnapshotDTO>()); + + final NodePortStatusSnapshotDTO selectedNodeSnapshot = new NodePortStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(clientDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedPortStatus.getNodeSnapshots().add(selectedNodeSnapshot); + + // merge the other nodes + for (final Map.Entry<NodeIdentifier, PortStatusDTO> entry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final PortStatusDTO nodePortStatus = entry.getValue(); + if (nodePortStatus == clientDto) { + continue; + } + + StatusMerger.merge(mergedPortStatus, nodePortStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java new file mode 100644 index 0000000..bef75a0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessGroupEndpointMerger.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.ProcessGroupDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; + +public class ProcessGroupEndpointMerger implements EndpointResponseMerger { + public static final Pattern PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))"); + + @Override + public boolean canHandle(final URI uri, final String method) { + return ("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + public NodeResponse merge(final URI uri, final String method, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) { + if (!canHandle(uri, method)) { + throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); + } + + final ProcessGroupEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessGroupEntity.class); + final ProcessGroupDTO responseDto = responseEntity.getComponent(); + + final FlowSnippetDTO contents = responseDto.getContents(); + if (contents == null) { + return new NodeResponse(clientResponse, responseEntity); + } else { + final Map<String, Map<NodeIdentifier, ProcessorDTO>> processorMap = new HashMap<>(); + final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> remoteProcessGroupMap = new HashMap<>(); + + for (final NodeResponse nodeResponse : successfulResponses) { + final ProcessGroupEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class); + final ProcessGroupDTO nodeProcessGroup = nodeResponseEntity.getComponent(); + + for (final ProcessorDTO nodeProcessor : nodeProcessGroup.getContents().getProcessors()) { + Map<NodeIdentifier, ProcessorDTO> innerMap = processorMap.get(nodeProcessor.getId()); + if (innerMap == null) { + innerMap = new HashMap<>(); + processorMap.put(nodeProcessor.getId(), innerMap); + } + + innerMap.put(nodeResponse.getNodeId(), nodeProcessor); + } + + for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeProcessGroup.getContents().getRemoteProcessGroups()) { + Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId()); + if (innerMap == null) { + innerMap = new HashMap<>(); + remoteProcessGroupMap.put(nodeRemoteProcessGroup.getId(), innerMap); + } + + innerMap.put(nodeResponse.getNodeId(), nodeRemoteProcessGroup); + } + } + + final ProcessorEndpointMerger procMerger = new ProcessorEndpointMerger(); + for (final ProcessorDTO processor : contents.getProcessors()) { + final String procId = processor.getId(); + final Map<NodeIdentifier, ProcessorDTO> mergeMap = processorMap.get(procId); + + procMerger.mergeResponses(processor, mergeMap, successfulResponses, problematicResponses); + } + + final RemoteProcessGroupEndpointMerger rpgMerger = new RemoteProcessGroupEndpointMerger(); + for (final RemoteProcessGroupDTO remoteProcessGroup : contents.getRemoteProcessGroups()) { + if (remoteProcessGroup.getContents() != null) { + final String remoteProcessGroupId = remoteProcessGroup.getId(); + final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap = remoteProcessGroupMap.get(remoteProcessGroupId); + + rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, successfulResponses, problematicResponses); + } + } + } + + // create a new client response + return new NodeResponse(clientResponse, responseEntity); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java new file mode 100644 index 0000000..6a040fa --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.entity.ProcessorEntity; + +public class ProcessorEndpointMerger extends AbstractSingleEntityEndpoint<ProcessorEntity, ProcessorDTO> implements EndpointResponseMerger { + public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/processors"); + public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}"); + public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}"); + + @Override + public boolean canHandle(final URI uri, final String method) { + if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) + && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches())) { + return true; + } else if ("POST".equalsIgnoreCase(method) && PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches()) { + return true; + } + + return false; + } + + @Override + protected Class<ProcessorEntity> getEntityClass() { + return ProcessorEntity.class; + } + + @Override + protected ProcessorDTO getDto(final ProcessorEntity entity) { + return entity.getComponent(); + } + + @Override + protected void mergeResponses(final ProcessorDTO clientDto, final Map<NodeIdentifier, ProcessorDTO> dtoMap, final Set<NodeResponse> successfulResponses, + final Set<NodeResponse> problematicResponses) { + final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); + + for (final Map.Entry<NodeIdentifier, ProcessorDTO> nodeEntry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = nodeEntry.getKey(); + final ProcessorDTO nodeProcessor = nodeEntry.getValue(); + + // merge the validation errors + mergeValidationErrors(validationErrorMap, nodeId, nodeProcessor.getValidationErrors()); + } + + // set the merged the validation errors + clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, dtoMap.size())); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorStatusEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorStatusEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorStatusEndpointMerger.java new file mode 100644 index 0000000..8fdceb1 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorStatusEndpointMerger.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Map; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.StatusMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; +import org.apache.nifi.web.api.entity.ProcessorStatusEntity; + +public class ProcessorStatusEndpointMerger extends AbstractNodeStatusEndpoint<ProcessorStatusEntity, ProcessorStatusDTO> { + public static final Pattern PROCESSOR_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/processors/[a-f0-9\\-]{36}/status"); + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && PROCESSOR_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + protected Class<ProcessorStatusEntity> getEntityClass() { + return ProcessorStatusEntity.class; + } + + @Override + protected ProcessorStatusDTO getDto(ProcessorStatusEntity entity) { + return entity.getProcessorStatus(); + } + + @Override + protected void mergeResponses(ProcessorStatusDTO clientDto, Map<NodeIdentifier, ProcessorStatusDTO> dtoMap, NodeIdentifier selectedNodeId) { + final ProcessorStatusDTO mergedProcessorStatus = clientDto; + mergedProcessorStatus.setNodeSnapshots(new ArrayList<NodeProcessorStatusSnapshotDTO>()); + + final NodeProcessorStatusSnapshotDTO selectedNodeSnapshot = new NodeProcessorStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(clientDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedProcessorStatus.getNodeSnapshots().add(selectedNodeSnapshot); + + // merge the other nodes + for (final Map.Entry<NodeIdentifier, ProcessorStatusDTO> entry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ProcessorStatusDTO nodeProcessorStatus = entry.getValue(); + if (nodeProcessorStatus == clientDto) { + continue; + } + + StatusMerger.merge(mergedProcessorStatus, nodeProcessorStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java new file mode 100644 index 0000000..fa076b9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorsEndpointMerger.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.ProcessorsEntity; + +public class ProcessorsEndpointMerger implements EndpointResponseMerger { + public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors"); + + @Override + public boolean canHandle(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + public final NodeResponse merge(final URI uri, final String method, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) { + if (!canHandle(uri, method)) { + throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); + } + + final ProcessorsEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessorsEntity.class); + final Set<ProcessorEntity> processorEntities = responseEntity.getProcessors(); + + final Map<String, Map<NodeIdentifier, ProcessorDTO>> dtoMap = new HashMap<>(); + for (final NodeResponse nodeResponse : successfulResponses) { + final ProcessorsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class); + final Set<ProcessorEntity> nodeProcessorEntities = nodeResponseEntity.getProcessors(); + + for (final ProcessorEntity nodeProcessorEntity : nodeProcessorEntities) { + final NodeIdentifier nodeId = nodeResponse.getNodeId(); + Map<NodeIdentifier, ProcessorDTO> innerMap = dtoMap.get(nodeId); + if (innerMap == null) { + innerMap = new HashMap<>(); + dtoMap.put(nodeProcessorEntity.getId(), innerMap); + } + + innerMap.put(nodeResponse.getNodeId(), nodeProcessorEntity.getComponent()); + } + } + + final ProcessorEndpointMerger procMerger = new ProcessorEndpointMerger(); + for (final ProcessorEntity entity : processorEntities) { + final String componentId = entity.getId(); + final Map<NodeIdentifier, ProcessorDTO> mergeMap = dtoMap.get(componentId); + + procMerger.mergeResponses(entity.getComponent(), mergeMap, successfulResponses, problematicResponses); + } + + // create a new client response + return new NodeResponse(clientResponse, responseEntity); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceEventEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceEventEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceEventEndpointMerger.java new file mode 100644 index 0000000..3f895bd --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceEventEndpointMerger.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; +import org.apache.nifi.web.api.entity.ProvenanceEventEntity; + +public class ProvenanceEventEndpointMerger extends AbstractSingleEntityEndpoint<ProvenanceEventEntity, ProvenanceEventDTO> { + public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/provenance/events/[0-9]+"); + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches(); + } + + @Override + protected Class<ProvenanceEventEntity> getEntityClass() { + return ProvenanceEventEntity.class; + } + + @Override + protected ProvenanceEventDTO getDto(ProvenanceEventEntity entity) { + return entity.getProvenanceEvent(); + } + + @Override + protected void mergeResponses(ProvenanceEventDTO clientDto, Map<NodeIdentifier, ProvenanceEventDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + // The request for a Provenance Event is replicated to a single Node. We simply update its cluster node info. + final NodeIdentifier nodeId = successfulResponses.iterator().next().getNodeId(); + clientDto.setClusterNodeId(nodeId.getId()); + clientDto.setClusterNodeAddress(nodeId.getApiAddress() + ":" + nodeId.getApiPort()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceQueryEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceQueryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceQueryEndpointMerger.java new file mode 100644 index 0000000..4875499 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProvenanceQueryEndpointMerger.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; +import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; +import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO; +import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO; +import org.apache.nifi.web.api.entity.ProvenanceEntity; + +public class ProvenanceQueryEndpointMerger implements EndpointResponseMerger { + public static final String PROVENANCE_URI = "/nifi-api/provenance"; + public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/provenance/[a-f0-9\\-]{36}"); + + @Override + public boolean canHandle(URI uri, String method) { + if ("POST".equalsIgnoreCase(method) && PROVENANCE_URI.equals(uri.getPath())) { + return true; + } else if ("GET".equalsIgnoreCase(method) && PROVENANCE_QUERY_URI.matcher(uri.getPath()).matches()) { + return true; + } + return false; + } + + + @Override + public NodeResponse merge(URI uri, String method, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses, NodeResponse clientResponse) { + if (!canHandle(uri, method)) { + throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); + } + + final ProvenanceEntity responseEntity = clientResponse.getClientResponse().getEntity(ProvenanceEntity.class); + final ProvenanceDTO dto = responseEntity.getProvenance(); + + final Map<NodeIdentifier, ProvenanceDTO> dtoMap = new HashMap<>(); + for (final NodeResponse nodeResponse : successfulResponses) { + final ProvenanceEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class); + final ProvenanceDTO nodeDto = nodeResponseEntity.getProvenance(); + dtoMap.put(nodeResponse.getNodeId(), nodeDto); + } + + mergeResponses(dto, dtoMap, successfulResponses, problematicResponses); + return new NodeResponse(clientResponse, responseEntity); + } + + + protected void mergeResponses(ProvenanceDTO clientDto, Map<NodeIdentifier, ProvenanceDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + final ProvenanceResultsDTO results = clientDto.getResults(); + final ProvenanceRequestDTO request = clientDto.getRequest(); + final List<ProvenanceEventDTO> allResults = new ArrayList<>(1024); + + final Set<String> errors = new HashSet<>(); + Date oldestEventDate = new Date(); + int percentageComplete = 0; + boolean finished = true; + + long totalRecords = 0; + for (final Map.Entry<NodeIdentifier, ProvenanceDTO> entry : dtoMap.entrySet()) { + final NodeIdentifier nodeIdentifier = entry.getKey(); + final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort(); + + final ProvenanceDTO nodeDto = entry.getValue(); + final ProvenanceResultsDTO nodeResultDto = nodeDto.getResults(); + if (nodeResultDto != null && nodeResultDto.getProvenanceEvents() != null) { + // increment the total number of records + totalRecords += nodeResultDto.getTotalCount(); + + // populate the cluster identifier + for (final ProvenanceEventDTO eventDto : nodeResultDto.getProvenanceEvents()) { + eventDto.setClusterNodeId(nodeIdentifier.getId()); + eventDto.setClusterNodeAddress(nodeAddress); + // add node identifier to the event's id so that it is unique across cluster + eventDto.setId(nodeIdentifier.getId() + eventDto.getId()); + allResults.add(eventDto); + } + } + + if (nodeResultDto.getOldestEvent() != null && nodeResultDto.getOldestEvent().before(oldestEventDate)) { + oldestEventDate = nodeResultDto.getOldestEvent(); + } + + if (nodeResultDto.getErrors() != null) { + for (final String error : nodeResultDto.getErrors()) { + errors.add(nodeAddress + " -- " + error); + } + } + + percentageComplete += nodeDto.getPercentCompleted(); + if (!nodeDto.isFinished()) { + finished = false; + } + } + percentageComplete /= dtoMap.size(); + + // consider any problematic responses as errors + for (final NodeResponse problematicResponse : problematicResponses) { + final NodeIdentifier problemNode = problematicResponse.getNodeId(); + final String problemNodeAddress = problemNode.getApiAddress() + ":" + problemNode.getApiPort(); + errors.add(String.format("%s -- Request did not complete successfully (Status code: %s)", problemNodeAddress, problematicResponse.getStatus())); + } + + // Since we get back up to the maximum number of results from each node, we need to sort those values and then + // grab only the first X number of them. We do a sort based on time, such that the newest are included. + // If 2 events have the same timestamp, we do a secondary sort based on Cluster Node Identifier. If those are + // equal, we perform a terciary sort based on the the event id + Collections.sort(allResults, new Comparator<ProvenanceEventDTO>() { + @Override + public int compare(final ProvenanceEventDTO o1, final ProvenanceEventDTO o2) { + final int eventTimeComparison = o1.getEventTime().compareTo(o2.getEventTime()); + if (eventTimeComparison != 0) { + return -eventTimeComparison; + } + + final String nodeId1 = o1.getClusterNodeId(); + final String nodeId2 = o2.getClusterNodeId(); + final int nodeIdComparison; + if (nodeId1 == null && nodeId2 == null) { + nodeIdComparison = 0; + } else if (nodeId1 == null) { + nodeIdComparison = 1; + } else if (nodeId2 == null) { + nodeIdComparison = -1; + } else { + nodeIdComparison = -nodeId1.compareTo(nodeId2); + } + + if (nodeIdComparison != 0) { + return nodeIdComparison; + } + + return -Long.compare(o1.getEventId(), o2.getEventId()); + } + }); + + final int maxResults = request.getMaxResults().intValue(); + final List<ProvenanceEventDTO> selectedResults; + if (allResults.size() < maxResults) { + selectedResults = allResults; + } else { + selectedResults = allResults.subList(0, maxResults); + } + + // include any errors + if (errors.size() > 0) { + results.setErrors(errors); + } + + results.setTotalCount(totalRecords); + results.setTotal(FormatUtils.formatCount(totalRecords)); + results.setProvenanceEvents(selectedResults); + results.setOldestEvent(oldestEventDate); + results.setGenerated(new Date()); + clientDto.setPercentCompleted(percentageComplete); + clientDto.setFinished(finished); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java new file mode 100644 index 0000000..94383de --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; + +public class RemoteProcessGroupEndpointMerger extends AbstractSingleEntityEndpoint<RemoteProcessGroupEntity, RemoteProcessGroupDTO> { + public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/remote-process-groups"); + public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/remote-process-groups/[a-f0-9\\-]{36}"); + + @Override + public boolean canHandle(final URI uri, final String method) { + if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) { + return true; + } else if ("POST".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches()) { + return true; + } + + return false; + } + + @Override + protected Class<RemoteProcessGroupEntity> getEntityClass() { + return RemoteProcessGroupEntity.class; + } + + @Override + protected RemoteProcessGroupDTO getDto(final RemoteProcessGroupEntity entity) { + return entity.getComponent(); + } + + @Override + protected void mergeResponses(RemoteProcessGroupDTO clientDto, Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + final RemoteProcessGroupContentsDTO remoteProcessGroupContents = clientDto.getContents(); + + Boolean mergedIsTargetSecure = null; + final List<String> mergedAuthorizationIssues = new ArrayList<>(); + final Set<RemoteProcessGroupPortDTO> mergedInputPorts = new HashSet<>(); + final Set<RemoteProcessGroupPortDTO> mergedOutputPorts = new HashSet<>(); + + for (final Map.Entry<NodeIdentifier, RemoteProcessGroupDTO> nodeEntry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = nodeEntry.getKey(); + final RemoteProcessGroupDTO nodeRemoteProcessGroupDto = nodeEntry.getValue(); + + // merge the issues + final List<String> nodeAuthorizationIssues = nodeRemoteProcessGroupDto.getAuthorizationIssues(); + if (nodeAuthorizationIssues != null && !nodeAuthorizationIssues.isEmpty()) { + for (final String nodeAuthorizationIssue : nodeAuthorizationIssues) { + mergedAuthorizationIssues.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + nodeAuthorizationIssue); + } + } + + // use the first target secure flag since they will all be the same + final Boolean nodeIsTargetSecure = nodeRemoteProcessGroupDto.isTargetSecure(); + if (mergedIsTargetSecure == null) { + mergedIsTargetSecure = nodeIsTargetSecure; + } + + // merge the ports in the contents + final RemoteProcessGroupContentsDTO nodeRemoteProcessGroupContentsDto = nodeRemoteProcessGroupDto.getContents(); + if (remoteProcessGroupContents != null && nodeRemoteProcessGroupContentsDto != null) { + if (nodeRemoteProcessGroupContentsDto.getInputPorts() != null) { + mergedInputPorts.addAll(nodeRemoteProcessGroupContentsDto.getInputPorts()); + } + if (nodeRemoteProcessGroupContentsDto.getOutputPorts() != null) { + mergedOutputPorts.addAll(nodeRemoteProcessGroupContentsDto.getOutputPorts()); + } + } + } + + if (remoteProcessGroupContents != null) { + if (!mergedInputPorts.isEmpty()) { + remoteProcessGroupContents.setInputPorts(mergedInputPorts); + } + if (!mergedOutputPorts.isEmpty()) { + remoteProcessGroupContents.setOutputPorts(mergedOutputPorts); + } + } + + if (mergedIsTargetSecure != null) { + clientDto.setTargetSecure(mergedIsTargetSecure); + } + + if (!mergedAuthorizationIssues.isEmpty()) { + clientDto.setAuthorizationIssues(mergedAuthorizationIssues); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupStatusEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupStatusEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupStatusEndpointMerger.java new file mode 100644 index 0000000..d2056f7 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupStatusEndpointMerger.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Map; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.StatusMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; +import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity; + +public class RemoteProcessGroupStatusEndpointMerger extends AbstractNodeStatusEndpoint<RemoteProcessGroupStatusEntity, RemoteProcessGroupStatusDTO> { + public static final Pattern REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status"); + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + protected Class<RemoteProcessGroupStatusEntity> getEntityClass() { + return RemoteProcessGroupStatusEntity.class; + } + + @Override + protected RemoteProcessGroupStatusDTO getDto(RemoteProcessGroupStatusEntity entity) { + return entity.getRemoteProcessGroupStatus(); + } + + @Override + protected void mergeResponses(RemoteProcessGroupStatusDTO clientDto, Map<NodeIdentifier, RemoteProcessGroupStatusDTO> dtoMap, NodeIdentifier selectedNodeId) { + final RemoteProcessGroupStatusDTO mergedRemoteProcessGroupStatus = clientDto; + mergedRemoteProcessGroupStatus.setNodeSnapshots(new ArrayList<NodeRemoteProcessGroupStatusSnapshotDTO>()); + + final NodeRemoteProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new NodeRemoteProcessGroupStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(clientDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedRemoteProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot); + + // merge the other nodes + for (final Map.Entry<NodeIdentifier, RemoteProcessGroupStatusDTO> entry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final RemoteProcessGroupStatusDTO nodeRemoteProcessGroupStatus = entry.getValue(); + if (nodeRemoteProcessGroupStatus == clientDto) { + continue; + } + + StatusMerger.merge(mergedRemoteProcessGroupStatus, nodeRemoteProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java new file mode 100644 index 0000000..c387951 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; + +public class RemoteProcessGroupsEndpointMerger implements EndpointResponseMerger { + public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups"); + + @Override + public boolean canHandle(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + public NodeResponse merge(URI uri, String method, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses, NodeResponse clientResponse) { + if (!canHandle(uri, method)) { + throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); + } + + final RemoteProcessGroupsEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class); + final Set<RemoteProcessGroupEntity> rpgEntities = responseEntity.getRemoteProcessGroups(); + + final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> dtoMap = new HashMap<>(); + for (final NodeResponse nodeResponse : successfulResponses) { + final RemoteProcessGroupsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class); + final Set<RemoteProcessGroupEntity> nodeRpgEntities = nodeResponseEntity.getRemoteProcessGroups(); + + for (final RemoteProcessGroupEntity nodeRpgEntity : nodeRpgEntities) { + final NodeIdentifier nodeId = nodeResponse.getNodeId(); + Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = dtoMap.get(nodeId); + if (innerMap == null) { + innerMap = new HashMap<>(); + dtoMap.put(nodeRpgEntity.getId(), innerMap); + } + + innerMap.put(nodeResponse.getNodeId(), nodeRpgEntity.getComponent()); + } + } + + final RemoteProcessGroupEndpointMerger rpgMerger = new RemoteProcessGroupEndpointMerger(); + for (final RemoteProcessGroupEntity entity : rpgEntities) { + final String componentId = entity.getId(); + final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap = dtoMap.get(componentId); + + rpgMerger.mergeResponses(entity.getComponent(), mergeMap, successfulResponses, problematicResponses); + } + + // create a new client response + return new NodeResponse(clientResponse, responseEntity); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java new file mode 100644 index 0000000..2188af6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTaskEndpointMerger.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.ReportingTaskDTO; +import org.apache.nifi.web.api.entity.ReportingTaskEntity; + +public class ReportingTaskEndpointMerger extends AbstractSingleEntityEndpoint<ReportingTaskEntity, ReportingTaskDTO> { + public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; + public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/reporting-tasks/node/[a-f0-9\\-]{36}"); + + @Override + public boolean canHandle(URI uri, String method) { + if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REPORTING_TASK_URI_PATTERN.matcher(uri.getPath()).matches()) { + return true; + } else if ("POST".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath())) { + return true; + } + + return false; + } + + @Override + protected Class<ReportingTaskEntity> getEntityClass() { + return ReportingTaskEntity.class; + } + + @Override + protected ReportingTaskDTO getDto(ReportingTaskEntity entity) { + return entity.getReportingTask(); + } + + @Override + protected void mergeResponses(ReportingTaskDTO clientDto, Map<NodeIdentifier, ReportingTaskDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); + + int activeThreadCount = 0; + for (final Map.Entry<NodeIdentifier, ReportingTaskDTO> nodeEntry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = nodeEntry.getKey(); + final ReportingTaskDTO nodeReportingTask = nodeEntry.getValue(); + + if (nodeReportingTask.getActiveThreadCount() != null) { + activeThreadCount += nodeReportingTask.getActiveThreadCount(); + } + + // merge the validation errors + mergeValidationErrors(validationErrorMap, nodeId, nodeReportingTask.getValidationErrors()); + } + + // set the merged active thread counts + clientDto.setActiveThreadCount(activeThreadCount); + + // set the merged the validation errors + clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, dtoMap.size())); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java new file mode 100644 index 0000000..f5b2e4d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ReportingTasksEndpointMerger.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.ReportingTaskDTO; +import org.apache.nifi.web.api.entity.ReportingTasksEntity; + +public class ReportingTasksEndpointMerger extends AbstractMultiEntityEndpoint<ReportingTasksEntity, ReportingTaskDTO> { + public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath()); + } + + @Override + protected Class<ReportingTasksEntity> getEntityClass() { + return ReportingTasksEntity.class; + } + + @Override + protected Set<ReportingTaskDTO> getDtos(ReportingTasksEntity entity) { + return entity.getReportingTasks(); + } + + @Override + protected String getComponentId(ReportingTaskDTO dto) { + return dto.getId(); + } + + @Override + protected void mergeResponses(ReportingTaskDTO clientDto, Map<NodeIdentifier, ReportingTaskDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + new ReportingTaskEndpointMerger().mergeResponses(clientDto, dtoMap, successfulResponses, problematicResponses); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java new file mode 100644 index 0000000..6d8da5f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; +import org.apache.nifi.controller.status.history.MetricDescriptor; +import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor; +import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.StandardStatusSnapshot; +import org.apache.nifi.controller.status.history.StatusHistoryUtil; +import org.apache.nifi.controller.status.history.StatusSnapshot; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO; +import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; +import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; +import org.apache.nifi.web.api.entity.StatusHistoryEntity; + +public class StatusHistoryEndpointMerger implements EndpointResponseMerger { + public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/processors/[a-f0-9\\-]{36}/status/history"); + public static final Pattern PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status/history"); + public static final Pattern REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status/history"); + public static final Pattern CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/connections/[a-f0-9\\-]{36}/status/history"); + + private final long componentStatusSnapshotMillis; + + + public StatusHistoryEndpointMerger() { + final NiFiProperties properties = NiFiProperties.getInstance(); + final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); + long snapshotMillis; + try { + snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); + } catch (final Exception e) { + snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); + } + componentStatusSnapshotMillis = snapshotMillis; + } + + private Map<String, MetricDescriptor<?>> getMetricDescriptors(final URI uri) { + final String path = uri.getPath(); + + final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>(); + + if (PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) { + for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) { + metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); + } + } else if (PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) { + for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) { + metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); + } + } else if (REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) { + for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) { + metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); + } + } else if (CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) { + for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) { + metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor()); + } + } + + return metricDescriptors; + } + + @Override + public boolean canHandle(URI uri, String method) { + if (!"GET".equalsIgnoreCase(method)) { + return false; + } + + final Map<String, MetricDescriptor<?>> descriptors = getMetricDescriptors(uri); + return descriptors != null && !descriptors.isEmpty(); + } + + @Override + public NodeResponse merge(URI uri, String method, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses, NodeResponse clientResponse) { + final Map<String, MetricDescriptor<?>> metricDescriptors = getMetricDescriptors(uri); + + final StatusHistoryEntity responseEntity = clientResponse.getClientResponse().getEntity(StatusHistoryEntity.class); + + StatusHistoryDTO lastStatusHistory = null; + final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots = new ArrayList<>(successfulResponses.size()); + for (final NodeResponse nodeResponse : successfulResponses) { + final StatusHistoryEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(StatusHistoryEntity.class); + final StatusHistoryDTO nodeStatus = nodeResponseEntity.getStatusHistory(); + lastStatusHistory = nodeStatus; + + final NodeIdentifier nodeId = nodeResponse.getNodeId(); + final NodeStatusSnapshotsDTO nodeStatusSnapshot = new NodeStatusSnapshotsDTO(); + nodeStatusSnapshot.setNodeId(nodeId.getId()); + nodeStatusSnapshot.setAddress(nodeId.getApiAddress()); + nodeStatusSnapshot.setApiPort(nodeId.getApiPort()); + nodeStatusSnapshot.setStatusSnapshots(nodeStatus.getAggregateSnapshots()); + nodeStatusSnapshots.add(nodeStatusSnapshot); + } + + final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO(); + clusterStatusHistory.setAggregateSnapshots(mergeStatusHistories(nodeStatusSnapshots, metricDescriptors)); + clusterStatusHistory.setGenerated(new Date()); + clusterStatusHistory.setNodeSnapshots(nodeStatusSnapshots); + if (lastStatusHistory != null) { + clusterStatusHistory.setComponentDetails(lastStatusHistory.getComponentDetails()); + clusterStatusHistory.setFieldDescriptors(lastStatusHistory.getFieldDescriptors()); + } + + final StatusHistoryEntity clusterEntity = new StatusHistoryEntity(); + clusterEntity.setStatusHistory(clusterStatusHistory); + clusterEntity.setRevision(responseEntity.getRevision()); + + return new NodeResponse(clientResponse, clusterEntity); + } + + private List<StatusSnapshotDTO> mergeStatusHistories(final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots, final Map<String, MetricDescriptor<?>> metricDescriptors) { + // We want a Map<Date, List<StatusSnapshot>>, which is a Map of "normalized Date" (i.e., a time range, essentially) + // to all Snapshots for that time. The list will contain one snapshot for each node. However, we can have the case + // where the NCM has a different value for the componentStatusSnapshotMillis than the nodes have. In this case, + // we end up with multiple entries in the List<StatusSnapshot> for the same node/timestamp, which skews our aggregate + // results. In order to avoid this, we will use only the latest snapshot for a node that falls into the the time range + // of interest. + // To accomplish this, we have an intermediate data structure, which is a Map of "normalized Date" to an inner Map + // of Node Identifier to StatusSnapshot. We then will flatten this Map and aggregate the results. + final Map<Date, Map<String, StatusSnapshot>> dateToNodeSnapshots = new TreeMap<>(); + + // group status snapshot's for each node by date + for (final NodeStatusSnapshotsDTO nodeStatusSnapshot : nodeStatusSnapshots) { + for (final StatusSnapshotDTO snapshotDto : nodeStatusSnapshot.getStatusSnapshots()) { + final StatusSnapshot snapshot = createSnapshot(snapshotDto, metricDescriptors); + final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis); + + Map<String, StatusSnapshot> nodeToSnapshotMap = dateToNodeSnapshots.get(normalizedDate); + if (nodeToSnapshotMap == null) { + nodeToSnapshotMap = new HashMap<>(); + dateToNodeSnapshots.put(normalizedDate, nodeToSnapshotMap); + } + nodeToSnapshotMap.put(nodeStatusSnapshot.getNodeId(), snapshot); + } + } + + // aggregate the snapshots by (normalized) timestamp + final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>(); + for (final Map.Entry<Date, Map<String, StatusSnapshot>> entry : dateToNodeSnapshots.entrySet()) { + final Date normalizedDate = entry.getKey(); + final Map<String, StatusSnapshot> nodeToSnapshot = entry.getValue(); + final List<StatusSnapshot> snapshotsForTimestamp = new ArrayList<>(nodeToSnapshot.values()); + snapshotsToAggregate.put(normalizedDate, snapshotsForTimestamp); + } + + final List<StatusSnapshotDTO> aggregatedSnapshots = aggregate(snapshotsToAggregate); + return aggregatedSnapshots; + } + + private StatusSnapshot createSnapshot(final StatusSnapshotDTO snapshotDto, final Map<String, MetricDescriptor<?>> metricDescriptors) { + final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(); + snapshot.setTimestamp(snapshotDto.getTimestamp()); + + final Map<String, Long> metrics = snapshotDto.getStatusMetrics(); + for (final Map.Entry<String, Long> entry : metrics.entrySet()) { + final String metricId = entry.getKey(); + final Long value = entry.getValue(); + + final MetricDescriptor<?> descriptor = metricDescriptors.get(metricId); + if (descriptor != null) { + snapshot.addStatusMetric(descriptor, value); + } + } + + return snapshot; + } + + private List<StatusSnapshotDTO> aggregate(Map<Date, List<StatusSnapshot>> snapshotsToAggregate) { + // Aggregate the snapshots + final List<StatusSnapshotDTO> aggregatedSnapshotDtos = new ArrayList<>(); + for (final Map.Entry<Date, List<StatusSnapshot>> entry : snapshotsToAggregate.entrySet()) { + final List<StatusSnapshot> snapshots = entry.getValue(); + final StatusSnapshot reducedSnapshot = snapshots.get(0).getValueReducer().reduce(snapshots); + + final StatusSnapshotDTO dto = new StatusSnapshotDTO(); + dto.setTimestamp(reducedSnapshot.getTimestamp()); + dto.setStatusMetrics(StatusHistoryUtil.createStatusSnapshotDto(reducedSnapshot).getStatusMetrics()); + + aggregatedSnapshotDtos.add(dto); + } + + return aggregatedSnapshotDtos; + } + + public static Date normalizeStatusSnapshotDate(final Date toNormalize, final long numMillis) { + final long time = toNormalize.getTime(); + return new Date(time - time % numMillis); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/SystemDiagnosticsEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/SystemDiagnosticsEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/SystemDiagnosticsEndpointMerger.java new file mode 100644 index 0000000..6cccc8e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/SystemDiagnosticsEndpointMerger.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Map; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.StatusMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO; +import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; +import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity; + +public class SystemDiagnosticsEndpointMerger extends AbstractNodeStatusEndpoint<SystemDiagnosticsEntity, SystemDiagnosticsDTO> { + public static final Pattern SYSTEM_DIAGNOSTICS_URI_PATTERN = Pattern.compile("/nifi-api/system-diagnostics"); + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && SYSTEM_DIAGNOSTICS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + protected Class<SystemDiagnosticsEntity> getEntityClass() { + return SystemDiagnosticsEntity.class; + } + + @Override + protected SystemDiagnosticsDTO getDto(SystemDiagnosticsEntity entity) { + return entity.getSystemDiagnostics(); + } + + @Override + protected void mergeResponses(SystemDiagnosticsDTO clientDto, Map<NodeIdentifier, SystemDiagnosticsDTO> dtoMap, NodeIdentifier selectedNodeId) { + final SystemDiagnosticsDTO mergedSystemDiagnostics = clientDto; + mergedSystemDiagnostics.setNodeSnapshots(new ArrayList<NodeSystemDiagnosticsSnapshotDTO>()); + + final NodeSystemDiagnosticsSnapshotDTO selectedNodeSnapshot = new NodeSystemDiagnosticsSnapshotDTO(); + selectedNodeSnapshot.setSnapshot(clientDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedSystemDiagnostics.getNodeSnapshots().add(selectedNodeSnapshot); + + for (final Map.Entry<NodeIdentifier, SystemDiagnosticsDTO> entry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final SystemDiagnosticsDTO toMerge = entry.getValue(); + if (toMerge == clientDto) { + continue; + } + + StatusMerger.merge(mergedSystemDiagnostics, toMerge, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + +}
