Repository: nifi Updated Branches: refs/heads/master 6c426f7a1 -> 0eda71a9a
NIFI-4310 - added changes to support detection of reporting tasks and controller services during isEmpty flow check. Added testing scenarios. This closes #2107. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0eda71a9 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0eda71a9 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0eda71a9 Branch: refs/heads/master Commit: 0eda71a9a642432accc536206516df0f114b176a Parents: 6c426f7 Author: Yolanda M. Davis <[email protected]> Authored: Wed Aug 23 01:53:49 2017 -0400 Committer: Mark Payne <[email protected]> Committed: Fri Aug 25 10:15:55 2017 -0400 ---------------------------------------------------------------------- .../flow/TestPopularVoteFlowElection.java | 70 ++++++++++++++++++++ .../conf/auto-generated-empty-flow.xml | 27 ++++++++ .../resources/conf/controller-service-flow.xml | 37 +++++++++++ .../test/resources/conf/reporting-task-flow.xml | 54 +++++++++++++++ .../controller/StandardFlowSynchronizer.java | 23 ++++++- 5 files changed, 209 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0eda71a9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java index 6447e27..95e9f5f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java @@ -22,6 +22,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.nio.file.Files; @@ -33,7 +36,9 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.StandardDataFlow; +import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.fingerprint.FingerprintFactory; +import org.apache.nifi.util.NiFiProperties; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -134,6 +139,71 @@ public class TestPopularVoteFlowElection { } } + @Test + public void testAutoGeneratedVsPopulatedFlowElection() throws IOException { + final FingerprintFactory fingerprintFactory = new FingerprintFactory(StringEncryptor.createEncryptor(getNiFiProperties())); + final PopularVoteFlowElection election = new PopularVoteFlowElection(1, TimeUnit.MINUTES, 4, fingerprintFactory); + final byte[] emptyFlow = Files.readAllBytes(Paths.get("src/test/resources/conf/auto-generated-empty-flow.xml")); + final byte[] nonEmptyFlow = Files.readAllBytes(Paths.get("src/test/resources/conf/reporting-task-flow.xml")); + + for (int i = 0; i < 4; i++) { + assertFalse(election.isElectionComplete()); + assertNull(election.getElectedDataFlow()); + + final DataFlow dataFlow; + if (i % 2 == 0) { + dataFlow = createDataFlow(emptyFlow); + } else { + dataFlow = createDataFlow(nonEmptyFlow); + } + + final DataFlow electedDataFlow = election.castVote(dataFlow, createNodeId(i)); + + if (i == 3) { + assertNotNull(electedDataFlow); + assertEquals(new String(nonEmptyFlow), new String(electedDataFlow.getFlow())); + } else { + assertNull(electedDataFlow); + } + } + } + + @Test + public void testDifferentPopulatedFlowsElection() throws IOException { + final FingerprintFactory fingerprintFactory = new FingerprintFactory(StringEncryptor.createEncryptor(getNiFiProperties())); + final PopularVoteFlowElection election = new PopularVoteFlowElection(1, TimeUnit.MINUTES, 4, fingerprintFactory); + final byte[] nonEmptyCandidateA = Files.readAllBytes(Paths.get("src/test/resources/conf/controller-service-flow.xml")); + final byte[] nonEmptyCandidateB = Files.readAllBytes(Paths.get("src/test/resources/conf/reporting-task-flow.xml")); + + for (int i = 0; i < 4; i++) { + assertFalse(election.isElectionComplete()); + assertNull(election.getElectedDataFlow()); + + final DataFlow dataFlow; + if (i % 2 == 0) { + dataFlow = createDataFlow(nonEmptyCandidateA); + } else { + dataFlow = createDataFlow(nonEmptyCandidateB); + } + + final DataFlow electedDataFlow = election.castVote(dataFlow, createNodeId(i)); + + if (i == 3) { + assertNotNull(electedDataFlow); + assertEquals(new String(nonEmptyCandidateA), new String(electedDataFlow.getFlow())); + } else { + assertNull(electedDataFlow); + } + } + } + + private NiFiProperties getNiFiProperties() { + final NiFiProperties nifiProperties = mock(NiFiProperties.class); + when(nifiProperties.getProperty(StringEncryptor.NF_SENSITIVE_PROPS_ALGORITHM)).thenReturn("PBEWITHMD5AND256BITAES-CBC-OPENSSL"); + when(nifiProperties.getProperty(StringEncryptor.NF_SENSITIVE_PROPS_PROVIDER)).thenReturn("BC"); + when(nifiProperties.getProperty(anyString(), anyString())).then(invocation -> invocation.getArgumentAt(1, String.class)); + return nifiProperties; + } private NodeIdentifier createNodeId(final int index) { return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, 9000 + index, true); http://git-wip-us.apache.org/repos/asf/nifi/blob/0eda71a9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/auto-generated-empty-flow.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/auto-generated-empty-flow.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/auto-generated-empty-flow.xml new file mode 100644 index 0000000..720fc0b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/auto-generated-empty-flow.xml @@ -0,0 +1,27 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<flowController encoding-version="1.1"> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>ee207cce-015d-1000-30bf-36cd2fd1ea5c</id> + <name>NiFi Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + </rootGroup> + <controllerServices/> + <reportingTasks/> +</flowController> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/0eda71a9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/controller-service-flow.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/controller-service-flow.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/controller-service-flow.xml new file mode 100644 index 0000000..4278311 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/controller-service-flow.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<flowController> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>778f676e-6542-4c18-9d06-24b6fd3a1b29</id> + <name>NiFi Flow</name> + <position x="0.0" y="0.0"/> + </rootGroup> + <controllerServices> + <controllerService> + <id>edf22ee5-376a-46dc-a38a-919351124457</id> + <name>ControllerService</name> + <comment/> + <class>org.apache.nifi.controller.service.mock.ServiceD</class> + <enabled>false</enabled> + <property> + <name>Foo1</name> + <value>Bar1</value> + </property> + </controllerService> + </controllerServices> +</flowController> http://git-wip-us.apache.org/repos/asf/nifi/blob/0eda71a9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/reporting-task-flow.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/reporting-task-flow.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/reporting-task-flow.xml new file mode 100644 index 0000000..751517c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/reporting-task-flow.xml @@ -0,0 +1,54 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<flowController> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>7c84501d-d10c-407c-b9f3-1d80e38fe36a</id> + <name>NiFi Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + </rootGroup> + <controllerServices/> + <reportingTasks> + <reportingTask> + <id>3b80ba0f-a6c0-48db-b721-4dbc04cef28e</id> + <name>AmbariReportingTask</name> + <comment/> + <class>org.apache.nifi.reporting.ambari.AmbariReportingTask</class> + <bundle> + <group>org.apache.nifi</group> + <artifact>nifi-standard-nar</artifact> + <version>1.1.0</version> + </bundle> + <schedulingPeriod>{{nifi_ambari_reporting_frequency}}</schedulingPeriod> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <property> + <name>Metrics Collector URL</name> + <value>${ambari.metrics.collector.url}</value> + </property> + <property> + <name>Application ID</name> + <value>${ambari.application.id}</value> + </property> + <property> + <name>Hostname</name> + <value>${hostname(true)}</value> + </property> + </reportingTask> + </reportingTasks> +</flowController> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/0eda71a9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 58bb90f..3af270c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -101,6 +101,7 @@ import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FunnelDTO; import org.apache.nifi.web.api.dto.LabelDTO; @@ -146,9 +147,15 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootGroupElement); - final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, null, encodingVersion); - return isEmpty(rootGroupDto); + + final NodeList reportingTasks = rootElement.getElementsByTagName("reportingTask"); + final ReportingTaskDTO reportingTaskDTO = reportingTasks.getLength() == 0 ? null : FlowFromDOMFactory.getReportingTask((Element)reportingTasks.item(0),null); + + final NodeList controllerServices = rootElement.getElementsByTagName("controllerService"); + final ControllerServiceDTO controllerServiceDTO = controllerServices.getLength() == 0 ? null : FlowFromDOMFactory.getControllerService((Element)controllerServices.item(0),null); + + return isEmpty(rootGroupDto) && isEmpty(reportingTaskDTO) && isEmpty(controllerServiceDTO); } @Override @@ -537,6 +544,18 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { && CollectionUtils.isEmpty(contents.getRemoteProcessGroups()); } + private static boolean isEmpty(final ReportingTaskDTO reportingTaskDTO){ + + return reportingTaskDTO == null || StringUtils.isEmpty(reportingTaskDTO.getName()) ; + + } + + private static boolean isEmpty(final ControllerServiceDTO controllerServiceDTO){ + + return controllerServiceDTO == null || StringUtils.isEmpty(controllerServiceDTO.getName()); + + } + private static Document parseFlowBytes(final byte[] flow) throws FlowSerializationException { // create document by parsing proposed flow bytes try {
