Repository: nifi
Updated Branches:
  refs/heads/master df11e1d2c -> a1bb94c08


NIFI-1869 Cloning controller services when referenced by reporting tasks and 
upgrading from 0.x to 1.0.0. This closes #767


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1bb94c0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1bb94c0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1bb94c0

Branch: refs/heads/master
Commit: a1bb94c08a82a459341c0a63c4679bd839104c6a
Parents: df11e1d
Author: Bryan Bende <[email protected]>
Authored: Mon Aug 1 17:50:13 2016 -0400
Committer: Matt Gilman <[email protected]>
Committed: Tue Aug 2 16:09:54 2016 -0400

----------------------------------------------------------------------
 .../controller/StandardFlowSynchronizer.java    | 153 ++++++++++++++-----
 .../service/ControllerServiceLoader.java        |  44 +++++-
 .../nifi/controller/TestFlowController.java     | 102 +++++++++++++
 .../service/mock/DummyReportingTask.java        |  52 +++++++
 .../nifi/controller/service/mock/ServiceD.java  |  45 ++++++
 .../conf/processor-with-cs-flow-0.7.0.xml       |  59 +++++++
 .../conf/reporting-task-with-cs-flow-0.7.0.xml  |  74 +++++++++
 7 files changed, 487 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 5b2509d..0e0c74b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -21,6 +21,7 @@ import 
org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
@@ -103,11 +104,13 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
 
 /**
@@ -298,6 +301,21 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                         }
                     }
 
+                    // get all the reporting task elements
+                    final Element reportingTasksElement = 
DomUtils.getChild(rootElement, "reportingTasks");
+                    final List<Element> reportingTaskElements = new 
ArrayList<>();
+                    if (reportingTasksElement != null) {
+                        
reportingTaskElements.addAll(DomUtils.getChildElementsByTagName(reportingTasksElement,
 "reportingTask"));
+                    }
+
+                    // get/create all the reporting task nodes and DTOs, but 
don't apply their scheduled state yet
+                    final Map<ReportingTaskNode,ReportingTaskDTO> 
reportingTaskNodesToDTOs = new HashMap<>();
+                    for (final Element taskElement : reportingTaskElements) {
+                        final ReportingTaskDTO dto = 
FlowFromDOMFactory.getReportingTask(taskElement, encryptor);
+                        final ReportingTaskNode reportingTask = 
getOrCreateReportingTask(controller, dto, initialized, existingFlowEmpty);
+                        reportingTaskNodesToDTOs.put(reportingTask, dto);
+                    }
+
                     final Element controllerServicesElement = 
DomUtils.getChild(rootElement, "controllerServices");
                     if (controllerServicesElement != null) {
                         final List<Element> serviceElements = 
DomUtils.getChildElementsByTagName(controllerServicesElement, 
"controllerService");
@@ -308,7 +326,40 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                             // to the root Group. Otherwise, we want to use a 
null group, which indicates a Controller-level
                             // Controller Service.
                             final ProcessGroup group = (encodingVersion == 
null) ? rootGroup : null;
-                            
ControllerServiceLoader.loadControllerServices(serviceElements, controller, 
group, encryptor, controller.getBulletinRepository(), autoResumeState);
+                            final Map<ControllerServiceNode, Element> 
controllerServices = 
ControllerServiceLoader.loadControllerServices(serviceElements, controller, 
group, encryptor);
+
+                            // If we are moving controller services to the 
root group we also need to see if any reporting tasks
+                            // reference them, and if so we need to clone the 
CS and update the reporting task reference
+                            if (group != null) {
+                                // find all the controller service ids 
referenced by reporting tasks
+                                final Set<String> 
controllerServicesInReportingTasks = reportingTaskNodesToDTOs.keySet().stream()
+                                        .flatMap(r -> 
r.getProperties().entrySet().stream())
+                                        .filter(e -> 
e.getKey().getControllerServiceDefinition() != null)
+                                        .map(e -> e.getValue())
+                                        .collect(Collectors.toSet());
+
+                                // find the controller service nodes for each 
id referenced by a reporting task
+                                final Set<ControllerServiceNode> 
controllerServicesToClone = controllerServices.keySet().stream()
+                                        .filter(cs -> 
controllerServicesInReportingTasks.contains(cs.getIdentifier()))
+                                        .collect(Collectors.toSet());
+
+                                // clone the controller services and map the 
original id to the clone
+                                final Map<String,ControllerServiceNode> 
controllerServiceMapping = new HashMap<>();
+                                for (ControllerServiceNode controllerService : 
controllerServicesToClone) {
+                                    final ControllerServiceNode clone = 
ControllerServiceLoader.cloneControllerService(controller, controllerService);
+                                    controller.addRootControllerService(clone);
+                                    
controllerServiceMapping.put(controllerService.getIdentifier(), clone);
+                                }
+
+                                // update the reporting tasks to reference the 
cloned controller services
+                                
updateReportingTaskControllerServices(reportingTaskNodesToDTOs.keySet(), 
controllerServiceMapping);
+
+                                // enable all the cloned controller services
+                                
ControllerServiceLoader.enableControllerServices(controllerServiceMapping.values(),
 controller, autoResumeState);
+                            }
+
+                            // enable all the original controller services
+                            
ControllerServiceLoader.enableControllerServices(controllerServices, 
controller, encryptor, autoResumeState);
                         } else {
                             for (final Element serviceElement : 
serviceElements) {
                                 updateControllerService(controller, 
serviceElement, encryptor);
@@ -318,16 +369,9 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
 
                     scaleRootGroup(rootGroup, encodingVersion);
 
-                    final Element reportingTasksElement = 
DomUtils.getChild(rootElement, "reportingTasks");
-                    if (reportingTasksElement != null) {
-                        final List<Element> taskElements = 
DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
-                        for (final Element taskElement : taskElements) {
-                            if (!initialized || existingFlowEmpty) {
-                                addReportingTask(controller, taskElement, 
encryptor);
-                            } else {
-                                updateReportingTask(controller, taskElement, 
encryptor);
-                            }
-                        }
+                    // now that controller services are loaded and enabled we 
can apply the scheduled state to each reporting task
+                    for (Map.Entry<ReportingTaskNode,ReportingTaskDTO> entry : 
reportingTaskNodesToDTOs.entrySet()) {
+                        applyReportingTaskScheduleState(controller, 
entry.getValue(), entry.getKey(), initialized, existingFlowEmpty);
                     }
                 }
             }
@@ -359,6 +403,23 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         }
     }
 
+    private void updateReportingTaskControllerServices(final 
Set<ReportingTaskNode> reportingTasks, final Map<String,ControllerServiceNode> 
controllerServiceMapping) {
+        for (ReportingTaskNode reportingTask : reportingTasks) {
+            if (reportingTask.getProperties() != null) {
+                final Set<Map.Entry<PropertyDescriptor,String>> 
propertyDescriptors = reportingTask.getProperties().entrySet().stream()
+                        .filter(e -> 
e.getKey().getControllerServiceDefinition() != null)
+                        .filter(e -> 
controllerServiceMapping.containsKey(e.getValue()))
+                        .collect(Collectors.toSet());
+
+                for (Map.Entry<PropertyDescriptor,String> propEntry : 
propertyDescriptors) {
+                    final PropertyDescriptor propertyDescriptor = 
propEntry.getKey();
+                    final ControllerServiceNode clone = 
controllerServiceMapping.get(propEntry.getValue());
+                    reportingTask.setProperty(propertyDescriptor.getName(), 
clone.getIdentifier());
+                }
+            }
+        }
+    }
+
     private void addLocalTemplates(final Element processGroupElement, final 
ProcessGroup processGroup, final FlowEncodingVersion encodingVersion) {
         // Replace the templates with those from the proposed flow
         final List<Element> templateNodeList = 
getChildrenByTagName(processGroupElement, "template");
@@ -461,35 +522,53 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         }
     }
 
-    private void addReportingTask(final FlowController controller, final 
Element reportingTaskElement, final StringEncryptor encryptor) throws 
ReportingTaskInstantiationException {
-        final ReportingTaskDTO dto = 
FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
-
-        final ReportingTaskNode reportingTask = 
controller.createReportingTask(dto.getType(), dto.getId(), false);
-        reportingTask.setName(dto.getName());
-        reportingTask.setComments(dto.getComments());
-        reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
-        
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
+    private ReportingTaskNode getOrCreateReportingTask(final FlowController 
controller, final ReportingTaskDTO dto, final boolean controllerInitialized, 
final boolean existingFlowEmpty)
+            throws ReportingTaskInstantiationException {
+        // create a new reporting task node when the controller is not 
initialized or the flow is empty
+        if (!controllerInitialized || existingFlowEmpty) {
+            final ReportingTaskNode reportingTask = 
controller.createReportingTask(dto.getType(), dto.getId(), false);
+            reportingTask.setName(dto.getName());
+            reportingTask.setComments(dto.getComments());
+            reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
+            
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
+
+            reportingTask.setAnnotationData(dto.getAnnotationData());
+
+            for (final Map.Entry<String, String> entry : 
dto.getProperties().entrySet()) {
+                if (entry.getValue() == null) {
+                    reportingTask.removeProperty(entry.getKey());
+                } else {
+                    reportingTask.setProperty(entry.getKey(), 
entry.getValue());
+                }
+            }
 
-        reportingTask.setAnnotationData(dto.getAnnotationData());
+            final ComponentLog componentLog = new 
SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask());
+            final ReportingInitializationContext config = new 
StandardReportingInitializationContext(dto.getId(), dto.getName(),
+                    SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), 
dto.getSchedulingPeriod(), componentLog, controller);
 
-        for (final Map.Entry<String, String> entry : 
dto.getProperties().entrySet()) {
-            if (entry.getValue() == null) {
-                reportingTask.removeProperty(entry.getKey());
-            } else {
-                reportingTask.setProperty(entry.getKey(), entry.getValue());
+            try {
+                reportingTask.getReportingTask().initialize(config);
+            } catch (final InitializationException ie) {
+                throw new ReportingTaskInstantiationException("Failed to 
initialize reporting task of type " + dto.getType(), ie);
             }
-        }
 
-        final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), 
reportingTask.getReportingTask());
-        final ReportingInitializationContext config = new 
StandardReportingInitializationContext(dto.getId(), dto.getName(),
-                SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), 
dto.getSchedulingPeriod(), componentLog, controller);
+            return reportingTask;
+        } else {
+            // otherwise return the existing reporting task node
+            return controller.getReportingTaskNode(dto.getId());
+        }
+    }
 
-        try {
-            reportingTask.getReportingTask().initialize(config);
-        } catch (final InitializationException ie) {
-            throw new ReportingTaskInstantiationException("Failed to 
initialize reporting task of type " + dto.getType(), ie);
+    private void applyReportingTaskScheduleState(final FlowController 
controller, final ReportingTaskDTO dto, final ReportingTaskNode reportingTask,
+                                                 final boolean 
controllerInitialized, final boolean existingFlowEmpty) {
+        if (!controllerInitialized || existingFlowEmpty) {
+            applyNewReportingTaskScheduleState(controller, dto, reportingTask);
+        } else {
+            applyExistingReportingTaskScheduleState(controller, dto, 
reportingTask);
         }
+    }
 
+    private void applyNewReportingTaskScheduleState(final FlowController 
controller, final ReportingTaskDTO dto, final ReportingTaskNode reportingTask) {
         if (autoResumeState) {
             if (ScheduledState.RUNNING.name().equals(dto.getState())) {
                 try {
@@ -517,10 +596,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         }
     }
 
-    private void updateReportingTask(final FlowController controller, final 
Element reportingTaskElement, final StringEncryptor encryptor) {
-        final ReportingTaskDTO dto = 
FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
-        final ReportingTaskNode taskNode = 
controller.getReportingTaskNode(dto.getId());
-
+    private void applyExistingReportingTaskScheduleState(final FlowController 
controller, final ReportingTaskDTO dto, final ReportingTaskNode taskNode) {
         if (!taskNode.getScheduledState().name().equals(dto.getState())) {
             try {
                 switch (ScheduledState.valueOf(dto.getState())) {
@@ -863,7 +939,8 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         // Add Controller Services
         final List<Element> serviceNodeList = 
getChildrenByTagName(processGroupElement, "controllerService");
         if (!serviceNodeList.isEmpty()) {
-            ControllerServiceLoader.loadControllerServices(serviceNodeList, 
controller, processGroup, encryptor, controller.getBulletinRepository(), 
autoResumeState);
+            final Map<ControllerServiceNode, Element> controllerServices = 
ControllerServiceLoader.loadControllerServices(serviceNodeList, controller, 
processGroup, encryptor);
+            
ControllerServiceLoader.enableControllerServices(controllerServices, 
controller, encryptor, autoResumeState);
         }
 
         // add processors

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index d302cff..8b3dcf4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.service;
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -26,11 +27,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
 import org.apache.nifi.encrypt.StringEncryptor;
@@ -91,14 +94,17 @@ public class ControllerServiceLoader {
             final Document document = builder.parse(in);
             final Element controllerServices = document.getDocumentElement();
             final List<Element> serviceElements = 
DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
-            return new ArrayList<>(loadControllerServices(serviceElements, 
controller, parentGroup, encryptor, bulletinRepo, autoResumeState));
+
+            final Map<ControllerServiceNode, Element> controllerServiceMap = 
ControllerServiceLoader.loadControllerServices(serviceElements, controller, 
parentGroup, encryptor);
+            enableControllerServices(controllerServiceMap, controller, 
encryptor, autoResumeState);
+            return new ArrayList<>(controllerServiceMap.keySet());
         } catch (SAXException | ParserConfigurationException sxe) {
             throw new IOException(sxe);
         }
     }
 
-    public static Collection<ControllerServiceNode> 
loadControllerServices(final List<Element> serviceElements, final 
FlowController controller, final ProcessGroup parentGroup,
-        final StringEncryptor encryptor, final BulletinRepository 
bulletinRepo, final boolean autoResumeState) {
+    public static Map<ControllerServiceNode, Element> 
loadControllerServices(final List<Element> serviceElements, final 
FlowController controller,
+                                                                             
final ProcessGroup parentGroup, final StringEncryptor encryptor) {
 
         final Map<ControllerServiceNode, Element> nodeMap = new HashMap<>();
         for (final Element serviceElement : serviceElements) {
@@ -117,6 +123,11 @@ public class ControllerServiceLoader {
             configureControllerService(entry.getKey(), entry.getValue(), 
encryptor);
         }
 
+        return nodeMap;
+    }
+
+    public static void enableControllerServices(final 
Map<ControllerServiceNode, Element> nodeMap, final FlowController controller,
+                                                final StringEncryptor 
encryptor, final boolean autoResumeState) {
         // Start services
         if (autoResumeState) {
             final Set<ControllerServiceNode> nodesToEnable = new HashSet<>();
@@ -135,10 +146,34 @@ public class ControllerServiceLoader {
                 }
             }
 
+            enableControllerServices(nodesToEnable, controller, 
autoResumeState);
+        }
+    }
+
+    public static void enableControllerServices(final 
Collection<ControllerServiceNode> nodesToEnable, final FlowController 
controller, final boolean autoResumeState) {
+        // Start services
+        if (autoResumeState) {
             controller.enableControllerServices(nodesToEnable);
         }
+    }
+
+    public static ControllerServiceNode cloneControllerService(final 
ControllerServiceProvider provider, final ControllerServiceNode 
controllerService) {
+        // create a new id for the clone seeded from the original id so that 
it is consistent in a cluster
+        final UUID id = 
UUID.nameUUIDFromBytes(controllerService.getIdentifier().getBytes(StandardCharsets.UTF_8));
+
+        final ControllerServiceNode clone = 
provider.createControllerService(controllerService.getCanonicalClassName(), 
id.toString(), false);
+        clone.setName(controllerService.getName());
+        clone.setComments(controllerService.getComments());
+
+        if (controllerService.getProperties() != null) {
+            for (Map.Entry<PropertyDescriptor, String> propEntry : 
controllerService.getProperties().entrySet()) {
+                if (propEntry.getValue() != null) {
+                    clone.setProperty(propEntry.getKey().getName(), 
propEntry.getValue());
+                }
+            }
+        }
 
-        return nodeMap.keySet();
+        return clone;
     }
 
     private static ControllerServiceNode createControllerService(final 
ControllerServiceProvider provider, final Element controllerServiceElement, 
final StringEncryptor encryptor) {
@@ -162,4 +197,5 @@ public class ControllerServiceLoader {
             }
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
index 60530c8..71af93f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.controller;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
 import org.apache.nifi.authorization.AccessPolicy;
@@ -41,6 +42,9 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.LinkedHashSet;
 import java.util.Set;
@@ -128,6 +132,104 @@ public class TestFlowController {
     }
 
     @Test
+    public void 
testSynchronizeFlowWithReportingTaskAndProcessorReferencingControllerService() 
throws IOException {
+        // create a mock proposed data flow with the same auth fingerprint as 
the current authorizer
+        final String authFingerprint = authorizer.getFingerprint();
+        final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+        
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
+
+        final File flowFile = new 
File("src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml");
+        final String flow = IOUtils.toString(new FileInputStream(flowFile));
+        
when(proposedDataFlow.getFlow()).thenReturn(flow.getBytes(StandardCharsets.UTF_8));
+
+        controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
+
+        // should be two controller services
+        final Set<ControllerServiceNode> controllerServiceNodes = 
controller.getAllControllerServices();
+        assertNotNull(controllerServiceNodes);
+        assertEquals(2, controllerServiceNodes.size());
+
+        // find the controller service that was moved to the root group
+        final ControllerServiceNode rootGroupCs = 
controllerServiceNodes.stream().filter(c -> c.getProcessGroup() != 
null).findFirst().get();
+        assertNotNull(rootGroupCs);
+
+        // find the controller service that was not moved to the root group
+        final ControllerServiceNode controllerCs = 
controllerServiceNodes.stream().filter(c -> c.getProcessGroup() == 
null).findFirst().get();
+        assertNotNull(controllerCs);
+
+        // should be same class (not Ghost), different ids, and same properties
+        assertEquals(rootGroupCs.getCanonicalClassName(), 
controllerCs.getCanonicalClassName());
+        assertFalse(rootGroupCs.getCanonicalClassName().contains("Ghost"));
+        assertNotEquals(rootGroupCs.getIdentifier(), 
controllerCs.getIdentifier());
+        assertEquals(rootGroupCs.getProperties(), 
controllerCs.getProperties());
+
+        // should be one processor
+        final Set<ProcessorNode> processorNodes = 
controller.getGroup(controller.getRootGroupId()).getProcessors();
+        assertNotNull(processorNodes);
+        assertEquals(1, processorNodes.size());
+
+        // verify the processor is still pointing at the controller service 
that got moved to the root group
+        final ProcessorNode processorNode = 
processorNodes.stream().findFirst().get();
+        final PropertyDescriptor procControllerServiceProp = 
processorNode.getProperties().entrySet().stream()
+                .filter(e -> e.getValue().equals(rootGroupCs.getIdentifier()))
+                .map(e -> e.getKey())
+                .findFirst()
+                .get();
+        assertNotNull(procControllerServiceProp);
+
+        // should be one reporting task
+        final Set<ReportingTaskNode> reportingTaskNodes = 
controller.getAllReportingTasks();
+        assertNotNull(reportingTaskNodes);
+        assertEquals(1, reportingTaskNodes.size());
+
+        // verify that the reporting task is pointing at the controller 
service at the controller level
+        final ReportingTaskNode reportingTaskNode = 
reportingTaskNodes.stream().findFirst().get();
+        final PropertyDescriptor reportingTaskControllerServiceProp = 
reportingTaskNode.getProperties().entrySet().stream()
+                .filter(e -> e.getValue().equals(controllerCs.getIdentifier()))
+                .map(e -> e.getKey())
+                .findFirst()
+                .get();
+        assertNotNull(reportingTaskControllerServiceProp);
+    }
+
+    @Test
+    public void testSynchronizeFlowWithProcessorReferencingControllerService() 
throws IOException {
+        // create a mock proposed data flow with the same auth fingerprint as 
the current authorizer
+        final String authFingerprint = authorizer.getFingerprint();
+        final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+        
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
+
+        final File flowFile = new 
File("src/test/resources/conf/processor-with-cs-flow-0.7.0.xml");
+        final String flow = IOUtils.toString(new FileInputStream(flowFile));
+        
when(proposedDataFlow.getFlow()).thenReturn(flow.getBytes(StandardCharsets.UTF_8));
+
+        controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
+
+        // should be one controller service
+        final Set<ControllerServiceNode> controllerServiceNodes = 
controller.getAllControllerServices();
+        assertNotNull(controllerServiceNodes);
+        assertEquals(1, controllerServiceNodes.size());
+
+        // find the controller service that was moved to the root group
+        final ControllerServiceNode rootGroupCs = 
controllerServiceNodes.stream().filter(c -> c.getProcessGroup() != 
null).findFirst().get();
+        assertNotNull(rootGroupCs);
+
+        // should be one processor
+        final Set<ProcessorNode> processorNodes = 
controller.getGroup(controller.getRootGroupId()).getProcessors();
+        assertNotNull(processorNodes);
+        assertEquals(1, processorNodes.size());
+
+        // verify the processor is still pointing at the controller service 
that got moved to the root group
+        final ProcessorNode processorNode = 
processorNodes.stream().findFirst().get();
+        final PropertyDescriptor procControllerServiceProp = 
processorNode.getProperties().entrySet().stream()
+                .filter(e -> e.getValue().equals(rootGroupCs.getIdentifier()))
+                .map(e -> e.getKey())
+                .findFirst()
+                .get();
+        assertNotNull(procControllerServiceProp);
+    }
+
+    @Test
     public void testSynchronizeFlowWhenAuthorizationsAreEqual() {
         // create a mock proposed data flow with the same auth fingerprint as 
the current authorizer
         final String authFingerprint = authorizer.getFingerprint();

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyReportingTask.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyReportingTask.java
new file mode 100644
index 0000000..cb74c76
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyReportingTask.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service.mock;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.ReportingContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DummyReportingTask extends AbstractReportingTask {
+    // Test fixture. SERVICE: required, references a ControllerService.
+    public static final PropertyDescriptor SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Controller Service")
+            .identifiesControllerService(ControllerService.class)
+            .required(true)
+            .build();
+    // PROP_FOO: optional property named "Foo".
+    public static final PropertyDescriptor PROP_FOO = new 
PropertyDescriptor.Builder()
+            .name("Foo")
+            .required(false)
+            .build();
+    // Returns a new ArrayList per call: SERVICE first, then PROP_FOO.
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(SERVICE);
+        descriptors.add(PROP_FOO);
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(ReportingContext context) {
+        // No-op: this implementation does nothing.
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceD.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceD.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceD.java
new file mode 100644
index 0000000..b61bead
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceD.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service.mock;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ServiceD extends AbstractControllerService {
+    // Test fixture service. PROP_FOO1: optional property named "Foo1".
+    public static final PropertyDescriptor PROP_FOO1 = new 
PropertyDescriptor.Builder()
+            .name("Foo1")
+            .required(false)
+            .build();
+    // PROP_FOO2: optional property named "Foo2".
+    public static final PropertyDescriptor PROP_FOO2 = new 
PropertyDescriptor.Builder()
+            .name("Foo2")
+            .required(false)
+            .build();
+    // Returns a new ArrayList per call: PROP_FOO1 first, then PROP_FOO2.
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(PROP_FOO1);
+        descriptors.add(PROP_FOO2);
+        return descriptors;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/processor-with-cs-flow-0.7.0.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/processor-with-cs-flow-0.7.0.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/processor-with-cs-flow-0.7.0.xml
new file mode 100644
index 0000000..7703a0b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/processor-with-cs-flow-0.7.0.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<flowController>
+    <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+    <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+    <rootGroup>
+        <id>778f676e-6542-4c18-9d06-24b6fd3a1b29</id>
+        <name>NiFi Flow</name>
+        <position x="0.0" y="0.0"/>
+        <comment/>
+        <processor>
+            <id>809cca74-cd11-4ffa-9831-39d446a8ed55</id>
+            <name>DummyProcessor</name>
+            <position x="670.0" y="235.0"/>
+            <styles/>
+            <comment/>
+            <class>org.apache.nifi.controller.service.mock.DummyProcessor</class>
+            <maxConcurrentTasks>1</maxConcurrentTasks>
+            <schedulingPeriod>0 sec</schedulingPeriod>
+            <penalizationPeriod>30 sec</penalizationPeriod>
+            <yieldPeriod>1 sec</yieldPeriod>
+            <bulletinLevel>WARN</bulletinLevel>
+            <lossTolerant>false</lossTolerant>
+            <scheduledState>STOPPED</scheduledState>
+            <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+            <runDurationNanos>0</runDurationNanos>
+            <property>
+                <name>Controller Service</name>
+                <value>edf22ee5-376a-46dc-a38a-919351124457</value>
+            </property>
+        </processor>
+    </rootGroup>
+    <controllerServices>
+        <controllerService>
+            <id>edf22ee5-376a-46dc-a38a-919351124457</id>
+            <name>ControllerService</name>
+            <comment/>
+            <class>org.apache.nifi.controller.service.mock.ServiceD</class>
+            <enabled>false</enabled>
+            <property>
+                <name>Foo1</name>
+                <value>Bar1</value>
+            </property>
+        </controllerService>
+    </controllerServices>
+</flowController>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml
new file mode 100644
index 0000000..b9cff57
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<flowController>
+  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+  <rootGroup>
+    <id>778f676e-6542-4c18-9d06-24b6fd3a1b29</id>
+    <name>NiFi Flow</name>
+    <position x="0.0" y="0.0"/>
+    <comment/>
+    <processor>
+      <id>809cca74-cd11-4ffa-9831-39d446a8ed54</id>
+      <name>DummyProcessor</name>
+      <position x="670.0" y="235.0"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.controller.service.mock.DummyProcessor</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>0 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>STOPPED</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>Controller Service</name>
+        <value>edf22ee5-376a-46dc-a38a-919351124456</value>
+      </property>
+    </processor>
+  </rootGroup>
+  <controllerServices>
+    <controllerService>
+      <id>edf22ee5-376a-46dc-a38a-919351124456</id>
+      <name>ControllerService</name>
+      <comment/>
+      <class>org.apache.nifi.controller.service.mock.ServiceD</class>
+      <enabled>false</enabled>
+      <property>
+        <name>Foo1</name>
+        <value>Bar1</value>
+      </property>
+    </controllerService>
+  </controllerServices>
+  <reportingTasks>
+    <reportingTask>
+      <id>fb9b40ce-608f-4a2f-9822-3899f695f699</id>
+      <name>ReportingTask</name>
+      <comment/>
+      <class>org.apache.nifi.controller.service.mock.DummyReportingTask</class>
+      <schedulingPeriod>5 mins</schedulingPeriod>
+      <scheduledState>STOPPED</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <property>
+        <name>Controller Service</name>
+        <value>edf22ee5-376a-46dc-a38a-919351124456</value>
+      </property>
+    </reportingTask>
+  </reportingTasks>
+</flowController>

Reply via email to