Repository: nifi Updated Branches: refs/heads/master d37480eaf -> 97e2f406d
NIFI-2708 Added merging of SiteToSite details NIFI-2708 Updated to include ports from clientDto for port merging. This closes #981 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/97e2f406 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/97e2f406 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/97e2f406 Branch: refs/heads/master Commit: 97e2f406da83602ab2cf42bf3b7fbfcb9ea1686c Parents: d37480e Author: Jeff Storck <[email protected]> Authored: Wed Aug 31 19:34:57 2016 -0400 Committer: Matt Gilman <[email protected]> Committed: Fri Sep 2 14:55:08 2016 -0400 ---------------------------------------------------------------------- .../http/StandardHttpResponseMerger.java | 2 + .../endpoints/ControllerEndpointMerger.java | 110 +++++++++++++++++++ .../nifi/cluster/manager/PortEntityMerger.java | 2 +- 3 files changed, 113 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/97e2f406/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java index 7b1d8f6..a7e4883 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java @@ -23,6 +23,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpi import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ControllerBulletinsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ControllerConfigurationEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger; @@ -131,6 +132,7 @@ public class StandardHttpResponseMerger implements HttpResponseMerger { endpointMergers.add(new LabelsEndpointMerger()); endpointMergers.add(new FunnelEndpointMerger()); endpointMergers.add(new FunnelsEndpointMerger()); + endpointMergers.add(new ControllerEndpointMerger()); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/97e2f406/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java new file mode 100644 index 0000000..6e38860 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.coordination.http.endpoints; + +import com.google.common.collect.Sets; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.PortEntityMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.entity.ControllerEntity; + +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class ControllerEndpointMerger extends AbstractSingleDTOEndpoint<ControllerEntity, ControllerDTO> { + public static final Pattern CONTROLLER_URI_PATTERN = Pattern.compile("/nifi-api/site-to-site"); + private PortEntityMerger portMerger = new PortEntityMerger(); + + @Override + protected Class<ControllerEntity> getEntityClass() { + return ControllerEntity.class; + } + + @Override + protected ControllerDTO getDto(ControllerEntity entity) { + return entity.getController(); + } + + @Override + protected void mergeResponses(ControllerDTO clientDto, Map<NodeIdentifier, ControllerDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { + ControllerDTO mergedController = clientDto; + final Map<String, Map<NodeIdentifier, PortDTO>> inputPortMap = new HashMap<>(); // map of port id to map of node id to port dto + final Map<String, Map<NodeIdentifier, PortDTO>> outputPortMap = new HashMap<>(); // map of port id to map of node id to port dto + + for (final Map.Entry<NodeIdentifier, ControllerDTO> entry : dtoMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ControllerDTO nodeController = entry.getValue(); + + // gather all input and output ports for merging, including the ports from clientDto + nodeController.getInputPorts().stream().forEach(inputPort -> inputPortMap.computeIfAbsent(inputPort.getId(), nodeIdToInputPort -> new HashMap<>()).put(nodeId, inputPort)); + nodeController.getOutputPorts().stream().forEach(outputPort -> outputPortMap.computeIfAbsent(outputPort.getId(), nodeIdToOutputPort -> new HashMap<>()).put(nodeId, outputPort)); + } + + /* + * Note on port merging: only merge the ports if they exist in the client response and all node responses. Due to authorization possibly different per node, only ports that have been + * returned from every node need to be merged. If a node doesn't return a port DTO due to authorization issues, the responses for that port ID should be dropped from the client response. + */ + + // merge input ports + for (Map<NodeIdentifier, PortDTO> inputPortByNodeId : inputPortMap.values()) { + final Collection<PortDTO> nodeInputPorts = inputPortByNodeId.values(); + if (!nodeInputPorts.isEmpty()) { + final PortDTO inputPort = nodeInputPorts.iterator().next(); + final PortDTO clientInputPort = clientDto.getInputPorts().stream().filter(p -> p.getId().equals(inputPort.getId())).findFirst().orElse(null); + if (clientInputPort != null) { + PortEntityMerger.mergeDtos(clientInputPort, inputPortByNodeId); + } + } + } + + // merge output ports + for (Map<NodeIdentifier, PortDTO> outputPortByNodeId : outputPortMap.values()) { + final Collection<PortDTO> nodeOutputPorts = outputPortByNodeId.values(); + if (!nodeOutputPorts.isEmpty()) { + final PortDTO outputPort = nodeOutputPorts.iterator().next(); + final PortDTO clientOutputPort = clientDto.getInputPorts().stream().filter(p -> p.getId().equals(outputPort.getId())).findFirst().orElse(null); + if (clientOutputPort != null) { + PortEntityMerger.mergeDtos(clientOutputPort, outputPortByNodeId); + } + } + } + + // get intersection of input and output ports + final Set<PortDTO> clientInputPorts = Sets.newHashSet(clientDto.getInputPorts()); + final Set<PortDTO> clientOutputPorts = Sets.newHashSet(clientDto.getOutputPorts()); + dtoMap.values().forEach(controller -> { + clientInputPorts.retainAll(controller.getInputPorts()); + clientOutputPorts.retainAll(controller.getOutputPorts()); + }); + + clientDto.setInputPorts(clientInputPorts); + clientDto.setInputPortCount(clientInputPorts.size()); + clientDto.setOutputPorts(clientOutputPorts); + clientDto.setOutputPortCount(clientOutputPorts.size()); + } + + @Override + public boolean canHandle(URI uri, String method) { + return "GET".equalsIgnoreCase(method) && CONTROLLER_URI_PATTERN.matcher(uri.getPath()).matches(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/97e2f406/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java index cd73084..2929741 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java @@ -65,7 +65,7 @@ public class PortEntityMerger implements ComponentEntityMerger<PortEntity>, Comp statusNodeIdentifier.getApiPort()); } - private static void mergeDtos(final PortDTO clientDto, final Map<NodeIdentifier, PortDTO> dtoMap) { + public static void mergeDtos(final PortDTO clientDto, final Map<NodeIdentifier, PortDTO> dtoMap) { // if unauthorized for the client dto, simple return if (clientDto == null) { return;
