Repository: nifi Updated Branches: refs/heads/master 86f162b61 -> a1c917656
NIFI-4973: - Fixing RPG port merging. - Adding unit tests. - Removing unecessary sorting that wasn't maintained while clustered. This closes #2551. Signed-off-by: Mark Payne <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1c91765 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1c91765 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1c91765 Branch: refs/heads/master Commit: a1c917656e2beec2f57953ee4717ef1e3599489d Parents: 86f162b Author: Matt Gilman <[email protected]> Authored: Wed Mar 14 16:31:41 2018 -0400 Committer: Mark Payne <[email protected]> Committed: Fri Mar 16 13:03:09 2018 -0400 ---------------------------------------------------------------------- .../manager/RemoteProcessGroupEntityMerger.java | 30 ++++-- .../RemoteProcessGroupEntityMergerTest.java | 103 +++++++++++++++++++ .../org/apache/nifi/web/api/dto/DtoFactory.java | 42 +------- 3 files changed, 126 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a1c91765/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java index a426d93..5207524 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger<RemoteProcessGroupEntity>, ComponentEntityStatusMerger<RemoteProcessGroupStatusDTO> { + @Override public void merge(RemoteProcessGroupEntity clientEntity, Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap) { ComponentEntityMerger.super.merge(clientEntity, entityMap); @@ -76,9 +77,10 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger<Rem final Map<String, Set<NodeIdentifier>> authorizationErrorMap = new HashMap<>(); final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); + Boolean mergedIsTargetSecure = null; - final Set<RemoteProcessGroupPortDTO> mergedInputPorts = new HashSet<>(); - final Set<RemoteProcessGroupPortDTO> mergedOutputPorts = new HashSet<>(); + Set<RemoteProcessGroupPortDTO> mergedInputPorts = null; + Set<RemoteProcessGroupPortDTO> mergedOutputPorts = null; for (final Map.Entry<NodeIdentifier, RemoteProcessGroupDTO> nodeEntry : dtoMap.entrySet()) { final RemoteProcessGroupDTO nodeRemoteProcessGroup = nodeEntry.getValue(); @@ -100,22 +102,32 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger<Rem // merge the ports in the contents final RemoteProcessGroupContentsDTO nodeRemoteProcessGroupContentsDto = nodeRemoteProcessGroup.getContents(); if (remoteProcessGroupContents != null && nodeRemoteProcessGroupContentsDto != null) { - if (nodeRemoteProcessGroupContentsDto.getInputPorts() != null) { - mergedInputPorts.addAll(nodeRemoteProcessGroupContentsDto.getInputPorts()); + final Set<RemoteProcessGroupPortDTO> nodeInputPorts = nodeRemoteProcessGroupContentsDto.getInputPorts(); + if (nodeInputPorts != null) { + if (mergedInputPorts == null) { + mergedInputPorts = new HashSet<>(nodeInputPorts); + } else { + mergedInputPorts.retainAll(nodeInputPorts); + } } - if (nodeRemoteProcessGroupContentsDto.getOutputPorts() != null) { - mergedOutputPorts.addAll(nodeRemoteProcessGroupContentsDto.getOutputPorts()); + + final Set<RemoteProcessGroupPortDTO> nodeOutputPorts = nodeRemoteProcessGroupContentsDto.getOutputPorts(); + if (nodeOutputPorts != null) { + if (mergedOutputPorts == null) { + mergedOutputPorts = new HashSet<>(nodeOutputPorts); + } else { + mergedOutputPorts.retainAll(nodeOutputPorts); + } } } } - } if (remoteProcessGroupContents != null) { - if (!mergedInputPorts.isEmpty()) { + if (mergedInputPorts != null && !mergedInputPorts.isEmpty()) { remoteProcessGroupContents.setInputPorts(mergedInputPorts); } - if (!mergedOutputPorts.isEmpty()) { + if (mergedOutputPorts != null && !mergedOutputPorts.isEmpty()) { remoteProcessGroupContents.setOutputPorts(mergedOutputPorts); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/a1c91765/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMergerTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMergerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMergerTest.java new file mode 100644 index 0000000..8a262d1 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMergerTest.java @@ -0,0 +1,103 @@ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.PermissionsDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; +import org.junit.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class RemoteProcessGroupEntityMergerTest { + + @Test + public void testMergeRemoteProcessGroups() throws Exception { + final NodeIdentifier node1 = new NodeIdentifier("node-1", "host-1", 8080, "host-1", 19998, null, null, null, false); + final NodeIdentifier node2 = new NodeIdentifier("node-2", "host-2", 8081, "host-2", 19999, null, null, null, false); + + final PermissionsDTO permissed = new PermissionsDTO(); + permissed.setCanRead(true); + permissed.setCanWrite(true); + + final RemoteProcessGroupStatusDTO status = new RemoteProcessGroupStatusDTO(); + status.setAggregateSnapshot(new RemoteProcessGroupStatusSnapshotDTO()); + + final RemoteProcessGroupPortDTO in1_1 = new RemoteProcessGroupPortDTO(); + in1_1.setName("in1"); + + final RemoteProcessGroupPortDTO in1_2 = new RemoteProcessGroupPortDTO(); + in1_2.setName("in2"); + + final Set<RemoteProcessGroupPortDTO> inputs1 = new HashSet<>(); + inputs1.add(in1_1); + inputs1.add(in1_2); + + final RemoteProcessGroupPortDTO out1_1 = new RemoteProcessGroupPortDTO(); + out1_1.setName("out1"); + + final Set<RemoteProcessGroupPortDTO> outputs1 = new HashSet<>(); + outputs1.add(out1_1); + + final RemoteProcessGroupContentsDTO contents1 = new RemoteProcessGroupContentsDTO(); + contents1.setInputPorts(inputs1); + contents1.setOutputPorts(outputs1); + + final RemoteProcessGroupDTO rpg1 = new RemoteProcessGroupDTO(); + rpg1.setContents(contents1); + + final RemoteProcessGroupEntity entity1 = new RemoteProcessGroupEntity(); + entity1.setPermissions(permissed); + entity1.setStatus(status); + entity1.setComponent(rpg1); + + final RemoteProcessGroupPortDTO in2_1 = new RemoteProcessGroupPortDTO(); + in2_1.setName("in1"); + + final Set<RemoteProcessGroupPortDTO> inputs2 = new HashSet<>(); + inputs2.add(in2_1); + + final RemoteProcessGroupPortDTO out2_1 = new RemoteProcessGroupPortDTO(); + out2_1.setName("out1"); + + final RemoteProcessGroupPortDTO out2_2 = new RemoteProcessGroupPortDTO(); + out2_2.setName("out2"); + + final Set<RemoteProcessGroupPortDTO> outputs2 = new HashSet<>(); + outputs2.add(out2_1); + outputs2.add(out2_2); + + final RemoteProcessGroupContentsDTO contents2 = new RemoteProcessGroupContentsDTO(); + contents2.setInputPorts(inputs2); + contents2.setOutputPorts(outputs2); + + final RemoteProcessGroupDTO rpg2 = new RemoteProcessGroupDTO(); + rpg2.setContents(contents2); + + final RemoteProcessGroupEntity entity2 = new RemoteProcessGroupEntity(); + entity2.setPermissions(permissed); + entity2.setStatus(status); + entity2.setComponent(rpg2); + + final Map<NodeIdentifier, RemoteProcessGroupEntity> nodeMap = new HashMap<>(); + nodeMap.put(node1, entity1); + nodeMap.put(node2, entity2); + + final RemoteProcessGroupEntityMerger merger = new RemoteProcessGroupEntityMerger(); + merger.merge(entity1, nodeMap); + + // should only include ports in common to all rpg's + assertEquals(1, entity1.getComponent().getContents().getInputPorts().size()); + assertEquals("in1", entity1.getComponent().getContents().getInputPorts().iterator().next().getName()); + assertEquals(1, entity1.getComponent().getContents().getOutputPorts().size()); + assertEquals("out1", entity1.getComponent().getContents().getOutputPorts().iterator().next().getName()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/a1c91765/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 6b7e49a..e26be4e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1595,8 +1595,8 @@ public final class DtoFactory { return null; } - final Set<RemoteProcessGroupPortDTO> inputPorts = new TreeSet<>(new DtoFactory.SortedRemoteGroupPortComparator()); - final Set<RemoteProcessGroupPortDTO> outputPorts = new TreeSet<>(new DtoFactory.SortedRemoteGroupPortComparator()); + final Set<RemoteProcessGroupPortDTO> inputPorts = new HashSet<>(); + final Set<RemoteProcessGroupPortDTO> outputPorts = new HashSet<>(); int activeRemoteInputPortCount = 0; int inactiveRemoteInputPortCount = 0; @@ -4082,44 +4082,6 @@ public final class DtoFactory { return copy; } - private static class SortedRemoteGroupPortComparator implements Comparator<RemoteProcessGroupPortDTO> { - - @Override - public int compare(final RemoteProcessGroupPortDTO o1, final RemoteProcessGroupPortDTO o2) { - if (o2 == null) { - return -1; - } else if (o1 == null) { - return 1; - } - - final String name1 = o1.getName(); - final String name2 = o2.getName(); - if (name2 == null) { - return -1; - } else if (name1 == null) { - return 1; - } else { - int compareResult = Collator.getInstance(Locale.US).compare(name2, name2); - - // if the names are same, use the id - if (compareResult == 0) { - final String id1 = o1.getId(); - final String id2 = o2.getId(); - if (id2 == null) { - compareResult = -1; - } else if (id1 == null) { - compareResult = 1; - } else { - compareResult = id1.compareTo(id2); - } - } - - return compareResult; - } - - } - } - /** * Factory method for creating a new RevisionDTO based on this controller. *
