NIFI-5154: When Processor or Controller Service is added to a Process Group, 
remove any references from it to any other Controller Service that is not 
reachable from the newly assigned Process Group
Fixed bug in unit test
Addressed review feedback and fixed an issue where, if a group is moved inside 
another group, the descendant processors of the moved group did not have their 
service references updated properly. Also fixed an issue where, if a service 
is defined in Group A, and Group B lives within Group A and has a processor 
that references that service at the level of Group A, the user was allowed to 
move Group B outside of Group A (even though the processor could not be moved 
out of scope by itself).
This closes #2678


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/da94eaca
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/da94eaca
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/da94eaca

Branch: refs/heads/HDF-3.1-maint
Commit: da94eacae7bc31df07d2fbc13c44ff5b48637a53
Parents: fd8f1bf
Author: Mark Payne <[email protected]>
Authored: Fri May 4 14:48:53 2018 -0400
Committer: Matt Gilman <[email protected]>
Committed: Thu May 10 11:12:40 2018 -0400

----------------------------------------------------------------------
 .../nifi/groups/StandardProcessGroup.java       | 63 +++++++++++++++++++-
 .../TestStandardControllerServiceProvider.java  | 12 +++-
 2 files changed, 70 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/da94eaca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index f0b7b3e..6170324 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -660,6 +660,10 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
             processGroups.put(Objects.requireNonNull(group).getIdentifier(), 
group);
             flowController.onProcessGroupAdded(group);
+
+            
group.findAllControllerServices().stream().forEach(this::updateControllerServiceReferences);
+            
group.findAllProcessors().stream().forEach(this::updateControllerServiceReferences);
+
             onComponentModified();
         } finally {
             writeLock.unlock();
@@ -829,12 +833,49 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             processor.getVariableRegistry().setParent(getVariableRegistry());
             processors.put(processorId, processor);
             flowController.onProcessorAdded(processor);
+            updateControllerServiceReferences(processor);
             onComponentModified();
         } finally {
             writeLock.unlock();
         }
     }
 
+    /**
+     * Looks for any property that is configured on the given component that 
references a Controller Service.
+     * If any exists, and that Controller Service is not accessible from this 
Process Group, then the given
+     * component will be removed from the service's referencing components.
+     *
+     * @param component the component whose invalid references should be 
removed
+     */
+    private void updateControllerServiceReferences(final ConfiguredComponent 
component) {
+        for (final Map.Entry<PropertyDescriptor, String> entry : 
component.getProperties().entrySet()) {
+            final String serviceId = entry.getValue();
+            if (serviceId == null) {
+                continue;
+            }
+
+            final PropertyDescriptor propertyDescriptor = entry.getKey();
+            final Class<? extends ControllerService> serviceClass = 
propertyDescriptor.getControllerServiceDefinition();
+
+            if (serviceClass != null) {
+                final boolean validReference = 
isValidServiceReference(serviceId, serviceClass);
+                final ControllerServiceNode serviceNode = 
controllerServiceProvider.getControllerServiceNode(serviceId);
+                if (serviceNode != null) {
+                    if (validReference) {
+                        serviceNode.addReference(component);
+                    } else {
+                        serviceNode.removeReference(component);
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean isValidServiceReference(final String serviceId, final 
Class<? extends ControllerService> serviceClass) {
+        final Set<String> validServiceIds = 
controllerServiceProvider.getControllerServiceIdentifiers(serviceClass, 
getIdentifier());
+        return validServiceIds.contains(serviceId);
+    }
+
     @Override
     public void removeProcessor(final ProcessorNode processor) {
         boolean removed = false;
@@ -2024,6 +2065,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             service.getVariableRegistry().setParent(getVariableRegistry());
             this.controllerServices.put(service.getIdentifier(), service);
             LOG.info("{} added to {}", service, this);
+            updateControllerServiceReferences(service);
             onComponentModified();
         } finally {
             writeLock.unlock();
@@ -2751,8 +2793,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 }
             }
 
-            for (final String id : snippet.getProcessors().keySet()) {
-                final ProcessorNode processorNode = getProcessor(id);
+            final Set<ProcessorNode> processors = findAllProcessors(snippet);
+            for (final ProcessorNode processorNode : processors) {
                 for (final PropertyDescriptor descriptor : 
processorNode.getProperties().keySet()) {
                     final Class<? extends ControllerService> serviceDefinition 
= descriptor.getControllerServiceDefinition();
 
@@ -2768,7 +2810,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
                             // ensure the configured service is an allowed 
service if it's still a valid service
                             if 
(currentControllerServiceIds.contains(serviceId) && 
!proposedControllerServiceIds.contains(serviceId)) {
-                                throw new IllegalStateException("Cannot 
perform Move Operation because a Processor references a service that is not 
available in the destination Process Group");
+                                throw new IllegalStateException("Cannot 
perform Move Operation because Processor with ID " + 
processorNode.getIdentifier()
+                                    + " references a service that is not 
available in the destination Process Group");
                             }
                         }
                     }
@@ -2779,6 +2822,20 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         }
     }
 
+    private Set<ProcessorNode> findAllProcessors(final Snippet snippet) {
+        final Set<ProcessorNode> processors = new HashSet<>();
+
+        snippet.getProcessors().keySet().stream()
+            .map(this::getProcessor)
+            .forEach(processors::add);
+
+        for (final String groupId : snippet.getProcessGroups().keySet()) {
+            processors.addAll(getProcessGroup(groupId).findAllProcessors());
+        }
+
+        return processors;
+    }
+
     @Override
     public MutableVariableRegistry getVariableRegistry() {
         return variableRegistry;

http://git-wip-us.apache.org/repos/asf/nifi/blob/da94eaca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index ed335e9..d45b51c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -57,6 +57,8 @@ import 
org.apache.nifi.processor.StandardValidationContextFactory;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.registry.variable.MutableVariableRegistry;
 import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockProcessorInitializationContext;
 import org.apache.nifi.util.NiFiProperties;
 import org.junit.Assert;
 import org.junit.Before;
@@ -430,8 +432,14 @@ public class TestStandardControllerServiceProvider {
 
     private ProcessorNode createProcessor(final StandardProcessScheduler 
scheduler, final ControllerServiceProvider serviceProvider) {
         final ReloadComponent reloadComponent = 
Mockito.mock(ReloadComponent.class);
-        final LoggableComponent<Processor> dummyProcessor = new 
LoggableComponent<>(new DummyProcessor(), 
systemBundle.getBundleDetails().getCoordinate(), null);
-        final ProcessorNode procNode = new 
StandardProcessorNode(dummyProcessor, UUID.randomUUID().toString(),
+
+        final Processor processor = new DummyProcessor();
+        final MockProcessContext context = new MockProcessContext(processor, 
Mockito.mock(StateManager.class), variableRegistry);
+        final MockProcessorInitializationContext mockInitContext = new 
MockProcessorInitializationContext(processor, context);
+        processor.initialize(mockInitContext);
+
+        final LoggableComponent<Processor> dummyProcessor = new 
LoggableComponent<>(processor, systemBundle.getBundleDetails().getCoordinate(), 
null);
+        final ProcessorNode procNode = new 
StandardProcessorNode(dummyProcessor, mockInitContext.getIdentifier(),
                 new StandardValidationContextFactory(serviceProvider, null), 
scheduler, serviceProvider, niFiProperties,
                 new 
StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), 
reloadComponent);
 

Reply via email to