Repository: nifi Updated Branches: refs/heads/master df11e1d2c -> a1bb94c08
NIFI-1869 Cloning controller services when referenced by reporting tasks and upgrading from 0.x to 1.0.0. This closes #767 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1bb94c0 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1bb94c0 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1bb94c0 Branch: refs/heads/master Commit: a1bb94c08a82a459341c0a63c4679bd839104c6a Parents: df11e1d Author: Bryan Bende <[email protected]> Authored: Mon Aug 1 17:50:13 2016 -0400 Committer: Matt Gilman <[email protected]> Committed: Tue Aug 2 16:09:54 2016 -0400 ---------------------------------------------------------------------- .../controller/StandardFlowSynchronizer.java | 153 ++++++++++++++----- .../service/ControllerServiceLoader.java | 44 +++++- .../nifi/controller/TestFlowController.java | 102 +++++++++++++ .../service/mock/DummyReportingTask.java | 52 +++++++ .../nifi/controller/service/mock/ServiceD.java | 45 ++++++ .../conf/processor-with-cs-flow-0.7.0.xml | 59 +++++++ .../conf/reporting-task-with-cs-flow-0.7.0.xml | 74 +++++++++ 7 files changed, 487 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 5b2509d..0e0c74b 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -21,6 +21,7 @@ import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.cluster.protocol.StandardDataFlow; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -103,11 +104,13 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; /** @@ -298,6 +301,21 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } + // get all the reporting task elements + final Element reportingTasksElement = DomUtils.getChild(rootElement, "reportingTasks"); + final List<Element> reportingTaskElements = new ArrayList<>(); + if (reportingTasksElement != null) { + reportingTaskElements.addAll(DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask")); + } + + // get/create all the reporting task nodes and DTOs, but don't apply their scheduled state yet + final Map<ReportingTaskNode,ReportingTaskDTO> reportingTaskNodesToDTOs = new HashMap<>(); + for (final Element taskElement : reportingTaskElements) { + final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(taskElement, encryptor); + final ReportingTaskNode reportingTask = getOrCreateReportingTask(controller, dto, 
initialized, existingFlowEmpty); + reportingTaskNodesToDTOs.put(reportingTask, dto); + } + final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices"); if (controllerServicesElement != null) { final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); @@ -308,7 +326,40 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // to the root Group. Otherwise, we want to use a null group, which indicates a Controller-level // Controller Service. final ProcessGroup group = (encodingVersion == null) ? rootGroup : null; - ControllerServiceLoader.loadControllerServices(serviceElements, controller, group, encryptor, controller.getBulletinRepository(), autoResumeState); + final Map<ControllerServiceNode, Element> controllerServices = ControllerServiceLoader.loadControllerServices(serviceElements, controller, group, encryptor); + + // If we are moving controller services to the root group we also need to see if any reporting tasks + // reference them, and if so we need to clone the CS and update the reporting task reference + if (group != null) { + // find all the controller service ids referenced by reporting tasks + final Set<String> controllerServicesInReportingTasks = reportingTaskNodesToDTOs.keySet().stream() + .flatMap(r -> r.getProperties().entrySet().stream()) + .filter(e -> e.getKey().getControllerServiceDefinition() != null) + .map(e -> e.getValue()) + .collect(Collectors.toSet()); + + // find the controller service nodes for each id referenced by a reporting task + final Set<ControllerServiceNode> controllerServicesToClone = controllerServices.keySet().stream() + .filter(cs -> controllerServicesInReportingTasks.contains(cs.getIdentifier())) + .collect(Collectors.toSet()); + + // clone the controller services and map the original id to the clone + final Map<String,ControllerServiceNode> controllerServiceMapping = new HashMap<>(); + for 
(ControllerServiceNode controllerService : controllerServicesToClone) { + final ControllerServiceNode clone = ControllerServiceLoader.cloneControllerService(controller, controllerService); + controller.addRootControllerService(clone); + controllerServiceMapping.put(controllerService.getIdentifier(), clone); + } + + // update the reporting tasks to reference the cloned controller services + updateReportingTaskControllerServices(reportingTaskNodesToDTOs.keySet(), controllerServiceMapping); + + // enable all the cloned controller services + ControllerServiceLoader.enableControllerServices(controllerServiceMapping.values(), controller, autoResumeState); + } + + // enable all the original controller services + ControllerServiceLoader.enableControllerServices(controllerServices, controller, encryptor, autoResumeState); } else { for (final Element serviceElement : serviceElements) { updateControllerService(controller, serviceElement, encryptor); @@ -318,16 +369,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { scaleRootGroup(rootGroup, encodingVersion); - final Element reportingTasksElement = DomUtils.getChild(rootElement, "reportingTasks"); - if (reportingTasksElement != null) { - final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask"); - for (final Element taskElement : taskElements) { - if (!initialized || existingFlowEmpty) { - addReportingTask(controller, taskElement, encryptor); - } else { - updateReportingTask(controller, taskElement, encryptor); - } - } + // now that controller services are loaded and enabled we can apply the scheduled state to each reporting task + for (Map.Entry<ReportingTaskNode,ReportingTaskDTO> entry : reportingTaskNodesToDTOs.entrySet()) { + applyReportingTaskScheduleState(controller, entry.getValue(), entry.getKey(), initialized, existingFlowEmpty); } } } @@ -359,6 +403,23 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } + private void 
updateReportingTaskControllerServices(final Set<ReportingTaskNode> reportingTasks, final Map<String,ControllerServiceNode> controllerServiceMapping) { + for (ReportingTaskNode reportingTask : reportingTasks) { + if (reportingTask.getProperties() != null) { + final Set<Map.Entry<PropertyDescriptor,String>> propertyDescriptors = reportingTask.getProperties().entrySet().stream() + .filter(e -> e.getKey().getControllerServiceDefinition() != null) + .filter(e -> controllerServiceMapping.containsKey(e.getValue())) + .collect(Collectors.toSet()); + + for (Map.Entry<PropertyDescriptor,String> propEntry : propertyDescriptors) { + final PropertyDescriptor propertyDescriptor = propEntry.getKey(); + final ControllerServiceNode clone = controllerServiceMapping.get(propEntry.getValue()); + reportingTask.setProperty(propertyDescriptor.getName(), clone.getIdentifier()); + } + } + } + } + private void addLocalTemplates(final Element processGroupElement, final ProcessGroup processGroup, final FlowEncodingVersion encodingVersion) { // Replace the templates with those from the proposed flow final List<Element> templateNodeList = getChildrenByTagName(processGroupElement, "template"); @@ -461,35 +522,53 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } - private void addReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) throws ReportingTaskInstantiationException { - final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor); - - final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false); - reportingTask.setName(dto.getName()); - reportingTask.setComments(dto.getComments()); - reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod()); - reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy())); + private ReportingTaskNode getOrCreateReportingTask(final FlowController 
controller, final ReportingTaskDTO dto, final boolean controllerInitialized, final boolean existingFlowEmpty) + throws ReportingTaskInstantiationException { + // create a new reporting task node when the controller is not initialized or the flow is empty + if (!controllerInitialized || existingFlowEmpty) { + final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false); + reportingTask.setName(dto.getName()); + reportingTask.setComments(dto.getComments()); + reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod()); + reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy())); + + reportingTask.setAnnotationData(dto.getAnnotationData()); + + for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) { + if (entry.getValue() == null) { + reportingTask.removeProperty(entry.getKey()); + } else { + reportingTask.setProperty(entry.getKey(), entry.getValue()); + } + } - reportingTask.setAnnotationData(dto.getAnnotationData()); + final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask()); + final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(), + SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller); - for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) { - if (entry.getValue() == null) { - reportingTask.removeProperty(entry.getKey()); - } else { - reportingTask.setProperty(entry.getKey(), entry.getValue()); + try { + reportingTask.getReportingTask().initialize(config); + } catch (final InitializationException ie) { + throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + dto.getType(), ie); } - } - final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask()); - final ReportingInitializationContext config 
= new StandardReportingInitializationContext(dto.getId(), dto.getName(), - SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller); + return reportingTask; + } else { + // otherwise return the existing reporting task node + return controller.getReportingTaskNode(dto.getId()); + } + } - try { - reportingTask.getReportingTask().initialize(config); - } catch (final InitializationException ie) { - throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + dto.getType(), ie); + private void applyReportingTaskScheduleState(final FlowController controller, final ReportingTaskDTO dto, final ReportingTaskNode reportingTask, + final boolean controllerInitialized, final boolean existingFlowEmpty) { + if (!controllerInitialized || existingFlowEmpty) { + applyNewReportingTaskScheduleState(controller, dto, reportingTask); + } else { + applyExistingReportingTaskScheduleState(controller, dto, reportingTask); } + } + private void applyNewReportingTaskScheduleState(final FlowController controller, final ReportingTaskDTO dto, final ReportingTaskNode reportingTask) { if (autoResumeState) { if (ScheduledState.RUNNING.name().equals(dto.getState())) { try { @@ -517,10 +596,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } - private void updateReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) { - final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor); - final ReportingTaskNode taskNode = controller.getReportingTaskNode(dto.getId()); - + private void applyExistingReportingTaskScheduleState(final FlowController controller, final ReportingTaskDTO dto, final ReportingTaskNode taskNode) { if (!taskNode.getScheduledState().name().equals(dto.getState())) { try { switch (ScheduledState.valueOf(dto.getState())) { @@ -863,7 +939,8 @@ public class StandardFlowSynchronizer 
implements FlowSynchronizer { // Add Controller Services final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService"); if (!serviceNodeList.isEmpty()) { - ControllerServiceLoader.loadControllerServices(serviceNodeList, controller, processGroup, encryptor, controller.getBulletinRepository(), autoResumeState); + final Map<ControllerServiceNode, Element> controllerServices = ControllerServiceLoader.loadControllerServices(serviceNodeList, controller, processGroup, encryptor); + ControllerServiceLoader.enableControllerServices(controllerServices, controller, encryptor, autoResumeState); } // add processors http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index d302cff..8b3dcf4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller.service; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -26,11 +27,13 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import 
java.util.UUID; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.serialization.FlowFromDOMFactory; import org.apache.nifi.encrypt.StringEncryptor; @@ -91,14 +94,17 @@ public class ControllerServiceLoader { final Document document = builder.parse(in); final Element controllerServices = document.getDocumentElement(); final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService"); - return new ArrayList<>(loadControllerServices(serviceElements, controller, parentGroup, encryptor, bulletinRepo, autoResumeState)); + + final Map<ControllerServiceNode, Element> controllerServiceMap = ControllerServiceLoader.loadControllerServices(serviceElements, controller, parentGroup, encryptor); + enableControllerServices(controllerServiceMap, controller, encryptor, autoResumeState); + return new ArrayList<>(controllerServiceMap.keySet()); } catch (SAXException | ParserConfigurationException sxe) { throw new IOException(sxe); } } - public static Collection<ControllerServiceNode> loadControllerServices(final List<Element> serviceElements, final FlowController controller, final ProcessGroup parentGroup, - final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) { + public static Map<ControllerServiceNode, Element> loadControllerServices(final List<Element> serviceElements, final FlowController controller, + final ProcessGroup parentGroup, final StringEncryptor encryptor) { final Map<ControllerServiceNode, Element> nodeMap = new HashMap<>(); for (final Element serviceElement : serviceElements) { @@ -117,6 +123,11 @@ public class ControllerServiceLoader { configureControllerService(entry.getKey(), entry.getValue(), encryptor); } + return nodeMap; + } + + public 
static void enableControllerServices(final Map<ControllerServiceNode, Element> nodeMap, final FlowController controller, + final StringEncryptor encryptor, final boolean autoResumeState) { // Start services if (autoResumeState) { final Set<ControllerServiceNode> nodesToEnable = new HashSet<>(); @@ -135,10 +146,34 @@ public class ControllerServiceLoader { } } + enableControllerServices(nodesToEnable, controller, autoResumeState); + } + } + + public static void enableControllerServices(final Collection<ControllerServiceNode> nodesToEnable, final FlowController controller, final boolean autoResumeState) { + // Start services + if (autoResumeState) { controller.enableControllerServices(nodesToEnable); } + } + + public static ControllerServiceNode cloneControllerService(final ControllerServiceProvider provider, final ControllerServiceNode controllerService) { + // create a new id for the clone seeded from the original id so that it is consistent in a cluster + final UUID id = UUID.nameUUIDFromBytes(controllerService.getIdentifier().getBytes(StandardCharsets.UTF_8)); + + final ControllerServiceNode clone = provider.createControllerService(controllerService.getCanonicalClassName(), id.toString(), false); + clone.setName(controllerService.getName()); + clone.setComments(controllerService.getComments()); + + if (controllerService.getProperties() != null) { + for (Map.Entry<PropertyDescriptor, String> propEntry : controllerService.getProperties().entrySet()) { + if (propEntry.getValue() != null) { + clone.setProperty(propEntry.getKey().getName(), propEntry.getValue()); + } + } + } - return nodeMap.keySet(); + return clone; } private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) { @@ -162,4 +197,5 @@ public class ControllerServiceLoader { } } } + } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index 60530c8..71af93f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller; +import org.apache.commons.io.IOUtils; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer; import org.apache.nifi.authorization.AccessPolicy; @@ -41,6 +42,9 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.LinkedHashSet; import java.util.Set; @@ -128,6 +132,104 @@ public class TestFlowController { } @Test + public void testSynchronizeFlowWithReportingTaskAndProcessorReferencingControllerService() throws IOException { + // create a mock proposed data flow with the same auth fingerprint as the current authorizer + final String authFingerprint = authorizer.getFingerprint(); + final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class); + when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8)); + + final File flowFile = new 
File("src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml"); + final String flow = IOUtils.toString(new FileInputStream(flowFile)); + when(proposedDataFlow.getFlow()).thenReturn(flow.getBytes(StandardCharsets.UTF_8)); + + controller.synchronize(standardFlowSynchronizer, proposedDataFlow); + + // should be two controller services + final Set<ControllerServiceNode> controllerServiceNodes = controller.getAllControllerServices(); + assertNotNull(controllerServiceNodes); + assertEquals(2, controllerServiceNodes.size()); + + // find the controller service that was moved to the root group + final ControllerServiceNode rootGroupCs = controllerServiceNodes.stream().filter(c -> c.getProcessGroup() != null).findFirst().get(); + assertNotNull(rootGroupCs); + + // find the controller service that was not moved to the root group + final ControllerServiceNode controllerCs = controllerServiceNodes.stream().filter(c -> c.getProcessGroup() == null).findFirst().get(); + assertNotNull(controllerCs); + + // should be same class (not Ghost), different ids, and same properties + assertEquals(rootGroupCs.getCanonicalClassName(), controllerCs.getCanonicalClassName()); + assertFalse(rootGroupCs.getCanonicalClassName().contains("Ghost")); + assertNotEquals(rootGroupCs.getIdentifier(), controllerCs.getIdentifier()); + assertEquals(rootGroupCs.getProperties(), controllerCs.getProperties()); + + // should be one processor + final Set<ProcessorNode> processorNodes = controller.getGroup(controller.getRootGroupId()).getProcessors(); + assertNotNull(processorNodes); + assertEquals(1, processorNodes.size()); + + // verify the processor is still pointing at the controller service that got moved to the root group + final ProcessorNode processorNode = processorNodes.stream().findFirst().get(); + final PropertyDescriptor procControllerServiceProp = processorNode.getProperties().entrySet().stream() + .filter(e -> e.getValue().equals(rootGroupCs.getIdentifier())) + .map(e -> e.getKey()) + 
.findFirst() + .get(); + assertNotNull(procControllerServiceProp); + + // should be one reporting task + final Set<ReportingTaskNode> reportingTaskNodes = controller.getAllReportingTasks(); + assertNotNull(reportingTaskNodes); + assertEquals(1, reportingTaskNodes.size()); + + // verify that the reporting task is pointing at the controller service at the controller level + final ReportingTaskNode reportingTaskNode = reportingTaskNodes.stream().findFirst().get(); + final PropertyDescriptor reportingTaskControllerServiceProp = reportingTaskNode.getProperties().entrySet().stream() + .filter(e -> e.getValue().equals(controllerCs.getIdentifier())) + .map(e -> e.getKey()) + .findFirst() + .get(); + assertNotNull(reportingTaskControllerServiceProp); + } + + @Test + public void testSynchronizeFlowWithProcessorReferencingControllerService() throws IOException { + // create a mock proposed data flow with the same auth fingerprint as the current authorizer + final String authFingerprint = authorizer.getFingerprint(); + final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class); + when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8)); + + final File flowFile = new File("src/test/resources/conf/processor-with-cs-flow-0.7.0.xml"); + final String flow = IOUtils.toString(new FileInputStream(flowFile)); + when(proposedDataFlow.getFlow()).thenReturn(flow.getBytes(StandardCharsets.UTF_8)); + + controller.synchronize(standardFlowSynchronizer, proposedDataFlow); + + // should be one controller service + final Set<ControllerServiceNode> controllerServiceNodes = controller.getAllControllerServices(); + assertNotNull(controllerServiceNodes); + assertEquals(1, controllerServiceNodes.size()); + + // find the controller service that was moved to the root group + final ControllerServiceNode rootGroupCs = controllerServiceNodes.stream().filter(c -> c.getProcessGroup() != null).findFirst().get(); + assertNotNull(rootGroupCs); + + // 
should be one processor + final Set<ProcessorNode> processorNodes = controller.getGroup(controller.getRootGroupId()).getProcessors(); + assertNotNull(processorNodes); + assertEquals(1, processorNodes.size()); + + // verify the processor is still pointing at the controller service that got moved to the root group + final ProcessorNode processorNode = processorNodes.stream().findFirst().get(); + final PropertyDescriptor procControllerServiceProp = processorNode.getProperties().entrySet().stream() + .filter(e -> e.getValue().equals(rootGroupCs.getIdentifier())) + .map(e -> e.getKey()) + .findFirst() + .get(); + assertNotNull(procControllerServiceProp); + } + + @Test public void testSynchronizeFlowWhenAuthorizationsAreEqual() { // create a mock proposed data flow with the same auth fingerprint as the current authorizer final String authFingerprint = authorizer.getFingerprint(); http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyReportingTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyReportingTask.java new file mode 100644 index 0000000..cb74c76 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyReportingTask.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.service.mock; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.ReportingContext; + +import java.util.ArrayList; +import java.util.List; + +public class DummyReportingTask extends AbstractReportingTask { + + public static final PropertyDescriptor SERVICE = new PropertyDescriptor.Builder() + .name("Controller Service") + .identifiesControllerService(ControllerService.class) + .required(true) + .build(); + + public static final PropertyDescriptor PROP_FOO = new PropertyDescriptor.Builder() + .name("Foo") + .required(false) + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(SERVICE); + descriptors.add(PROP_FOO); + return descriptors; + } + + @Override + public void onTrigger(ReportingContext context) { + + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceD.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceD.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceD.java new file mode 100644 index 0000000..b61bead --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceD.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.service.mock; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; + +import java.util.ArrayList; +import java.util.List; + +public class ServiceD extends AbstractControllerService { + + public static final PropertyDescriptor PROP_FOO1 = new PropertyDescriptor.Builder() + .name("Foo1") + .required(false) + .build(); + + public static final PropertyDescriptor PROP_FOO2 = new PropertyDescriptor.Builder() + .name("Foo2") + .required(false) + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(PROP_FOO1); + descriptors.add(PROP_FOO2); + return descriptors; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/processor-with-cs-flow-0.7.0.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/processor-with-cs-flow-0.7.0.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/processor-with-cs-flow-0.7.0.xml new file mode 100644 index 0000000..7703a0b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/processor-with-cs-flow-0.7.0.xml @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<flowController> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>778f676e-6542-4c18-9d06-24b6fd3a1b29</id> + <name>NiFi Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + <processor> + <id>809cca74-cd11-4ffa-9831-39d446a8ed55</id> + <name>DummyProcessor</name> + <position x="670.0" y="235.0"/> + <styles/> + <comment/> + <class>org.apache.nifi.controller.service.mock.DummyProcessor</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>STOPPED</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>Controller Service</name> + <value>edf22ee5-376a-46dc-a38a-919351124457</value> + </property> + </processor> + </rootGroup> + <controllerServices> + <controllerService> + <id>edf22ee5-376a-46dc-a38a-919351124457</id> + <name>ControllerService</name> + <comment/> + <class>org.apache.nifi.controller.service.mock.ServiceD</class> + <enabled>false</enabled> + <property> + <name>Foo1</name> + <value>Bar1</value> + </property> + </controllerService> + </controllerServices> +</flowController> http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bb94c0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml new file mode 100644 index 0000000..b9cff57 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml @@ -0,0 +1,74 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<flowController> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>778f676e-6542-4c18-9d06-24b6fd3a1b29</id> + <name>NiFi Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + <processor> + <id>809cca74-cd11-4ffa-9831-39d446a8ed54</id> + <name>DummyProcessor</name> + <position x="670.0" y="235.0"/> + <styles/> + <comment/> + <class>org.apache.nifi.controller.service.mock.DummyProcessor</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>STOPPED</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>Controller Service</name> + <value>edf22ee5-376a-46dc-a38a-919351124456</value> + </property> + </processor> + </rootGroup> + <controllerServices> + <controllerService> + <id>edf22ee5-376a-46dc-a38a-919351124456</id> + <name>ControllerService</name> + <comment/> + <class>org.apache.nifi.controller.service.mock.ServiceD</class> + <enabled>false</enabled> + <property> + <name>Foo1</name> + <value>Bar1</value> + </property> + </controllerService> + </controllerServices> + <reportingTasks> + <reportingTask> + <id>fb9b40ce-608f-4a2f-9822-3899f695f699</id> + <name>ReportingTask</name> + <comment/> + <class>org.apache.nifi.controller.service.mock.DummyReportingTask</class> + <schedulingPeriod>5 mins</schedulingPeriod> + <scheduledState>STOPPED</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <property> + <name>Controller Service</name> + <value>edf22ee5-376a-46dc-a38a-919351124456</value> + </property> + </reportingTask> + </reportingTasks> +</flowController>
