http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java new file mode 100644 index 0000000..7dc598e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import javax.ws.rs.core.StreamingOutput; + +import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpiontMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.DropRequestEndpiontMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.FlowSnippetEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.GroupStatusEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceEventEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.StatusMerger; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.cluster.node.Node.Status; +import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinQuery; +import org.apache.nifi.reporting.ComponentType; +import org.apache.nifi.stream.io.NullOutputStream; +import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; +import org.apache.nifi.web.api.entity.ControllerStatusEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StandardHttpResponseMerger implements HttpResponseMerger { + private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMerger.class); + + private static final int NODE_CONTINUE_STATUS_CODE = 150; + private final WebClusterManager clusterManager; + + private static final List<EndpointResponseMerger> endpointMergers = new ArrayList<>(); + static { + endpointMergers.add(new ControllerStatusEndpointMerger()); + endpointMergers.add(new GroupStatusEndpointMerger()); + endpointMergers.add(new ProcessorStatusEndpointMerger()); + endpointMergers.add(new ConnectionStatusEndpiontMerger()); + endpointMergers.add(new PortStatusEndpointMerger()); + endpointMergers.add(new RemoteProcessGroupStatusEndpointMerger()); + endpointMergers.add(new ProcessorEndpointMerger()); + endpointMergers.add(new ProcessorsEndpointMerger()); + endpointMergers.add(new RemoteProcessGroupEndpointMerger()); + endpointMergers.add(new RemoteProcessGroupsEndpointMerger()); + endpointMergers.add(new ProcessGroupEndpointMerger()); + endpointMergers.add(new FlowSnippetEndpointMerger()); + endpointMergers.add(new ProvenanceQueryEndpointMerger()); + endpointMergers.add(new ProvenanceEventEndpointMerger()); + endpointMergers.add(new ControllerServiceEndpointMerger()); + endpointMergers.add(new ControllerServicesEndpointMerger()); + endpointMergers.add(new ControllerServiceReferenceEndpointMerger()); + endpointMergers.add(new ReportingTaskEndpointMerger()); + endpointMergers.add(new ReportingTasksEndpointMerger()); + endpointMergers.add(new DropRequestEndpiontMerger()); + endpointMergers.add(new ListFlowFilesEndpointMerger()); + endpointMergers.add(new ComponentStateEndpointMerger()); + endpointMergers.add(new BulletinBoardEndpointMerger()); + endpointMergers.add(new StatusHistoryEndpointMerger()); + endpointMergers.add(new SystemDiagnosticsEndpointMerger()); + endpointMergers.add(new CountersEndpointMerger()); + } + + public StandardHttpResponseMerger() { + this(null); + } + + public StandardHttpResponseMerger(final WebClusterManager clusterManager) { + this.clusterManager = clusterManager; + } + + @Override + public NodeResponse mergeResponses(final URI uri, final String httpMethod, final Set<NodeResponse> nodeResponses) { + final boolean hasSuccess = hasSuccessfulResponse(nodeResponses); + if (!hasSuccess) { + // If we have a response that is a 3xx, 4xx, or 5xx, then we want to choose that. + // Otherwise, it doesn't matter which one we choose. We do this because if we replicate + // a mutable request, it's possible that one node will respond with a 409, for instance, while + // others respond with a 150-Continue. We do not want to pick the 150-Continue; instead, we want + // the failed response. + final NodeResponse clientResponse = nodeResponses.stream().filter(p -> p.getStatus() > 299).findAny().orElse(nodeResponses.iterator().next()); + + // Drain the response from all nodes except for the 'chosen one'. This ensures that we don't + // leave data lingering on the socket and ensures that we don't consume the content of the response + // that we intend to respond with + drainResponses(nodeResponses, clientResponse); + return clientResponse; + } + + // Determine which responses are successful + final Set<NodeResponse> successResponses = nodeResponses.stream().filter(p -> p.is2xx()).collect(Collectors.toSet()); + final Set<NodeResponse> problematicResponses = nodeResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet()); + + // Choose any of the successful responses to be the 'chosen one'. + final NodeResponse clientResponse = successResponses.iterator().next(); + + final EndpointResponseMerger merger = getEndpointResponseMerger(uri, httpMethod); + if (merger == null) { + return clientResponse; + } + + final NodeResponse response = merger.merge(uri, httpMethod, successResponses, problematicResponses, clientResponse); + if (clusterManager != null) { + mergeNCMBulletins(response, uri, httpMethod); + } + + return response; + } + + /** + * This method merges bulletins from the NCM. Eventually, the NCM will go away entirely, and + * at that point, we will completely remove this and the WebClusterManager as a member variable. + * However, until then, the bulletins from the NCM are important to include, since there is no other + * node that can include them. + * + * @param clientResponse the Node Response that will be returned to the client + * @param uri the URI + * @param method the HTTP Method + * + * @deprecated this method exists only until we can remove the Cluster Manager from the picture all together. It will then be removed. + */ + @Deprecated + private void mergeNCMBulletins(final NodeResponse clientResponse, final URI uri, final String method) { + // determine if we have at least one response + final boolean hasClientResponse = clientResponse != null; + final boolean hasSuccessfulClientResponse = hasClientResponse && clientResponse.is2xx(); + + if (hasSuccessfulClientResponse && clusterManager.isControllerStatusEndpoint(uri, method)) { + // for now, we need to merge the NCM's bulletins too. + final ControllerStatusEntity responseEntity = (ControllerStatusEntity) clientResponse.getUpdatedEntity(); + final ControllerStatusDTO mergedStatus = responseEntity.getControllerStatus(); + + final int totalNodeCount = clusterManager.getNodeIds().size(); + final int connectedNodeCount = clusterManager.getNodeIds(Status.CONNECTED).size(); + + final List<Bulletin> ncmControllerBulletins = clusterManager.getBulletinRepository().findBulletinsForController(); + mergedStatus.setBulletins(clusterManager.mergeNCMBulletins(mergedStatus.getBulletins(), ncmControllerBulletins)); + + // get the controller service bulletins + final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build(); + final List<Bulletin> ncmServiceBulletins = clusterManager.getBulletinRepository().findBulletins(controllerServiceQuery); + mergedStatus.setControllerServiceBulletins(clusterManager.mergeNCMBulletins(mergedStatus.getControllerServiceBulletins(), ncmServiceBulletins)); + + // get the reporting task bulletins + final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build(); + final List<Bulletin> ncmReportingTaskBulletins = clusterManager.getBulletinRepository().findBulletins(reportingTaskQuery); + mergedStatus.setReportingTaskBulletins(clusterManager.mergeNCMBulletins(mergedStatus.getReportingTaskBulletins(), ncmReportingTaskBulletins)); + + mergedStatus.setConnectedNodeCount(connectedNodeCount); + mergedStatus.setTotalNodeCount(totalNodeCount); + StatusMerger.updatePrettyPrintedFields(mergedStatus); + } + } + + + @Override + public Set<NodeResponse> getProblematicNodeResponses(final Set<NodeResponse> allResponses) { + // Check if there are any 2xx responses + final boolean containsSuccessfulResponse = hasSuccessfulResponse(allResponses); + + if (containsSuccessfulResponse) { + // If there is a 2xx response, we consider a response to be problematic if it is not 2xx + return allResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet()); + } else { + // If no node is successful, we consider a problematic response to be only those that are 5xx + return allResponses.stream().filter(p -> p.is5xx()).collect(Collectors.toSet()); + } + } + + @Override + public boolean isResponseInterpreted(final URI uri, final String httpMethod) { + return getEndpointResponseMerger(uri, httpMethod) != null; + } + + private static EndpointResponseMerger getEndpointResponseMerger(final URI uri, final String httpMethod) { + return endpointMergers.stream().filter(p -> p.canHandle(uri, httpMethod)).findFirst().orElse(null); + } + + private boolean hasSuccessfulResponse(final Set<NodeResponse> allResponses) { + return allResponses.stream().anyMatch(p -> p.is2xx()); + } + + + private void drainResponses(final Set<NodeResponse> responses, final NodeResponse exclude) { + responses.stream() + .parallel() // parallelize the draining of the responses, since we have multiple streams to consume + .filter(response -> response != exclude) // don't include the explicitly excluded node + .filter(response -> response.getStatus() != NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses because they contain no content + .forEach(response -> drainResponse(response)); // drain all node responses that didn't get filtered out + } + + private void drainResponse(final NodeResponse response) { + if (response.hasThrowable()) { + return; + } + + try { + ((StreamingOutput) response.getResponse().getEntity()).write(new NullOutputStream()); + } catch (final IOException ioe) { + logger.info("Failed clearing out non-client response buffer from " + response.getNodeId() + " due to: " + ioe, ioe); + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractMultiEntityEndpoint.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractMultiEntityEndpoint.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractMultiEntityEndpoint.java new file mode 100644 index 0000000..d72814b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractMultiEntityEndpoint.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.Entity; + +public abstract class AbstractMultiEntityEndpoint<EntityType extends Entity, DtoType> implements EndpointResponseMerger { + + @Override + public final NodeResponse merge(final URI uri, final String method, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) { + if (!canHandle(uri, method)) { + throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); + } + + final EntityType responseEntity = clientResponse.getClientResponse().getEntity(getEntityClass()); + final Set<DtoType> dtos = getDtos(responseEntity); + + final Map<String, Map<NodeIdentifier, DtoType>> dtoMap = new HashMap<>(); + for (final NodeResponse nodeResponse : successfulResponses) { + final EntityType nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(getEntityClass()); + final Set<DtoType> nodeDtos = getDtos(nodeResponseEntity); + + for (final DtoType nodeDto : nodeDtos) { + final NodeIdentifier nodeId = nodeResponse.getNodeId(); + Map<NodeIdentifier, DtoType> innerMap = dtoMap.get(nodeId); + if (innerMap == null) { + innerMap = new HashMap<>(); + dtoMap.put(getComponentId(nodeDto), innerMap); + } + + innerMap.put(nodeResponse.getNodeId(), nodeDto); + } + } + + for (final DtoType dto : dtos) { + final String componentId = getComponentId(dto); + final Map<NodeIdentifier, DtoType> mergeMap = dtoMap.get(componentId); + + mergeResponses(dto, mergeMap, successfulResponses, problematicResponses); + } + + // create a new client response + return new NodeResponse(clientResponse, responseEntity); + } + + + /** + * @return the class that represents the type of Entity that is expected by this response mapper + */ + protected abstract Class<EntityType> getEntityClass(); + + /** + * Extracts the DTOs from the given entity + * + * @param entity the entity to extract the DTOs from + * @return the DTOs from the given entity + */ + protected abstract Set<DtoType> getDtos(EntityType entity); + + /** + * Extracts the ID of the component that the DTO refers to + * @param dto the DTO to extract the ID from + * @return the ID of the component that the DTO refers to + */ + protected abstract String getComponentId(DtoType dto); + + /** + * Merges the responses from all nodes in the given map into the single given DTO + * + * @param clientDto the DTO to merge responses into + * @param dtoMap the responses from all nodes + * @param successfulResponses the responses from nodes that completed the request successfully + * @param problematicResponses the responses from nodes that did not complete the request successfully + */ + protected abstract void mergeResponses(final DtoType clientDto, Map<NodeIdentifier, DtoType> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractNodeStatusEndpoint.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractNodeStatusEndpoint.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractNodeStatusEndpoint.java new file mode 100644 index 0000000..cc73e34 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractNodeStatusEndpoint.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.Entity; + +public abstract class AbstractNodeStatusEndpoint<EntityType extends Entity, DtoType> extends AbstractSingleEntityEndpoint<EntityType, DtoType> { + + @Override + protected final void mergeResponses(DtoType clientDto, Map<NodeIdentifier, DtoType> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + final NodeIdentifier selectedNodeId = dtoMap.entrySet().stream().filter(e -> e.getValue() == clientDto).map(e -> e.getKey()).findFirst().orElse(null); + if (selectedNodeId == null) { + throw new IllegalArgumentException("Attempted to merge Status request but could not find the appropriate Node Identifier"); + } + + mergeResponses(clientDto, dtoMap, selectedNodeId); + } + + protected abstract void mergeResponses(DtoType clientDto, Map<NodeIdentifier, DtoType> dtoMap, NodeIdentifier selectedNodeId); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractSingleEntityEndpoint.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractSingleEntityEndpoint.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractSingleEntityEndpoint.java new file mode 100644 index 0000000..66db949 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractSingleEntityEndpoint.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.Entity; + +public abstract class AbstractSingleEntityEndpoint<EntityType extends Entity, DtoType> implements EndpointResponseMerger { + + @Override + public final NodeResponse merge(final URI uri, final String method, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) { + if (!canHandle(uri, method)) { + throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); + } + + final EntityType responseEntity = clientResponse.getClientResponse().getEntity(getEntityClass()); + final DtoType dto = getDto(responseEntity); + + final Map<NodeIdentifier, DtoType> dtoMap = new HashMap<>(); + for (final NodeResponse nodeResponse : successfulResponses) { + final EntityType nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(getEntityClass()); + final DtoType nodeDto = getDto(nodeResponseEntity); + dtoMap.put(nodeResponse.getNodeId(), nodeDto); + } + + mergeResponses(dto, dtoMap, successfulResponses, problematicResponses); + return new NodeResponse(clientResponse, responseEntity); + } + + + /** + * Normalizes the validation errors by prepending the corresponding nodes when the error does not exist across all nodes. + * + * @param validationErrorMap map + * @param totalNodes total + * @return normalized errors + */ + protected Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) { + final Set<String> normalizedValidationErrors = new HashSet<>(); + for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : validationErrorMap.entrySet()) { + final String msg = validationEntry.getKey(); + final Set<NodeIdentifier> nodeIds = validationEntry.getValue(); + + if (nodeIds.size() == totalNodes) { + normalizedValidationErrors.add(msg); + } else { + nodeIds.forEach(id -> normalizedValidationErrors.add(id.getApiAddress() + ":" + id.getApiPort() + " -- " + msg)); + } + } + return normalizedValidationErrors; + } + + /** + * Merges the validation errors into the specified map, recording the corresponding node identifier. + * + * @param validationErrorMap map + * @param nodeId id + * @param nodeValidationErrors errors + */ + protected void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final Collection<String> nodeValidationErrors) { + if (nodeValidationErrors != null) { + nodeValidationErrors.stream().forEach( + err -> validationErrorMap.computeIfAbsent(err, k -> new HashSet<NodeIdentifier>()) + .add(nodeId)); + } + } + + /** + * @return the class that represents the type of Entity that is expected by this response mapper + */ + protected abstract Class<EntityType> getEntityClass(); + + /** + * Extracts the DTO from the given entity + * + * @param entity the entity to extract the DTO from + * @return the DTO from the given entity + */ + protected abstract DtoType getDto(EntityType entity); + + /** + * Merges the responses from all nodes in the given map into the single given DTO + * + * @param clientDto the DTO to merge responses into + * @param dtoMap the responses from all nodes + * @param successfulResponses the responses from nodes that completed the request successfully + * @param problematicResponses the responses from nodes that did not complete the request successfully + */ + protected abstract void mergeResponses(DtoType clientDto, Map<NodeIdentifier, DtoType> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java new file mode 100644 index 0000000..799d279 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.BulletinBoardDTO; +import org.apache.nifi.web.api.dto.BulletinDTO; +import org.apache.nifi.web.api.entity.BulletinBoardEntity; + +public class BulletinBoardEndpointMerger extends AbstractSingleEntityEndpoint<BulletinBoardEntity, BulletinBoardDTO> { + public static final Pattern BULLETIN_BOARD_URI_PATTERN = Pattern.compile("/nifi-api/flow/bulletin-board"); + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && BULLETIN_BOARD_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + protected Class<BulletinBoardEntity> getEntityClass() { + return BulletinBoardEntity.class; + } + + @Override + protected BulletinBoardDTO getDto(BulletinBoardEntity entity) { + return entity.getBulletinBoard(); + } + + @Override + protected void mergeResponses(BulletinBoardDTO clientDto, Map<NodeIdentifier, BulletinBoardDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + final List<BulletinDTO> bulletinDtos = new ArrayList<>(); + for (final Map.Entry<NodeIdentifier, BulletinBoardDTO> entry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final BulletinBoardDTO boardDto = entry.getValue(); + final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); + + for (final BulletinDTO bulletin : boardDto.getBulletins()) { + bulletin.setNodeAddress(nodeAddress); + bulletinDtos.add(bulletin); + } + } + + Collections.sort(bulletinDtos, new Comparator<BulletinDTO>() { + @Override + public int compare(final BulletinDTO o1, final BulletinDTO o2) { + final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp()); + if (timeComparison != 0) { + return timeComparison; + } + + return o1.getNodeAddress().compareTo(o2.getNodeAddress()); + } + }); + + clientDto.setBulletins(bulletinDtos); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java new file mode 100644 index 0000000..0598259 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ComponentStateEndpointMerger.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.state.SortedStateUtils; +import org.apache.nifi.web.api.dto.ComponentStateDTO; +import org.apache.nifi.web.api.dto.StateEntryDTO; +import org.apache.nifi.web.api.dto.StateMapDTO; +import org.apache.nifi.web.api.entity.ComponentStateEntity; + +public class ComponentStateEndpointMerger extends AbstractSingleEntityEndpoint<ComponentStateEntity, ComponentStateDTO> { + public static final Pattern PROCESSOR_STATE_URI_PATTERN = Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}/state"); + public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller-services/node/[a-f0-9\\-]{36}/state"); + public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/reporting-tasks/node/[a-f0-9\\-]{36}/state"); + + @Override + public boolean canHandle(URI uri, String method) { + if (!"GET".equalsIgnoreCase(method)) { + return false; + } + + return PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches() + || CONTROLLER_SERVICE_STATE_URI_PATTERN.matcher(uri.getPath()).matches() + || REPORTING_TASK_STATE_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + protected Class<ComponentStateEntity> getEntityClass() { + return ComponentStateEntity.class; + } + + @Override + protected ComponentStateDTO getDto(ComponentStateEntity entity) { + return entity.getComponentState(); + } + + @Override + protected void mergeResponses(ComponentStateDTO clientDto, Map<NodeIdentifier, ComponentStateDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + List<StateEntryDTO> localStateEntries = new ArrayList<>(); + + int totalStateEntries = 0; + for (final Map.Entry<NodeIdentifier, ComponentStateDTO> nodeEntry : dtoMap.entrySet()) { + final ComponentStateDTO nodeComponentState = nodeEntry.getValue(); + final NodeIdentifier nodeId = nodeEntry.getKey(); + final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); + + final StateMapDTO nodeLocalStateMap = nodeComponentState.getLocalState(); + if (nodeLocalStateMap.getState() != null) { + totalStateEntries += nodeLocalStateMap.getTotalEntryCount(); + + for (final StateEntryDTO nodeStateEntry : nodeLocalStateMap.getState()) { + nodeStateEntry.setClusterNodeId(nodeId.getId()); + nodeStateEntry.setClusterNodeAddress(nodeAddress); + localStateEntries.add(nodeStateEntry); + } + } + } + + // ensure appropriate sort + Collections.sort(localStateEntries, SortedStateUtils.getEntryDtoComparator()); + + // sublist if necessary + if (localStateEntries.size() > SortedStateUtils.MAX_COMPONENT_STATE_ENTRIES) { + localStateEntries = localStateEntries.subList(0, SortedStateUtils.MAX_COMPONENT_STATE_ENTRIES); + } + + // add all the local state entries + clientDto.getLocalState().setTotalEntryCount(totalStateEntries); + clientDto.getLocalState().setState(localStateEntries); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpiontMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpiontMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpiontMerger.java new file mode 100644 index 0000000..05bf0f5 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpiontMerger.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Map; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.StatusMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; +import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO; +import org.apache.nifi.web.api.entity.ConnectionStatusEntity; + +public class ConnectionStatusEndpiontMerger extends AbstractNodeStatusEndpoint<ConnectionStatusEntity, ConnectionStatusDTO> { + public static final Pattern CONNECTION_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/connections/[a-f0-9\\-]{36}/status"); + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && CONNECTION_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + protected Class<ConnectionStatusEntity> getEntityClass() { + return ConnectionStatusEntity.class; + } + + @Override + protected ConnectionStatusDTO getDto(ConnectionStatusEntity entity) { + return entity.getConnectionStatus(); + } + + @Override + protected void mergeResponses(ConnectionStatusDTO clientDto, Map<NodeIdentifier, ConnectionStatusDTO> dtoMap, NodeIdentifier selectedNodeId) { + final ConnectionStatusDTO mergedConnectionStatus = clientDto; + mergedConnectionStatus.setNodeSnapshots(new ArrayList<NodeConnectionStatusSnapshotDTO>()); + + final NodeConnectionStatusSnapshotDTO selectedNodeSnapshot = new NodeConnectionStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(clientDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedConnectionStatus.getNodeSnapshots().add(selectedNodeSnapshot); + + // merge the other nodes + for (final Map.Entry<NodeIdentifier, ConnectionStatusDTO> entry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ConnectionStatusDTO nodeConnectionStatus = entry.getValue(); + if (nodeConnectionStatus == clientDto) { + continue; + } + + StatusMerger.merge(mergedConnectionStatus, nodeConnectionStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceEndpointMerger.java new file mode 100644 index 0000000..1591604 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceEndpointMerger.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; + +public class ControllerServiceEndpointMerger extends AbstractSingleEntityEndpoint<ControllerServiceEntity, ControllerServiceDTO> { + public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller-services/node"; + public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller-services/node/[a-f0-9\\-]{36}"); + + @Override + public boolean canHandle(URI uri, String method) { + if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) { + return true; + } else if ("POST".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath())) { + return true; + } + + return false; + } + + @Override + protected Class<ControllerServiceEntity> getEntityClass() { + return ControllerServiceEntity.class; + } + + @Override + protected ControllerServiceDTO getDto(ControllerServiceEntity entity) { + return entity.getControllerService(); + } + + @Override + protected void mergeResponses(ControllerServiceDTO clientDto, Map<NodeIdentifier, ControllerServiceDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); + final Set<ControllerServiceReferencingComponentEntity> referencingComponents = clientDto.getReferencingComponents(); + final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentEntity>> nodeReferencingComponentsMap = new HashMap<>(); + + String state = null; + for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = nodeEntry.getKey(); + final ControllerServiceDTO nodeControllerService = nodeEntry.getValue(); + + if (state == null) { + if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) { + state = ControllerServiceState.DISABLING.name(); + } else if (ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState())) { + state = ControllerServiceState.ENABLING.name(); + } + } + + nodeReferencingComponentsMap.put(nodeId, nodeControllerService.getReferencingComponents()); + + // merge the validation errors + mergeValidationErrors(validationErrorMap, nodeId, nodeControllerService.getValidationErrors()); + } + + // merge the referencing components + mergeControllerServiceReferences(referencingComponents, nodeReferencingComponentsMap); + + // store the 'transition' state is applicable + if (state != null) { + clientDto.setState(state); + } + + // set the merged the validation errors + clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, dtoMap.size())); + } + + public static void mergeControllerServiceReferences(Set<ControllerServiceReferencingComponentEntity> referencingComponents, + Map<NodeIdentifier, Set<ControllerServiceReferencingComponentEntity>> referencingComponentMap) { + + final Map<String, Integer> activeThreadCounts = new HashMap<>(); + final Map<String, String> states = new HashMap<>(); + for (final Map.Entry<NodeIdentifier, Set<ControllerServiceReferencingComponentEntity>> nodeEntry : referencingComponentMap.entrySet()) { + final Set<ControllerServiceReferencingComponentEntity> nodeReferencingComponents = nodeEntry.getValue(); + + // go through all the nodes referencing components + if (nodeReferencingComponents != null) { + for (final ControllerServiceReferencingComponentEntity nodeReferencingComponentEntity : nodeReferencingComponents) { + final ControllerServiceReferencingComponentDTO nodeReferencingComponent = nodeReferencingComponentEntity.getControllerServiceReferencingComponent(); + + // handle active thread counts + if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) { + final Integer current = activeThreadCounts.get(nodeReferencingComponent.getId()); + if (current == null) { + activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount()); + } else { + activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount() + current); + } + } + + // handle controller service state + final String state = states.get(nodeReferencingComponent.getId()); + if (state == null) { + if (ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState())) { + states.put(nodeReferencingComponent.getId(), ControllerServiceState.DISABLING.name()); + } else if (ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState())) { + states.put(nodeReferencingComponent.getId(), ControllerServiceState.ENABLING.name()); + } + } + } + } + } + + // go through each referencing components + for (final ControllerServiceReferencingComponentEntity referencingComponent : referencingComponents) { + final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId()); + if (activeThreadCount != null) { + referencingComponent.getControllerServiceReferencingComponent().setActiveThreadCount(activeThreadCount); + } + + final String state = states.get(referencingComponent.getId()); + if (state != null) { + referencingComponent.getControllerServiceReferencingComponent().setState(state); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceReferenceEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceReferenceEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceReferenceEndpointMerger.java new file mode 100644 index 0000000..1fad628 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceReferenceEndpointMerger.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; + +public class ControllerServiceReferenceEndpointMerger implements EndpointResponseMerger { + public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller-services/node/[a-f0-9\\-]{36}/references"); + + @Override + public boolean canHandle(URI uri, String method) { + if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) { + return true; + } + + return false; + } + + @Override + public NodeResponse merge(URI uri, String method, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses, NodeResponse clientResponse) { + if (!canHandle(uri, method)) { + throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); + } + + final ControllerServiceReferencingComponentsEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class); + final Set<ControllerServiceReferencingComponentEntity> referencingComponents = responseEntity.getControllerServiceReferencingComponents(); + + final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentEntity>> resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : successfulResponses) { + final ControllerServiceReferencingComponentsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity + : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class); + final Set<ControllerServiceReferencingComponentEntity> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents(); + + resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents); + } + + ControllerServiceEndpointMerger.mergeControllerServiceReferences(referencingComponents, resultsMap); + + return new NodeResponse(clientResponse, responseEntity); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServicesEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServicesEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServicesEndpointMerger.java new file mode 100644 index 0000000..097c974 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServicesEndpointMerger.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.entity.ControllerServicesEntity; + +public class ControllerServicesEndpointMerger extends AbstractMultiEntityEndpoint<ControllerServicesEntity, ControllerServiceDTO> { + public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller-services/node"; + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath()); + } + + @Override + protected Class<ControllerServicesEntity> getEntityClass() { + return ControllerServicesEntity.class; + } + + @Override + protected Set<ControllerServiceDTO> getDtos(ControllerServicesEntity entity) { + return entity.getControllerServices(); + } + + @Override + protected String getComponentId(ControllerServiceDTO dto) { + return dto.getId(); + } + + @Override + protected void mergeResponses(ControllerServiceDTO clientDto, Map<NodeIdentifier, ControllerServiceDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + new ControllerServiceEndpointMerger().mergeResponses(clientDto, dtoMap, successfulResponses, problematicResponses); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerStatusEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerStatusEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerStatusEndpointMerger.java new file mode 100644 index 0000000..50514f8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerStatusEndpointMerger.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.StatusMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.BulletinDTO; +import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; +import org.apache.nifi.web.api.entity.ControllerStatusEntity; + +public class ControllerStatusEndpointMerger extends AbstractSingleEntityEndpoint<ControllerStatusEntity, ControllerStatusDTO> { + public static final Pattern CONTROLLER_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/status"); + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && CONTROLLER_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + protected Class<ControllerStatusEntity> getEntityClass() { + return ControllerStatusEntity.class; + } + + @Override + protected ControllerStatusDTO getDto(ControllerStatusEntity entity) { + return entity.getControllerStatus(); + } + + @Override + protected void mergeResponses(ControllerStatusDTO clientDto, Map<NodeIdentifier, ControllerStatusDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + ControllerStatusDTO mergedStatus = clientDto; + for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> entry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ControllerStatusDTO nodeStatus = entry.getValue(); + + final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); + for (final BulletinDTO bulletin : nodeStatus.getBulletins()) { + bulletin.setNodeAddress(nodeAddress); + } + for (final BulletinDTO bulletin : nodeStatus.getControllerServiceBulletins()) { + bulletin.setNodeAddress(nodeAddress); + } + for (final BulletinDTO bulletin : nodeStatus.getReportingTaskBulletins()) { + bulletin.setNodeAddress(nodeAddress); + } + + if (nodeStatus == mergedStatus) { + continue; + } + + StatusMerger.merge(mergedStatus, nodeStatus); + } + + final int totalNodeCount = successfulResponses.size() + problematicResponses.size(); + final int connectedNodeCount = successfulResponses.size(); // all nodes that responded successfully must be connected. Those that did not will be disconnected. + + mergedStatus.setConnectedNodeCount(connectedNodeCount); + mergedStatus.setTotalNodeCount(totalNodeCount); + StatusMerger.updatePrettyPrintedFields(mergedStatus); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/CountersEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/CountersEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/CountersEndpointMerger.java new file mode 100644 index 0000000..22b82b3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/CountersEndpointMerger.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Map; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.StatusMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.CountersDTO; +import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO; +import org.apache.nifi.web.api.entity.CountersEntity; + +public class CountersEndpointMerger extends AbstractNodeStatusEndpoint<CountersEntity, CountersDTO> { + public static final Pattern COUNTERS_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters"); + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && COUNTERS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + protected Class<CountersEntity> getEntityClass() { + return CountersEntity.class; + } + + @Override + protected CountersDTO getDto(CountersEntity entity) { + return entity.getCounters(); + } + + @Override + protected void mergeResponses(CountersDTO clientDto, Map<NodeIdentifier, CountersDTO> dtoMap, NodeIdentifier selectedNodeId) { + final CountersDTO mergedCounters = clientDto; + mergedCounters.setNodeSnapshots(new ArrayList<NodeCountersSnapshotDTO>()); + + final NodeCountersSnapshotDTO selectedNodeSnapshot = new NodeCountersSnapshotDTO(); + selectedNodeSnapshot.setSnapshot(clientDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedCounters.getNodeSnapshots().add(selectedNodeSnapshot); + + for (final Map.Entry<NodeIdentifier, CountersDTO> entry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final CountersDTO toMerge = entry.getValue(); + if (toMerge == clientDto) { + continue; + } + + StatusMerger.merge(mergedCounters, toMerge, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropRequestEndpiontMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropRequestEndpiontMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropRequestEndpiontMerger.java new file mode 100644 index 0000000..f6025b8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/DropRequestEndpiontMerger.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.queue.DropFlowFileState; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.web.api.dto.DropRequestDTO; +import org.apache.nifi.web.api.entity.DropRequestEntity; + +public class DropRequestEndpiontMerger extends AbstractSingleEntityEndpoint<DropRequestEntity, DropRequestDTO> { + public static final Pattern DROP_REQUESTS_URI = Pattern.compile("/nifi-api/flowfile-queues/[a-f0-9\\-]{36}/drop-requests"); + public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/flowfile-queues/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}"); + + @Override + public boolean canHandle(URI uri, String method) { + if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && DROP_REQUEST_URI.matcher(uri.getPath()).matches()) { + return true; + } else if (("POST".equalsIgnoreCase(method) && DROP_REQUESTS_URI.matcher(uri.getPath()).matches())) { + return true; + } + + return false; + } + + @Override + protected Class<DropRequestEntity> getEntityClass() { + return DropRequestEntity.class; + } + + @Override + protected DropRequestDTO getDto(DropRequestEntity entity) { + return entity.getDropRequest(); + } + + @Override + protected void mergeResponses(DropRequestDTO clientDto, Map<NodeIdentifier, DropRequestDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + boolean nodeWaiting = false; + int originalCount = 0; + long originalSize = 0; + int currentCount = 0; + long currentSize = 0; + int droppedCount = 0; + long droppedSize = 0; + + DropFlowFileState state = null; + boolean allFinished = true; + String failureReason = null; + for (final Map.Entry<NodeIdentifier, DropRequestDTO> nodeEntry : dtoMap.entrySet()) { + final DropRequestDTO nodeDropRequest = nodeEntry.getValue(); + + if (!nodeDropRequest.isFinished()) { + allFinished = false; + } + if (nodeDropRequest.getFailureReason() != null) { + failureReason = nodeDropRequest.getFailureReason(); + } + + currentCount += nodeDropRequest.getCurrentCount(); + currentSize += nodeDropRequest.getCurrentSize(); + droppedCount += nodeDropRequest.getDroppedCount(); + droppedSize += nodeDropRequest.getDroppedSize(); + + if (nodeDropRequest.getOriginalCount() == null) { + nodeWaiting = true; + } else { + originalCount += nodeDropRequest.getOriginalCount(); + originalSize += nodeDropRequest.getOriginalSize(); + } + + final DropFlowFileState nodeState = DropFlowFileState.valueOfDescription(nodeDropRequest.getState()); + if (state == null || state.ordinal() > nodeState.ordinal()) { + state = nodeState; + } + } + + clientDto.setCurrentCount(currentCount); + clientDto.setCurrentSize(currentSize); + clientDto.setCurrent(FormatUtils.formatCount(currentCount) + " / " + FormatUtils.formatDataSize(currentSize)); + + clientDto.setDroppedCount(droppedCount); + clientDto.setDroppedSize(droppedSize); + clientDto.setDropped(FormatUtils.formatCount(droppedCount) + " / " + FormatUtils.formatDataSize(droppedSize)); + + clientDto.setFinished(allFinished); + clientDto.setFailureReason(failureReason); + if (originalCount == 0) { + clientDto.setPercentCompleted(allFinished ? 100 : 0); + } else { + clientDto.setPercentCompleted((int) ((double) droppedCount / (double) originalCount * 100D)); + } + + if (!nodeWaiting) { + clientDto.setOriginalCount(originalCount); + clientDto.setOriginalSize(originalSize); + clientDto.setOriginal(FormatUtils.formatCount(originalCount) + " / " + FormatUtils.formatDataSize(originalSize)); + } + + if (state != null) { + clientDto.setState(state.toString()); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java new file mode 100644 index 0000000..d7f6948 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.entity.FlowSnippetEntity; + +public class FlowSnippetEndpointMerger implements EndpointResponseMerger { + public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance"); + public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance"); + + @Override + public boolean canHandle(final URI uri, final String method) { + return "POST".equalsIgnoreCase(method) && (TEMPLATE_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches() + || FLOW_SNIPPET_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches()); + } + + @Override + public NodeResponse merge(final URI uri, final String method, Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) { + final FlowSnippetEntity responseEntity = clientResponse.getClientResponse().getEntity(FlowSnippetEntity.class); + final FlowSnippetDTO contents = responseEntity.getContents(); + + if (contents == null) { + return clientResponse; + } else { + final Map<String, Map<NodeIdentifier, ProcessorDTO>> processorMap = new HashMap<>(); + final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> remoteProcessGroupMap = new HashMap<>(); + + for (final NodeResponse nodeResponse : successfulResponses) { + final FlowSnippetEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class); + final FlowSnippetDTO nodeContents = nodeResponseEntity.getContents(); + + for (final ProcessorDTO nodeProcessor : nodeContents.getProcessors()) { + Map<NodeIdentifier, ProcessorDTO> innerMap = processorMap.get(nodeProcessor.getId()); + if (innerMap == null) { + innerMap = new HashMap<>(); + processorMap.put(nodeProcessor.getId(), innerMap); + } + + innerMap.put(nodeResponse.getNodeId(), nodeProcessor); + } + + for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeContents.getRemoteProcessGroups()) { + Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId()); + if (innerMap == null) { + innerMap = new HashMap<>(); + remoteProcessGroupMap.put(nodeRemoteProcessGroup.getId(), innerMap); + } + + innerMap.put(nodeResponse.getNodeId(), nodeRemoteProcessGroup); + } + } + + final ProcessorEndpointMerger procMerger = new ProcessorEndpointMerger(); + for (final ProcessorDTO processor : contents.getProcessors()) { + final String procId = processor.getId(); + final Map<NodeIdentifier, ProcessorDTO> mergeMap = processorMap.get(procId); + + procMerger.mergeResponses(processor, mergeMap, successfulResponses, problematicResponses); + } + + final RemoteProcessGroupEndpointMerger rpgMerger = new RemoteProcessGroupEndpointMerger(); + for (final RemoteProcessGroupDTO remoteProcessGroup : contents.getRemoteProcessGroups()) { + if (remoteProcessGroup.getContents() != null) { + final String remoteProcessGroupId = remoteProcessGroup.getId(); + final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap = remoteProcessGroupMap.get(remoteProcessGroupId); + + rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, successfulResponses, problematicResponses); + } + } + } + + // create a new client response + return new NodeResponse(clientResponse, responseEntity); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/GroupStatusEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/GroupStatusEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/GroupStatusEndpointMerger.java new file mode 100644 index 0000000..b769042 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/GroupStatusEndpointMerger.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.regex.Pattern; + +import org.apache.nifi.cluster.manager.StatusMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; + +public class GroupStatusEndpointMerger extends AbstractNodeStatusEndpoint<ProcessGroupStatusEntity, ProcessGroupStatusDTO> { + public static final Pattern GROUP_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/flow/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status"); + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + protected Class<ProcessGroupStatusEntity> getEntityClass() { + return ProcessGroupStatusEntity.class; + } + + @Override + protected ProcessGroupStatusDTO getDto(ProcessGroupStatusEntity entity) { + return entity.getProcessGroupStatus(); + } + + @Override + protected void mergeResponses(ProcessGroupStatusDTO clientDto, Map<NodeIdentifier, ProcessGroupStatusDTO> dtoMap, NodeIdentifier selectedNodeId) { + final ProcessGroupStatusDTO mergedProcessGroupStatus = clientDto; + mergedProcessGroupStatus.setNodeSnapshots(new ArrayList<NodeProcessGroupStatusSnapshotDTO>()); + + final NodeProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new NodeProcessGroupStatusSnapshotDTO(); + selectedNodeSnapshot.setStatusSnapshot(clientDto.getAggregateSnapshot().clone()); + selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress()); + selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort()); + selectedNodeSnapshot.setNodeId(selectedNodeId.getId()); + + mergedProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot); + + for (final Map.Entry<NodeIdentifier, ProcessGroupStatusDTO> entry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ProcessGroupStatusDTO nodeProcessGroupStatus = entry.getValue(); + if (nodeProcessGroupStatus == mergedProcessGroupStatus) { + continue; + } + + final ProcessGroupStatusSnapshotDTO nodeSnapshot = nodeProcessGroupStatus.getAggregateSnapshot(); + for (final RemoteProcessGroupStatusSnapshotDTO remoteProcessGroupStatus : nodeSnapshot.getRemoteProcessGroupStatusSnapshots()) { + final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues(); + if (!nodeAuthorizationIssues.isEmpty()) { + for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) { + final String Issue = iter.next(); + iter.set("[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- " + Issue); + } + remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues); + } + } + + StatusMerger.merge(mergedProcessGroupStatus, nodeProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort()); + } + } + +}
