Repository: nifi
Updated Branches:
  refs/heads/master 86f162b61 -> a1c917656


NIFI-4973:
- Fixing RPG port merging.
- Adding unit tests.
- Removing unnecessary sorting that wasn't maintained while clustered.

This closes #2551.

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1c91765
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1c91765
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1c91765

Branch: refs/heads/master
Commit: a1c917656e2beec2f57953ee4717ef1e3599489d
Parents: 86f162b
Author: Matt Gilman <[email protected]>
Authored: Wed Mar 14 16:31:41 2018 -0400
Committer: Mark Payne <[email protected]>
Committed: Fri Mar 16 13:03:09 2018 -0400

----------------------------------------------------------------------
 .../manager/RemoteProcessGroupEntityMerger.java |  30 ++++--
 .../RemoteProcessGroupEntityMergerTest.java     | 103 +++++++++++++++++++
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  42 +-------
 3 files changed, 126 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a1c91765/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java
index a426d93..5207524 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 
 public class RemoteProcessGroupEntityMerger implements 
ComponentEntityMerger<RemoteProcessGroupEntity>, 
ComponentEntityStatusMerger<RemoteProcessGroupStatusDTO> {
+
     @Override
     public void merge(RemoteProcessGroupEntity clientEntity, 
Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap) {
         ComponentEntityMerger.super.merge(clientEntity, entityMap);
@@ -76,9 +77,10 @@ public class RemoteProcessGroupEntityMerger implements 
ComponentEntityMerger<Rem
 
         final Map<String, Set<NodeIdentifier>> authorizationErrorMap = new 
HashMap<>();
         final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
+
         Boolean mergedIsTargetSecure = null;
-        final Set<RemoteProcessGroupPortDTO> mergedInputPorts = new 
HashSet<>();
-        final Set<RemoteProcessGroupPortDTO> mergedOutputPorts = new 
HashSet<>();
+        Set<RemoteProcessGroupPortDTO> mergedInputPorts = null;
+        Set<RemoteProcessGroupPortDTO> mergedOutputPorts = null;
 
         for (final Map.Entry<NodeIdentifier, RemoteProcessGroupDTO> nodeEntry 
: dtoMap.entrySet()) {
             final RemoteProcessGroupDTO nodeRemoteProcessGroup = 
nodeEntry.getValue();
@@ -100,22 +102,32 @@ public class RemoteProcessGroupEntityMerger implements 
ComponentEntityMerger<Rem
                 // merge the ports in the contents
                 final RemoteProcessGroupContentsDTO 
nodeRemoteProcessGroupContentsDto = nodeRemoteProcessGroup.getContents();
                 if (remoteProcessGroupContents != null && 
nodeRemoteProcessGroupContentsDto != null) {
-                    if (nodeRemoteProcessGroupContentsDto.getInputPorts() != 
null) {
-                        
mergedInputPorts.addAll(nodeRemoteProcessGroupContentsDto.getInputPorts());
+                    final Set<RemoteProcessGroupPortDTO> nodeInputPorts = 
nodeRemoteProcessGroupContentsDto.getInputPorts();
+                    if (nodeInputPorts != null) {
+                        if (mergedInputPorts == null) {
+                            mergedInputPorts = new HashSet<>(nodeInputPorts);
+                        } else {
+                            mergedInputPorts.retainAll(nodeInputPorts);
+                        }
                     }
-                    if (nodeRemoteProcessGroupContentsDto.getOutputPorts() != 
null) {
-                        
mergedOutputPorts.addAll(nodeRemoteProcessGroupContentsDto.getOutputPorts());
+
+                    final Set<RemoteProcessGroupPortDTO> nodeOutputPorts = 
nodeRemoteProcessGroupContentsDto.getOutputPorts();
+                    if (nodeOutputPorts != null) {
+                        if (mergedOutputPorts == null) {
+                            mergedOutputPorts = new HashSet<>(nodeOutputPorts);
+                        } else {
+                            mergedOutputPorts.retainAll(nodeOutputPorts);
+                        }
                     }
                 }
             }
-
         }
 
         if (remoteProcessGroupContents != null) {
-            if (!mergedInputPorts.isEmpty()) {
+            if (mergedInputPorts != null && !mergedInputPorts.isEmpty()) {
                 remoteProcessGroupContents.setInputPorts(mergedInputPorts);
             }
-            if (!mergedOutputPorts.isEmpty()) {
+            if (mergedOutputPorts != null && !mergedOutputPorts.isEmpty()) {
                 remoteProcessGroupContents.setOutputPorts(mergedOutputPorts);
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1c91765/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMergerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMergerTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMergerTest.java
new file mode 100644
index 0000000..8a262d1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMergerTest.java
@@ -0,0 +1,103 @@
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.PermissionsDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class RemoteProcessGroupEntityMergerTest {
+
+    @Test
+    public void testMergeRemoteProcessGroups() throws Exception {
+        final NodeIdentifier node1 = new NodeIdentifier("node-1", "host-1", 
8080, "host-1", 19998, null, null, null, false);
+        final NodeIdentifier node2 = new NodeIdentifier("node-2", "host-2", 
8081, "host-2", 19999, null, null, null, false);
+
+        final PermissionsDTO permissed = new PermissionsDTO();
+        permissed.setCanRead(true);
+        permissed.setCanWrite(true);
+
+        final RemoteProcessGroupStatusDTO status = new 
RemoteProcessGroupStatusDTO();
+        status.setAggregateSnapshot(new RemoteProcessGroupStatusSnapshotDTO());
+
+        final RemoteProcessGroupPortDTO in1_1 = new 
RemoteProcessGroupPortDTO();
+        in1_1.setName("in1");
+
+        final RemoteProcessGroupPortDTO in1_2 = new 
RemoteProcessGroupPortDTO();
+        in1_2.setName("in2");
+
+        final Set<RemoteProcessGroupPortDTO> inputs1 = new HashSet<>();
+        inputs1.add(in1_1);
+        inputs1.add(in1_2);
+
+        final RemoteProcessGroupPortDTO out1_1 = new 
RemoteProcessGroupPortDTO();
+        out1_1.setName("out1");
+
+        final Set<RemoteProcessGroupPortDTO> outputs1 = new HashSet<>();
+        outputs1.add(out1_1);
+
+        final RemoteProcessGroupContentsDTO contents1 = new 
RemoteProcessGroupContentsDTO();
+        contents1.setInputPorts(inputs1);
+        contents1.setOutputPorts(outputs1);
+
+        final RemoteProcessGroupDTO rpg1 = new RemoteProcessGroupDTO();
+        rpg1.setContents(contents1);
+
+        final RemoteProcessGroupEntity entity1 = new 
RemoteProcessGroupEntity();
+        entity1.setPermissions(permissed);
+        entity1.setStatus(status);
+        entity1.setComponent(rpg1);
+
+        final RemoteProcessGroupPortDTO in2_1 = new 
RemoteProcessGroupPortDTO();
+        in2_1.setName("in1");
+
+        final Set<RemoteProcessGroupPortDTO> inputs2 = new HashSet<>();
+        inputs2.add(in2_1);
+
+        final RemoteProcessGroupPortDTO out2_1 = new 
RemoteProcessGroupPortDTO();
+        out2_1.setName("out1");
+
+        final RemoteProcessGroupPortDTO out2_2 = new 
RemoteProcessGroupPortDTO();
+        out2_2.setName("out2");
+
+        final Set<RemoteProcessGroupPortDTO> outputs2 = new HashSet<>();
+        outputs2.add(out2_1);
+        outputs2.add(out2_2);
+
+        final RemoteProcessGroupContentsDTO contents2 = new 
RemoteProcessGroupContentsDTO();
+        contents2.setInputPorts(inputs2);
+        contents2.setOutputPorts(outputs2);
+
+        final RemoteProcessGroupDTO rpg2 = new RemoteProcessGroupDTO();
+        rpg2.setContents(contents2);
+
+        final RemoteProcessGroupEntity entity2 = new 
RemoteProcessGroupEntity();
+        entity2.setPermissions(permissed);
+        entity2.setStatus(status);
+        entity2.setComponent(rpg2);
+
+        final Map<NodeIdentifier, RemoteProcessGroupEntity> nodeMap = new 
HashMap<>();
+        nodeMap.put(node1, entity1);
+        nodeMap.put(node2, entity2);
+
+        final RemoteProcessGroupEntityMerger merger = new 
RemoteProcessGroupEntityMerger();
+        merger.merge(entity1, nodeMap);
+
+        // should only include ports in common to all rpg's
+        assertEquals(1, 
entity1.getComponent().getContents().getInputPorts().size());
+        assertEquals("in1", 
entity1.getComponent().getContents().getInputPorts().iterator().next().getName());
+        assertEquals(1, 
entity1.getComponent().getContents().getOutputPorts().size());
+        assertEquals("out1", 
entity1.getComponent().getContents().getOutputPorts().iterator().next().getName());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1c91765/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 6b7e49a..e26be4e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -1595,8 +1595,8 @@ public final class DtoFactory {
             return null;
         }
 
-        final Set<RemoteProcessGroupPortDTO> inputPorts = new TreeSet<>(new 
DtoFactory.SortedRemoteGroupPortComparator());
-        final Set<RemoteProcessGroupPortDTO> outputPorts = new TreeSet<>(new 
DtoFactory.SortedRemoteGroupPortComparator());
+        final Set<RemoteProcessGroupPortDTO> inputPorts = new HashSet<>();
+        final Set<RemoteProcessGroupPortDTO> outputPorts = new HashSet<>();
 
         int activeRemoteInputPortCount = 0;
         int inactiveRemoteInputPortCount = 0;
@@ -4082,44 +4082,6 @@ public final class DtoFactory {
         return copy;
     }
 
-    private static class SortedRemoteGroupPortComparator implements 
Comparator<RemoteProcessGroupPortDTO> {
-
-        @Override
-        public int compare(final RemoteProcessGroupPortDTO o1, final 
RemoteProcessGroupPortDTO o2) {
-            if (o2 == null) {
-                return -1;
-            } else if (o1 == null) {
-                return 1;
-            }
-
-            final String name1 = o1.getName();
-            final String name2 = o2.getName();
-            if (name2 == null) {
-                return -1;
-            } else if (name1 == null) {
-                return 1;
-            } else {
-                int compareResult = 
Collator.getInstance(Locale.US).compare(name2, name2);
-
-                // if the names are same, use the id
-                if (compareResult == 0) {
-                    final String id1 = o1.getId();
-                    final String id2 = o2.getId();
-                    if (id2 == null) {
-                        compareResult = -1;
-                    } else if (id1 == null) {
-                        compareResult = 1;
-                    } else {
-                        compareResult = id1.compareTo(id2);
-                    }
-                }
-
-                return compareResult;
-            }
-
-        }
-    }
-
     /**
      * Factory method for creating a new RevisionDTO based on this controller.
      *

Reply via email to