Repository: nifi Updated Branches: refs/heads/master d36b76cc6 -> bc7c42efa
NIFI-1966: Recreated issue that is outlined in JIRA (the reason for re-opening the ticket) that results in 'java.util.NoSuchElementException: No value present' in unit test - Resolved issue where two flows that are both empty but have different fingerprints (due to root group id being different) causes vote election to fail This closes #995. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bc7c42ef Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bc7c42ef Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bc7c42ef Branch: refs/heads/master Commit: bc7c42efa53b363741a974a7fcf25e04f9208086 Parents: d36b76c Author: Mark Payne <[email protected]> Authored: Wed Sep 7 21:07:01 2016 -0400 Committer: Bryan Bende <[email protected]> Committed: Thu Sep 8 10:48:24 2016 -0400 ---------------------------------------------------------------------- .../flow/PopularVoteFlowElection.java | 22 +++++++++-- .../flow/TestPopularVoteFlowElection.java | 40 ++++++++++++++++++++ .../resources/conf/different-empty-flow.xml | 27 +++++++++++++ 3 files changed, 85 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/bc7c42ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java index bc730d8..b9df55e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java @@ -23,10 +23,12 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.cluster.protocol.NodeIdentifier; @@ -156,12 +158,24 @@ public class PopularVoteFlowElection implements FlowElection { return null; } + final List<FlowCandidate> nonEmptyCandidates = candidateByFingerprint.values().stream() + .filter(candidate -> !candidate.isFlowEmpty()) + .collect(Collectors.toList()); + + if (nonEmptyCandidates.isEmpty()) { + // All flow candidates are empty flows. Just use one of them. + final FlowCandidate electedCandidate = candidateByFingerprint.values().iterator().next(); + this.electedDataFlow = electedCandidate.getDataFlow(); + return electedCandidate; + } + final FlowCandidate elected; - if (candidateByFingerprint.size() == 1) { - elected = candidateByFingerprint.values().iterator().next(); + if (nonEmptyCandidates.size() == 1) { + // Only one flow is non-empty. Use that one. + elected = nonEmptyCandidates.iterator().next(); } else { - elected = candidateByFingerprint.values().stream() - .filter(candidate -> !candidate.isFlowEmpty()) // We have more than 1 fingerprint. Do not consider empty flows. + // Choose the non-empty flow that got the most votes. + elected = nonEmptyCandidates.stream() .max((candidate1, candidate2) -> Integer.compare(candidate1.getVotes(), candidate2.getVotes())) .get(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/bc7c42ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java index c01371db..b7f9e82 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.file.Files; @@ -34,6 +35,8 @@ import org.apache.nifi.cluster.protocol.StandardDataFlow; import org.apache.nifi.fingerprint.FingerprintFactory; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestPopularVoteFlowElection { @@ -62,6 +65,43 @@ public class TestPopularVoteFlowElection { assertEquals(new String(flow), new String(electedDataFlow.getFlow())); } + @Test + public void testDifferentEmptyFlows() throws IOException { + final FingerprintFactory fingerprintFactory = Mockito.mock(FingerprintFactory.class); + Mockito.when(fingerprintFactory.createFingerprint(Mockito.any(byte[].class))).thenAnswer(new Answer<String>() { + @Override + public String answer(final InvocationOnMock invocation) throws Throwable { + final byte[] flow = invocation.getArgumentAt(0, byte[].class); + final String xml = new String(flow); + + // Return the ID of the root group as the fingerprint. + final String fingerprint = xml.replaceAll("(?s:(.*<id>)(.*?)(</id>.*))", "$2"); + return fingerprint; + } + }); + + final PopularVoteFlowElection election = new PopularVoteFlowElection(1, TimeUnit.MINUTES, 3, fingerprintFactory); + final byte[] flow1 = Files.readAllBytes(Paths.get("src/test/resources/conf/empty-flow.xml")); + final byte[] flow2 = Files.readAllBytes(Paths.get("src/test/resources/conf/different-empty-flow.xml")); + + assertFalse(election.isElectionComplete()); + assertNull(election.getElectedDataFlow()); + assertNull(election.castVote(createDataFlow(flow1), createNodeId(1))); + + assertFalse(election.isElectionComplete()); + assertNull(election.getElectedDataFlow()); + assertNull(election.castVote(createDataFlow(flow1), createNodeId(2))); + + assertFalse(election.isElectionComplete()); + assertNull(election.getElectedDataFlow()); + + final DataFlow electedDataFlow = election.castVote(createDataFlow(flow2), createNodeId(3)); + assertNotNull(electedDataFlow); + + final String electedFlowXml = new String(electedDataFlow.getFlow()); + assertTrue(new String(flow1).equals(electedFlowXml) || new String(flow2).equals(electedFlowXml)); + } + @Test public void testEmptyFlowIgnoredIfNonEmptyFlowExists() throws IOException { http://git-wip-us.apache.org/repos/asf/nifi/blob/bc7c42ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/different-empty-flow.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/different-empty-flow.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/different-empty-flow.xml new file mode 100644 index 0000000..8c9641a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/different-empty-flow.xml @@ -0,0 +1,27 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<flowController encoding-version="1.0"> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>11111111-1111-1111-1111-111111111111</id> + <name>Empty NiFi Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + </rootGroup> + <controllerServices/> + <reportingTasks/> +</flowController> \ No newline at end of file
