Repository: nifi Updated Branches: refs/heads/master ce1bc42ac -> 6b71b4cbb
NIFI-3868: - Ensuring we do not attempt to group bulletins that lack permissions. - Only group bulletins when all nodes report the same message. - Retain the most recent bulletin. Signed-off-by: Matt Burgess <[email protected]> This closes #1801 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6b71b4cb Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6b71b4cb Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6b71b4cb Branch: refs/heads/master Commit: 6b71b4cbb860c04431d3c54ef75d71fe44c59382 Parents: ce1bc42 Author: Matt Gilman <[email protected]> Authored: Mon May 15 12:52:46 2017 -0400 Committer: Matt Burgess <[email protected]> Committed: Tue May 16 16:54:30 2017 -0400 ---------------------------------------------------------------------- .../endpoints/BulletinBoardEndpointMerger.java | 2 +- .../ControllerBulletinsEndpointMerger.java | 6 +- .../nifi/cluster/manager/BulletinMerger.java | 44 ++++++++-- .../cluster/manager/ComponentEntityMerger.java | 2 +- .../cluster/manager/BulletinMergerTest.java | 86 ++++++++++++++++++++ 5 files changed, 130 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java index 1001912..0aa705c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java @@ -61,7 +61,7 @@ public class BulletinBoardEndpointMerger extends AbstractSingleDTOEndpoint<Bulle }); } - clientDto.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities)); + clientDto.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities, dtoMap.size())); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java index 1e85ced..03f9d4c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java @@ -92,9 +92,9 @@ public class ControllerBulletinsEndpointMerger extends AbstractSingleEntityEndpo } } - clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos)); - clientEntity.setControllerServiceBulletins(BulletinMerger.mergeBulletins(controllerServiceBulletinDtos)); - clientEntity.setReportingTaskBulletins(BulletinMerger.mergeBulletins(reportingTaskBulletinDtos)); + clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos, entityMap.size())); + clientEntity.setControllerServiceBulletins(BulletinMerger.mergeBulletins(controllerServiceBulletinDtos, entityMap.size())); + clientEntity.setReportingTaskBulletins(BulletinMerger.mergeBulletins(reportingTaskBulletinDtos, entityMap.size())); // sort the bulletins Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR); http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java index 952edab..11771ad 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.cluster.manager; +import com.google.common.collect.Lists; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.web.api.entity.BulletinEntity; @@ -26,10 +27,10 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import com.google.common.collect.Lists; - public final class BulletinMerger { + final static String ALL_NODES_MESSAGE = "All Nodes"; + private BulletinMerger() {} public static Comparator<BulletinEntity> BULLETIN_COMPARATOR = new Comparator<BulletinEntity>() { @@ -54,7 +55,7 @@ public final class BulletinMerger { * * @param bulletins bulletins */ - public static List<BulletinEntity> mergeBulletins(final Map<NodeIdentifier, List<BulletinEntity>> bulletins) { + public static List<BulletinEntity> mergeBulletins(final Map<NodeIdentifier, List<BulletinEntity>> bulletins, final int totalNodes) { final List<BulletinEntity> bulletinEntities = new ArrayList<>(); for (final Map.Entry<NodeIdentifier, List<BulletinEntity>> entry : bulletins.entrySet()) { @@ -76,9 +77,42 @@ public final class BulletinMerger { final List<BulletinEntity> entities = Lists.newArrayList(); - final Map<String,List<BulletinEntity>> groupingEntities = bulletinEntities.stream().collect(Collectors.groupingBy(b -> b.getBulletin().getMessage())); - groupingEntities.values().stream().map(e -> e.get(0)).forEach(entities::add); + // group by message when permissions allow + final Map<String,List<BulletinEntity>> groupingEntities = bulletinEntities.stream() + .filter(bulletinEntity -> bulletinEntity.getCanRead()) + .collect(Collectors.groupingBy(b -> b.getBulletin().getMessage())); + + // add one from each grouped bulletin when all nodes report the same message + groupingEntities.forEach((message, groupedBulletinEntities) -> { + if (groupedBulletinEntities.size() == totalNodes) { + // get the most current bulletin + final BulletinEntity selectedBulletinEntity = groupedBulletinEntities.stream() + .max(Comparator.comparingLong(bulletinEntity -> { + if (bulletinEntity.getTimestamp() == null) { + return 0; + } else { + return bulletinEntity.getTimestamp().getTime(); + } + })).orElse(null); + + // should never be null, but just in case + if (selectedBulletinEntity != null) { + selectedBulletinEntity.setNodeAddress(ALL_NODES_MESSAGE); + selectedBulletinEntity.getBulletin().setNodeAddress(ALL_NODES_MESSAGE); + entities.add(selectedBulletinEntity); + } + } else { + // since all nodes didn't report the exact same bulletin, keep them all + entities.addAll(groupedBulletinEntities); + } + }); + + // ensure we also get the remainder of the bulletin entities + bulletinEntities.stream() + .filter(bulletinEntity -> !bulletinEntity.getCanRead()) + .forEach(entities::add); + // ensure the bulletins are sorted by time Collections.sort(entities, (BulletinEntity o1, BulletinEntity o2) -> { final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp()); if (timeComparison != 0) { http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java index eda3c0f..f7c28fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java @@ -59,7 +59,7 @@ public interface ComponentEntityMerger<EntityType extends ComponentEntity & Perm }); } } - clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities)); + clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities, entityMap.size())); // sort the results Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR); http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java new file mode 100644 index 0000000..1502433 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.BulletinDTO; +import org.apache.nifi.web.api.entity.BulletinEntity; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.cluster.manager.BulletinMerger.ALL_NODES_MESSAGE; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class BulletinMergerTest { + + long bulletinId = 0; + + private BulletinEntity createBulletin(final String message) { + final BulletinDTO bulletin = new BulletinDTO(); + bulletin.setId(bulletinId++); + bulletin.setMessage(message); + bulletin.setTimestamp(new Date()); + + final BulletinEntity entity = new BulletinEntity(); + entity.setId(bulletin.getId()); + entity.setTimestamp(bulletin.getTimestamp()); + entity.setCanRead(true); + entity.setBulletin(bulletin); + + return entity; + } + + @Test + public void mergeBulletins() throws Exception { + final BulletinEntity bulletinEntity1 = createBulletin("This is bulletin 1"); + final BulletinEntity bulletinEntity2 = createBulletin("This is bulletin 2"); + + final BulletinEntity unauthorizedBulletin = new BulletinEntity(); + unauthorizedBulletin.setId(bulletinId++); + unauthorizedBulletin.setTimestamp(new Date()); + unauthorizedBulletin.setCanRead(false); + + final BulletinEntity copyOfBulletin1 = createBulletin("This is bulletin 1"); + + final NodeIdentifier node1 = new NodeIdentifier("node-1", "host-1", 8080, "host-1", 19998, null, null, null, false); + final NodeIdentifier node2 = new NodeIdentifier("node-2", "host-2", 8081, "host-2", 19999, null, null, null, false); + + final Map<NodeIdentifier, List<BulletinEntity>> nodeMap = new HashMap<>(); + nodeMap.put(node1, new ArrayList<>()); + nodeMap.put(node2, new ArrayList<>()); + + nodeMap.get(node1).add(bulletinEntity1); + nodeMap.get(node1).add(bulletinEntity2); + nodeMap.get(node1).add(unauthorizedBulletin); + + nodeMap.get(node2).add(copyOfBulletin1); + + final List<BulletinEntity> bulletinEntities = BulletinMerger.mergeBulletins(nodeMap, nodeMap.size()); + assertEquals(bulletinEntities.size(), 3); + assertTrue(bulletinEntities.contains(copyOfBulletin1)); + assertEquals(copyOfBulletin1.getNodeAddress(), ALL_NODES_MESSAGE); + assertTrue(bulletinEntities.contains(bulletinEntity2)); + assertTrue(bulletinEntities.contains(unauthorizedBulletin)); + } + +} \ No newline at end of file
