Repository: nifi
Updated Branches:
  refs/heads/master 6c426f7a1 -> 0eda71a9a


NIFI-4310 - added changes to support detection of reporting tasks and 
controller services during isEmpty flow check. Added testing scenarios. This 
closes #2107.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0eda71a9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0eda71a9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0eda71a9

Branch: refs/heads/master
Commit: 0eda71a9a642432accc536206516df0f114b176a
Parents: 6c426f7
Author: Yolanda M. Davis <[email protected]>
Authored: Wed Aug 23 01:53:49 2017 -0400
Committer: Mark Payne <[email protected]>
Committed: Fri Aug 25 10:15:55 2017 -0400

----------------------------------------------------------------------
 .../flow/TestPopularVoteFlowElection.java       | 70 ++++++++++++++++++++
 .../conf/auto-generated-empty-flow.xml          | 27 ++++++++
 .../resources/conf/controller-service-flow.xml  | 37 +++++++++++
 .../test/resources/conf/reporting-task-flow.xml | 54 +++++++++++++++
 .../controller/StandardFlowSynchronizer.java    | 23 ++++++-
 5 files changed, 209 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0eda71a9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
index 6447e27..95e9f5f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
@@ -22,6 +22,9 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -33,7 +36,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.fingerprint.FingerprintFactory;
+import org.apache.nifi.util.NiFiProperties;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -134,6 +139,71 @@ public class TestPopularVoteFlowElection {
         }
     }
 
+    @Test
+    public void testAutoGeneratedVsPopulatedFlowElection() throws IOException {
+        final FingerprintFactory fingerprintFactory = new 
FingerprintFactory(StringEncryptor.createEncryptor(getNiFiProperties()));
+        final PopularVoteFlowElection election = new 
PopularVoteFlowElection(1, TimeUnit.MINUTES, 4, fingerprintFactory);
+        final byte[] emptyFlow = 
Files.readAllBytes(Paths.get("src/test/resources/conf/auto-generated-empty-flow.xml"));
+        final byte[] nonEmptyFlow = 
Files.readAllBytes(Paths.get("src/test/resources/conf/reporting-task-flow.xml"));
+
+        for (int i = 0; i < 4; i++) {
+            assertFalse(election.isElectionComplete());
+            assertNull(election.getElectedDataFlow());
+
+            final DataFlow dataFlow;
+            if (i % 2 == 0) {
+                dataFlow = createDataFlow(emptyFlow);
+            } else {
+                dataFlow = createDataFlow(nonEmptyFlow);
+            }
+
+            final DataFlow electedDataFlow = election.castVote(dataFlow, 
createNodeId(i));
+
+            if (i == 3) {
+                assertNotNull(electedDataFlow);
+                assertEquals(new String(nonEmptyFlow), new 
String(electedDataFlow.getFlow()));
+            } else {
+                assertNull(electedDataFlow);
+            }
+        }
+    }
+
+    @Test
+    public void testDifferentPopulatedFlowsElection() throws IOException {
+        final FingerprintFactory fingerprintFactory = new 
FingerprintFactory(StringEncryptor.createEncryptor(getNiFiProperties()));
+        final PopularVoteFlowElection election = new 
PopularVoteFlowElection(1, TimeUnit.MINUTES, 4, fingerprintFactory);
+        final byte[] nonEmptyCandidateA = 
Files.readAllBytes(Paths.get("src/test/resources/conf/controller-service-flow.xml"));
+        final byte[] nonEmptyCandidateB = 
Files.readAllBytes(Paths.get("src/test/resources/conf/reporting-task-flow.xml"));
+
+        for (int i = 0; i < 4; i++) {
+            assertFalse(election.isElectionComplete());
+            assertNull(election.getElectedDataFlow());
+
+            final DataFlow dataFlow;
+            if (i % 2 == 0) {
+                dataFlow = createDataFlow(nonEmptyCandidateA);
+            } else {
+                dataFlow = createDataFlow(nonEmptyCandidateB);
+            }
+
+            final DataFlow electedDataFlow = election.castVote(dataFlow, 
createNodeId(i));
+
+            if (i == 3) {
+                assertNotNull(electedDataFlow);
+                assertEquals(new String(nonEmptyCandidateA), new 
String(electedDataFlow.getFlow()));
+            } else {
+                assertNull(electedDataFlow);
+            }
+        }
+    }
+
+    private NiFiProperties getNiFiProperties() {
+        final NiFiProperties nifiProperties = mock(NiFiProperties.class);
+        
when(nifiProperties.getProperty(StringEncryptor.NF_SENSITIVE_PROPS_ALGORITHM)).thenReturn("PBEWITHMD5AND256BITAES-CBC-OPENSSL");
+        
when(nifiProperties.getProperty(StringEncryptor.NF_SENSITIVE_PROPS_PROVIDER)).thenReturn("BC");
+        when(nifiProperties.getProperty(anyString(), 
anyString())).then(invocation -> invocation.getArgumentAt(1, String.class));
+        return nifiProperties;
+    }
 
     private NodeIdentifier createNodeId(final int index) {
         return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 
9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, 9000 + 
index, true);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0eda71a9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/auto-generated-empty-flow.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/auto-generated-empty-flow.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/auto-generated-empty-flow.xml
new file mode 100644
index 0000000..720fc0b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/auto-generated-empty-flow.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+        http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<flowController encoding-version="1.1">
+    <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+    <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+    <rootGroup>
+        <id>ee207cce-015d-1000-30bf-36cd2fd1ea5c</id>
+        <name>NiFi Flow</name>
+        <position x="0.0" y="0.0"/>
+        <comment/>
+    </rootGroup>
+    <controllerServices/>
+    <reportingTasks/>
+</flowController>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0eda71a9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/controller-service-flow.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/controller-service-flow.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/controller-service-flow.xml
new file mode 100644
index 0000000..4278311
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/controller-service-flow.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<flowController>
+    <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+    <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+    <rootGroup>
+        <id>778f676e-6542-4c18-9d06-24b6fd3a1b29</id>
+        <name>NiFi Flow</name>
+        <position x="0.0" y="0.0"/>
+    </rootGroup>
+    <controllerServices>
+        <controllerService>
+            <id>edf22ee5-376a-46dc-a38a-919351124457</id>
+            <name>ControllerService</name>
+            <comment/>
+            <class>org.apache.nifi.controller.service.mock.ServiceD</class>
+            <enabled>false</enabled>
+            <property>
+                <name>Foo1</name>
+                <value>Bar1</value>
+            </property>
+        </controllerService>
+    </controllerServices>
+</flowController>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0eda71a9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/reporting-task-flow.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/reporting-task-flow.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/reporting-task-flow.xml
new file mode 100644
index 0000000..751517c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/reporting-task-flow.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+        http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<flowController>
+    <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+    <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+    <rootGroup>
+        <id>7c84501d-d10c-407c-b9f3-1d80e38fe36a</id>
+        <name>NiFi Flow</name>
+        <position x="0.0" y="0.0"/>
+        <comment/>
+    </rootGroup>
+    <controllerServices/>
+    <reportingTasks>
+        <reportingTask>
+            <id>3b80ba0f-a6c0-48db-b721-4dbc04cef28e</id>
+            <name>AmbariReportingTask</name>
+            <comment/>
+            <class>org.apache.nifi.reporting.ambari.AmbariReportingTask</class>
+            <bundle>
+                <group>org.apache.nifi</group>
+                <artifact>nifi-standard-nar</artifact>
+                <version>1.1.0</version>
+            </bundle>
+            
<schedulingPeriod>{{nifi_ambari_reporting_frequency}}</schedulingPeriod>
+            <scheduledState>RUNNING</scheduledState>
+            <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+            <property>
+                <name>Metrics Collector URL</name>
+                <value>${ambari.metrics.collector.url}</value>
+            </property>
+            <property>
+                <name>Application ID</name>
+                <value>${ambari.application.id}</value>
+            </property>
+            <property>
+                <name>Hostname</name>
+                <value>${hostname(true)}</value>
+            </property>
+        </reportingTask>
+    </reportingTasks>
+</flowController>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0eda71a9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 58bb90f..3af270c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -101,6 +101,7 @@ import org.apache.nifi.util.file.FileUtils;
 import org.apache.nifi.web.api.dto.BundleDTO;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.FunnelDTO;
 import org.apache.nifi.web.api.dto.LabelDTO;
@@ -146,9 +147,15 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
 
         final Element rootGroupElement = (Element) 
rootElement.getElementsByTagName("rootGroup").item(0);
         final FlowEncodingVersion encodingVersion = 
FlowEncodingVersion.parse(rootGroupElement);
-
         final ProcessGroupDTO rootGroupDto = 
FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, null, 
encodingVersion);
-        return isEmpty(rootGroupDto);
+
+        final NodeList reportingTasks = 
rootElement.getElementsByTagName("reportingTask");
+        final ReportingTaskDTO reportingTaskDTO = reportingTasks.getLength() 
== 0 ? null : 
FlowFromDOMFactory.getReportingTask((Element)reportingTasks.item(0),null);
+
+        final NodeList controllerServices = 
rootElement.getElementsByTagName("controllerService");
+        final ControllerServiceDTO controllerServiceDTO = 
controllerServices.getLength() == 0 ? null : 
FlowFromDOMFactory.getControllerService((Element)controllerServices.item(0),null);
+
+        return isEmpty(rootGroupDto) && isEmpty(reportingTaskDTO) && 
isEmpty(controllerServiceDTO);
     }
 
     @Override
@@ -537,6 +544,18 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                 && CollectionUtils.isEmpty(contents.getRemoteProcessGroups());
     }
 
+    private static boolean isEmpty(final ReportingTaskDTO reportingTaskDTO){
+
+       return reportingTaskDTO == null || 
StringUtils.isEmpty(reportingTaskDTO.getName()) ;
+
+    }
+
+    private static boolean isEmpty(final ControllerServiceDTO 
controllerServiceDTO){
+
+        return controllerServiceDTO == null || 
StringUtils.isEmpty(controllerServiceDTO.getName());
+
+    }
+
     private static Document parseFlowBytes(final byte[] flow) throws 
FlowSerializationException {
         // create document by parsing proposed flow bytes
         try {

Reply via email to