Repository: nifi
Updated Branches:
  refs/heads/master 0973c2d8d -> eb0b4283e


NIFI-5222: Prevent validating components multiple times for each update
- Avoid triggering async validation for each update to a component when 
instantiating a template (such as copy/paste or templates). Added debug logging 
to indicate when and why we are triggering validation; removed unit test that 
made poor assumptions about the inner workings of the FlowSynchronizer that 
resulted in failures when we make calls into processors that the unit test 
doesn't know about.

This closes #2731.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/eb0b4283
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/eb0b4283
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/eb0b4283

Branch: refs/heads/master
Commit: eb0b4283e800636b8722fadfebce34e3df094196
Parents: 0973c2d
Author: Mark Payne <[email protected]>
Authored: Tue May 22 08:35:46 2018 -0400
Committer: Mark Payne <[email protected]>
Committed: Fri May 25 13:44:15 2018 -0400

----------------------------------------------------------------------
 .../nifi/controller/AbstractComponentNode.java  |  33 ++-
 .../apache/nifi/controller/ComponentNode.java   |  33 +++
 .../controller/TestAbstractComponentNode.java   |  45 ++-
 .../apache/nifi/controller/FlowController.java  | 166 ++++++-----
 .../controller/StandardFlowSynchronizer.java    | 106 ++++---
 .../nifi/controller/StandardProcessorNode.java  |   6 +
 .../service/ControllerServiceLoader.java        |   9 +-
 .../service/StandardControllerServiceNode.java  |   1 +
 .../nifi/groups/StandardProcessGroup.java       |  74 ++---
 .../StandardFlowSynchronizerSpec.groovy         | 274 -------------------
 .../dao/impl/StandardControllerServiceDAO.java  |  27 +-
 .../nifi/web/dao/impl/StandardProcessorDAO.java |  89 +++---
 .../web/dao/impl/StandardReportingTaskDAO.java  |  41 +--
 13 files changed, 410 insertions(+), 494 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/eb0b4283/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index 7a35f5a..42214a9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -79,6 +79,7 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
     private volatile String additionalResourcesFingerprint;
     private AtomicReference<ValidationState> validationState = new 
AtomicReference<>(new ValidationState(ValidationStatus.VALIDATING, 
Collections.emptyList()));
     private final ValidationTrigger validationTrigger;
+    private volatile boolean triggerValidation = true;
 
     public AbstractComponentNode(final String id,
                                        final ValidationContextFactory 
validationContextFactory, final ControllerServiceProvider serviceProvider,
@@ -129,6 +130,7 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
     @Override
     public void setAnnotationData(final String data) {
         
annotationData.set(CharacterFilterUtils.filterInvalidXmlCharacters(data));
+        logger.debug("Resetting Validation State of {} due to setting 
annotation data", this);
         resetValidationState();
     }
 
@@ -198,7 +200,7 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
                 }
             }
 
-            logger.debug("Setting properties to {}; resetting validation 
state", properties);
+            logger.debug("Resetting Validation State of {} due to setting 
properties", this);
             resetValidationState();
         } finally {
             lock.unlock();
@@ -609,7 +611,34 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
     protected void resetValidationState() {
         validationContext.set(null);
         validationState.set(new ValidationState(ValidationStatus.VALIDATING, 
Collections.emptyList()));
-        validationTrigger.triggerAsync(this);
+
+        if (isTriggerValidation()) {
+            validationTrigger.triggerAsync(this);
+        } else {
+            logger.debug("Reset validation state of {} but will not trigger 
async validation because trigger has been paused", this);
+        }
+    }
+
+    @Override
+    public void pauseValidationTrigger() {
+        triggerValidation = false;
+    }
+
+    @Override
+    public void resumeValidationTrigger() {
+        triggerValidation = true;
+
+        final ValidationStatus validationStatus = getValidationStatus();
+        if (validationStatus == ValidationStatus.VALIDATING) {
+            logger.debug("Resuming Triggering of Validation State for {}; 
status is VALIDATING so will trigger async validation now", this);
+            validationTrigger.triggerAsync(this);
+        } else {
+            logger.debug("Resuming Triggering of Validation State for {}; 
status is {} so will not trigger async validation now", this, validationStatus);
+        }
+    }
+
+    private boolean isTriggerValidation() {
+        return triggerValidation;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb0b4283/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
index d65e8d6..707bb75 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
@@ -58,6 +58,39 @@ public interface ComponentNode extends ComponentAuthorizable 
{
 
     public void setProperties(Map<String, String> properties, boolean 
allowRemovalOfRequiredProperties);
 
+    /**
+     * <p>
+     * Pause triggering asynchronous validation to occur when the component is 
updated. Often times, it is necessary
+     * to update several aspects of a component, such as the properties and 
annotation data, at once. When this occurs,
+     * we don't want to trigger validation for each update, so we can follow 
the pattern:
+     * </p>
+     *
+     * <pre>
+     * <code>
+     * componentNode.pauseValidationTrigger();
+     * try {
+     *   componentNode.setProperties(properties);
+     *   componentNode.setAnnotationData(annotationData);
+     * } finally {
+     *   componentNode.resumeValidationTrigger();
+     * }
+     * </code>
+     * </pre>
+     *
+     * <p>
+     * When calling this method, it is imperative that {@link 
#resumeValidationTrigger()} is always called within a {@code finally} block to
+     * ensure that validation occurs.
+     * </p>
+     */
+    void pauseValidationTrigger();
+
+    /**
+     * Resume triggering asynchronous validation to occur when the component 
is updated. This method is to be used in conjunction
+     * with {@link #pauseValidationTrigger()} as illustrated in its 
documentation. When this method is called, if the component's Validation Status
+     * is {@link ValidationStatus#VALIDATING}, component validation will 
immediately be triggered asynchronously.
+     */
+    void resumeValidationTrigger();
+
     public Map<PropertyDescriptor, String> getProperties();
 
     public String getProperty(final PropertyDescriptor property);

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb0b4283/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
index 4102eca..3fa5d2c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
@@ -21,8 +21,10 @@ import static org.junit.Assert.assertEquals;
 
 import java.net.URL;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
@@ -41,24 +43,57 @@ public class TestAbstractComponentNode {
 
     @Test(timeout = 5000)
     public void testGetValidationStatusWithTimeout() {
-        final ValidationControlledAbstractComponentNode node = new 
ValidationControlledAbstractComponentNode();
+        final ValidationControlledAbstractComponentNode node = new 
ValidationControlledAbstractComponentNode(5000, 
Mockito.mock(ValidationTrigger.class));
         final ValidationStatus status = node.getValidationStatus(1, 
TimeUnit.MILLISECONDS);
         assertEquals(ValidationStatus.VALIDATING, status);
     }
 
+    @Test(timeout = 10000)
+    public void testValidationTriggerPaused() throws InterruptedException {
+        final AtomicLong validationCount = new AtomicLong(0L);
+
+        final ValidationControlledAbstractComponentNode node = new 
ValidationControlledAbstractComponentNode(0, new ValidationTrigger() {
+            @Override
+            public void triggerAsync(ComponentNode component) {
+                validationCount.incrementAndGet();
+            }
+
+            @Override
+            public void trigger(ComponentNode component) {
+                validationCount.incrementAndGet();
+            }
+        });
+
+        node.pauseValidationTrigger();
+        for (int i = 0; i < 1000; i++) {
+            node.setProperties(Collections.emptyMap());
+            assertEquals(0, validationCount.get());
+        }
+        node.resumeValidationTrigger();
+
+        // wait for validation count to be 1 (this is asynchronous so we want 
to just keep checking).
+        while (validationCount.get() != 1) {
+            Thread.sleep(50L);
+        }
+
+        assertEquals(1L, validationCount.get());
+    }
 
     private static class ValidationControlledAbstractComponentNode extends 
AbstractComponentNode {
+        private final long pauseMillis;
 
-        public ValidationControlledAbstractComponentNode() {
+        public ValidationControlledAbstractComponentNode(final long 
pauseMillis, final ValidationTrigger validationTrigger) {
             super("id", Mockito.mock(ValidationContextFactory.class), 
Mockito.mock(ControllerServiceProvider.class), "unit test component",
                 
ValidationControlledAbstractComponentNode.class.getCanonicalName(), 
Mockito.mock(ComponentVariableRegistry.class), 
Mockito.mock(ReloadComponent.class),
-                Mockito.mock(ValidationTrigger.class), false);
+                validationTrigger, false);
+
+            this.pauseMillis = pauseMillis;
         }
 
         @Override
         protected Collection<ValidationResult> 
computeValidationErrors(ValidationContext context) {
             try {
-                Thread.sleep(5000L);
+                Thread.sleep(pauseMillis);
             } catch (final InterruptedException ie) {
             }
 
@@ -76,7 +111,7 @@ public class TestAbstractComponentNode {
 
         @Override
         public ConfigurableComponent getComponent() {
-            return null;
+            return Mockito.mock(ConfigurableComponent.class);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb0b4283/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index db18ada..ed56694 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -847,7 +847,35 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             // Perform validation of all components before attempting to start 
them.
             LOG.debug("Triggering initial validation of all components");
             final long start = System.nanoTime();
-            new TriggerValidationTask(this, validationTrigger).run();
+
+            final ValidationTrigger triggerIfValidating = new 
ValidationTrigger() {
+                @Override
+                public void triggerAsync(final ComponentNode component) {
+                    final ValidationStatus status = 
component.getValidationStatus();
+
+                    if (component.getValidationStatus() == 
ValidationStatus.VALIDATING) {
+                        LOG.debug("Will trigger async validation for {} 
because its status is VALIDATING", component);
+                        validationTrigger.triggerAsync(component);
+                    } else {
+                        LOG.debug("Will not trigger async validation for {} 
because its status is {}", component, status);
+                    }
+                }
+
+                @Override
+                public void trigger(final ComponentNode component) {
+                    final ValidationStatus status = 
component.getValidationStatus();
+
+                    if (component.getValidationStatus() == 
ValidationStatus.VALIDATING) {
+                        LOG.debug("Will trigger immediate validation for {} 
because its status is VALIDATING", component);
+                        validationTrigger.trigger(component);
+                    } else {
+                        LOG.debug("Will not trigger immediate validation for 
{} because its status is {}", component, status);
+                    }
+                }
+            };
+
+            new TriggerValidationTask(this, triggerIfValidating).run();
+
             final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
             LOG.info("Performed initial validation of all components in {} 
milliseconds", millis);
 
@@ -1231,7 +1259,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             }
         }
 
-        validationTrigger.triggerAsync(procNode);
         return procNode;
     }
 
@@ -1312,6 +1339,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         // need to refresh the properties in case we are changing from ghost 
component to real component
         existingNode.refreshProperties();
 
+        LOG.debug("Triggering async validation of {} due to processor reload", 
existingNode);
         validationTrigger.triggerAsync(existingNode);
     }
 
@@ -1855,26 +1883,33 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             //
             // Instantiate Controller Services
             //
-            for (final ControllerServiceDTO controllerServiceDTO : 
dto.getControllerServices()) {
-                final BundleCoordinate bundleCoordinate = 
BundleUtils.getBundle(controllerServiceDTO.getType(), 
controllerServiceDTO.getBundle());
-                final ControllerServiceNode serviceNode = 
createControllerService(controllerServiceDTO.getType(), 
controllerServiceDTO.getId(), bundleCoordinate, Collections.emptySet(), true);
+            final List<ControllerServiceNode> serviceNodes = new ArrayList<>();
+            try {
+                for (final ControllerServiceDTO controllerServiceDTO : 
dto.getControllerServices()) {
+                    final BundleCoordinate bundleCoordinate = 
BundleUtils.getBundle(controllerServiceDTO.getType(), 
controllerServiceDTO.getBundle());
+                    final ControllerServiceNode serviceNode = 
createControllerService(controllerServiceDTO.getType(), 
controllerServiceDTO.getId(), bundleCoordinate, Collections.emptySet(), true);
+                    serviceNode.pauseValidationTrigger();
+                    serviceNodes.add(serviceNode);
+
+                    
serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
+                    
serviceNode.setComments(controllerServiceDTO.getComments());
+                    serviceNode.setName(controllerServiceDTO.getName());
+                    if (!topLevel) {
+                        
serviceNode.setVersionedComponentId(controllerServiceDTO.getVersionedComponentId());
+                    }
 
-                
serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
-                serviceNode.setComments(controllerServiceDTO.getComments());
-                serviceNode.setName(controllerServiceDTO.getName());
-                if (!topLevel) {
-                    
serviceNode.setVersionedComponentId(controllerServiceDTO.getVersionedComponentId());
+                    group.addControllerService(serviceNode);
                 }
 
-                group.addControllerService(serviceNode);
-            }
-
-            // configure controller services. We do this after creating all of 
them in case 1 service
-            // references another service.
-            for (final ControllerServiceDTO controllerServiceDTO : 
dto.getControllerServices()) {
-                final String serviceId = controllerServiceDTO.getId();
-                final ControllerServiceNode serviceNode = 
getControllerServiceNode(serviceId);
-                
serviceNode.setProperties(controllerServiceDTO.getProperties());
+                // configure controller services. We do this after creating 
all of them in case 1 service
+                // references another service.
+                for (final ControllerServiceDTO controllerServiceDTO : 
dto.getControllerServices()) {
+                    final String serviceId = controllerServiceDTO.getId();
+                    final ControllerServiceNode serviceNode = 
getControllerServiceNode(serviceId);
+                    
serviceNode.setProperties(controllerServiceDTO.getProperties());
+                }
+            } finally {
+                
serviceNodes.stream().forEach(ControllerServiceNode::resumeValidationTrigger);
             }
 
             //
@@ -1963,61 +1998,66 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             for (final ProcessorDTO processorDTO : dto.getProcessors()) {
                 final BundleCoordinate bundleCoordinate = 
BundleUtils.getBundle(processorDTO.getType(), processorDTO.getBundle());
                 final ProcessorNode procNode = 
createProcessor(processorDTO.getType(), processorDTO.getId(), bundleCoordinate);
+                procNode.pauseValidationTrigger();
 
-                procNode.setPosition(toPosition(processorDTO.getPosition()));
-                procNode.setProcessGroup(group);
-                if (!topLevel) {
-                    
procNode.setVersionedComponentId(processorDTO.getVersionedComponentId());
-                }
+                try {
+                    
procNode.setPosition(toPosition(processorDTO.getPosition()));
+                    procNode.setProcessGroup(group);
+                    if (!topLevel) {
+                        
procNode.setVersionedComponentId(processorDTO.getVersionedComponentId());
+                    }
 
-                final ProcessorConfigDTO config = processorDTO.getConfig();
-                procNode.setComments(config.getComments());
-                if (config.isLossTolerant() != null) {
-                    procNode.setLossTolerant(config.isLossTolerant());
-                }
-                procNode.setName(processorDTO.getName());
+                    final ProcessorConfigDTO config = processorDTO.getConfig();
+                    procNode.setComments(config.getComments());
+                    if (config.isLossTolerant() != null) {
+                        procNode.setLossTolerant(config.isLossTolerant());
+                    }
+                    procNode.setName(processorDTO.getName());
 
-                procNode.setYieldPeriod(config.getYieldDuration());
-                procNode.setPenalizationPeriod(config.getPenaltyDuration());
-                
procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
-                procNode.setAnnotationData(config.getAnnotationData());
-                procNode.setStyle(processorDTO.getStyle());
+                    procNode.setYieldPeriod(config.getYieldDuration());
+                    
procNode.setPenalizationPeriod(config.getPenaltyDuration());
+                    
procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
+                    procNode.setAnnotationData(config.getAnnotationData());
+                    procNode.setStyle(processorDTO.getStyle());
 
-                if (config.getRunDurationMillis() != null) {
-                    procNode.setRunDuration(config.getRunDurationMillis(), 
TimeUnit.MILLISECONDS);
-                }
+                    if (config.getRunDurationMillis() != null) {
+                        procNode.setRunDuration(config.getRunDurationMillis(), 
TimeUnit.MILLISECONDS);
+                    }
 
-                if (config.getSchedulingStrategy() != null) {
-                    
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
-                }
+                    if (config.getSchedulingStrategy() != null) {
+                        
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
+                    }
 
-                if (config.getExecutionNode() != null) {
-                    
procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
-                }
+                    if (config.getExecutionNode() != null) {
+                        
procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
+                    }
 
-                if 
(processorDTO.getState().equals(ScheduledState.DISABLED.toString())) {
-                    procNode.disable();
-                }
+                    if 
(processorDTO.getState().equals(ScheduledState.DISABLED.toString())) {
+                        procNode.disable();
+                    }
 
-                // ensure that the scheduling strategy is set prior to these 
values
-                
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
-                procNode.setScheduldingPeriod(config.getSchedulingPeriod());
+                    // ensure that the scheduling strategy is set prior to 
these values
+                    
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
+                    
procNode.setScheduldingPeriod(config.getSchedulingPeriod());
 
-                final Set<Relationship> relationships = new HashSet<>();
-                if (processorDTO.getRelationships() != null) {
-                    for (final RelationshipDTO rel : 
processorDTO.getRelationships()) {
-                        if (rel.isAutoTerminate()) {
-                            
relationships.add(procNode.getRelationship(rel.getName()));
+                    final Set<Relationship> relationships = new HashSet<>();
+                    if (processorDTO.getRelationships() != null) {
+                        for (final RelationshipDTO rel : 
processorDTO.getRelationships()) {
+                            if (rel.isAutoTerminate()) {
+                                
relationships.add(procNode.getRelationship(rel.getName()));
+                            }
                         }
+                        procNode.setAutoTerminatedRelationships(relationships);
                     }
-                    procNode.setAutoTerminatedRelationships(relationships);
-                }
 
-                if (config.getProperties() != null) {
-                    procNode.setProperties(config.getProperties());
-                }
+                    if (config.getProperties() != null) {
+                        procNode.setProperties(config.getProperties());
+                    }
 
-                group.addProcessor(procNode);
+                    group.addProcessor(procNode);
+                } finally {
+                    procNode.resumeValidationTrigger();
+                }
             }
 
             //
@@ -3530,7 +3570,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                     new ReportingTaskLogObserver(getBulletinRepository(), 
taskNode));
         }
 
-        validationTrigger.triggerAsync(taskNode);
         return taskNode;
     }
 
@@ -3606,6 +3645,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         // need to refresh the properties in case we are changing from ghost 
component to real component
         existingNode.refreshProperties();
 
+        LOG.debug("Triggering async validation of {} due to reporting task 
reload", existingNode);
         validationTrigger.triggerAsync(existingNode);
     }
 
@@ -3697,7 +3737,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             }
         }
 
-        validationTrigger.triggerAsync(serviceNode);
         return serviceNode;
     }
 
@@ -3751,6 +3790,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         // need to refresh the properties in case we are changing from ghost 
component to real component
         existingNode.refreshProperties();
 
+        LOG.debug("Triggering async validation of {} due to controller service 
reload", existingNode);
         validationTrigger.triggerAsync(existingNode);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb0b4283/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 8a79acf..15538b3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -515,20 +515,25 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
     private void updateReportingTaskControllerServices(final 
Set<ReportingTaskNode> reportingTasks, final Map<String, ControllerServiceNode> 
controllerServiceMapping) {
         for (ReportingTaskNode reportingTask : reportingTasks) {
             if (reportingTask.getProperties() != null) {
-                final Set<Map.Entry<PropertyDescriptor, String>> 
propertyDescriptors = reportingTask.getProperties().entrySet().stream()
-                        .filter(e -> 
e.getKey().getControllerServiceDefinition() != null)
-                        .filter(e -> 
controllerServiceMapping.containsKey(e.getValue()))
-                        .collect(Collectors.toSet());
+                reportingTask.pauseValidationTrigger();
+                try {
+                    final Set<Map.Entry<PropertyDescriptor, String>> 
propertyDescriptors = reportingTask.getProperties().entrySet().stream()
+                            .filter(e -> 
e.getKey().getControllerServiceDefinition() != null)
+                            .filter(e -> 
controllerServiceMapping.containsKey(e.getValue()))
+                            .collect(Collectors.toSet());
 
-                final Map<String,String> controllerServiceProps = new 
HashMap<>();
+                    final Map<String,String> controllerServiceProps = new 
HashMap<>();
 
-                for (Map.Entry<PropertyDescriptor, String> propEntry : 
propertyDescriptors) {
-                    final PropertyDescriptor propertyDescriptor = 
propEntry.getKey();
-                    final ControllerServiceNode clone = 
controllerServiceMapping.get(propEntry.getValue());
-                    controllerServiceProps.put(propertyDescriptor.getName(), 
clone.getIdentifier());
-                }
+                    for (Map.Entry<PropertyDescriptor, String> propEntry : 
propertyDescriptors) {
+                        final PropertyDescriptor propertyDescriptor = 
propEntry.getKey();
+                        final ControllerServiceNode clone = 
controllerServiceMapping.get(propEntry.getValue());
+                        
controllerServiceProps.put(propertyDescriptor.getName(), clone.getIdentifier());
+                    }
 
-                reportingTask.setProperties(controllerServiceProps);
+                    reportingTask.setProperties(controllerServiceProps);
+                } finally {
+                    reportingTask.resumeValidationTrigger();
+                }
             }
         }
     }
@@ -820,6 +825,9 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             }
         }
 
+        // Ensure that all services have been validated, so that we don't 
attempt to enable a service that is still in a 'validating' state
+        toEnable.stream().forEach(ControllerServiceNode::performValidation);
+
         controller.disableControllerServicesAsync(toDisable);
         controller.enableControllerServices(toEnable);
 
@@ -842,6 +850,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                             break;
                         case RUNNING:
                             // we want to run now. Make sure processor is not 
disabled and then start it.
+                            procNode.performValidation();
                             
procNode.getProcessGroup().enableProcessor(procNode);
                             
controller.startProcessor(procNode.getProcessGroupIdentifier(), 
procNode.getIdentifier(), false);
                             break;
@@ -1070,48 +1079,55 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
 
     private void updateProcessor(final ProcessorNode procNode, final 
ProcessorDTO processorDTO, final ProcessGroup processGroup, final 
FlowController controller)
             throws ProcessorInstantiationException {
-        final ProcessorConfigDTO config = processorDTO.getConfig();
-        procNode.setProcessGroup(processGroup);
-        procNode.setLossTolerant(config.isLossTolerant());
-        procNode.setPenalizationPeriod(config.getPenaltyDuration());
-        procNode.setYieldPeriod(config.getYieldDuration());
-        procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
-        updateNonFingerprintedProcessorSettings(procNode, processorDTO);
 
-        if (config.getSchedulingStrategy() != null) {
-            
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
-        }
+        procNode.pauseValidationTrigger();
+        try {
+            final ProcessorConfigDTO config = processorDTO.getConfig();
+            procNode.setProcessGroup(processGroup);
+            procNode.setLossTolerant(config.isLossTolerant());
+            procNode.setPenalizationPeriod(config.getPenaltyDuration());
+            procNode.setYieldPeriod(config.getYieldDuration());
+            
procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
+            updateNonFingerprintedProcessorSettings(procNode, processorDTO);
 
-        if (config.getExecutionNode() != null) {
-            
procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
-        }
+            if (config.getSchedulingStrategy() != null) {
+                
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
+            }
 
-        // must set scheduling strategy before these two
-        
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
-        procNode.setScheduldingPeriod(config.getSchedulingPeriod());
-        if (config.getRunDurationMillis() != null) {
-            procNode.setRunDuration(config.getRunDurationMillis(), 
TimeUnit.MILLISECONDS);
-        }
+            if (config.getExecutionNode() != null) {
+                
procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
+            }
 
-        procNode.setAnnotationData(config.getAnnotationData());
+            // must set scheduling strategy before these two
+            
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
+            procNode.setScheduldingPeriod(config.getSchedulingPeriod());
+            if (config.getRunDurationMillis() != null) {
+                procNode.setRunDuration(config.getRunDurationMillis(), 
TimeUnit.MILLISECONDS);
+            }
 
-        if (config.getAutoTerminatedRelationships() != null) {
-            final Set<Relationship> relationships = new HashSet<>();
-            for (final String rel : config.getAutoTerminatedRelationships()) {
-                relationships.add(procNode.getRelationship(rel));
+            procNode.setAnnotationData(config.getAnnotationData());
+
+            if (config.getAutoTerminatedRelationships() != null) {
+                final Set<Relationship> relationships = new HashSet<>();
+                for (final String rel : 
config.getAutoTerminatedRelationships()) {
+                    relationships.add(procNode.getRelationship(rel));
+                }
+                procNode.setAutoTerminatedRelationships(relationships);
             }
-            procNode.setAutoTerminatedRelationships(relationships);
-        }
 
-        procNode.setProperties(config.getProperties());
+            procNode.setProperties(config.getProperties());
 
-        final ScheduledState scheduledState = 
ScheduledState.valueOf(processorDTO.getState());
-        if (ScheduledState.RUNNING.equals(scheduledState)) {
-            controller.startProcessor(processGroup.getIdentifier(), 
procNode.getIdentifier());
-        } else if (ScheduledState.DISABLED.equals(scheduledState)) {
-            processGroup.disableProcessor(procNode);
-        } else if (ScheduledState.STOPPED.equals(scheduledState)) {
-            controller.stopProcessor(processGroup.getIdentifier(), 
procNode.getIdentifier());
+            final ScheduledState scheduledState = 
ScheduledState.valueOf(processorDTO.getState());
+            if (ScheduledState.RUNNING.equals(scheduledState)) {
+                procNode.performValidation(); // ensure that processor has 
been validated
+                controller.startProcessor(processGroup.getIdentifier(), 
procNode.getIdentifier());
+            } else if (ScheduledState.DISABLED.equals(scheduledState)) {
+                processGroup.disableProcessor(procNode);
+            } else if (ScheduledState.STOPPED.equals(scheduledState)) {
+                controller.stopProcessor(processGroup.getIdentifier(), 
procNode.getIdentifier());
+            }
+        } finally {
+            procNode.resumeValidationTrigger();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb0b4283/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 5d6b3da..6f51f87 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -410,6 +410,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         }
 
         undefinedRelationshipsToTerminate.set(new HashSet<>(terminate));
+        LOG.debug("Resetting Validation State of {} due to setting 
auto-terminated relationships", this);
         resetValidationState();
     }
 
@@ -773,6 +774,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
                 
setIncomingConnections(Collections.unmodifiableList(updatedIncoming));
             }
         } finally {
+            LOG.debug("Resetting Validation State of {} due to connection 
added", this);
             resetValidationState();
         }
     }
@@ -851,6 +853,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
             }
         } finally {
             // need to perform validation in case selected relationships were 
changed.
+            LOG.debug("Resetting Validation State of {} due to updating 
connection", this);
             resetValidationState();
         }
     }
@@ -890,11 +893,13 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
                     "Cannot remove a connection from a ProcessorNode for which 
the ProcessorNode is not the Source");
         }
 
+        LOG.debug("Resetting Validation State of {} due to connection 
removed", this);
         resetValidationState();
     }
 
     private void setIncomingConnections(final List<Connection> incoming) {
         this.incomingConnections.set(incoming);
+        LOG.debug("Resetting Validation State of {} due to setting incoming 
connections", this);
         resetValidationState();
     }
 
@@ -1147,6 +1152,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     @Override
     public synchronized void setProcessGroup(final ProcessGroup group) {
         this.processGroup.set(group);
+        LOG.debug("Resetting Validation State of {} due to setting process 
group", this);
         resetValidationState();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb0b4283/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 102ae26..e5192b3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -213,8 +213,13 @@ public class ControllerServiceLoader {
 
     private static void configureControllerService(final ControllerServiceNode 
node, final Element controllerServiceElement, final StringEncryptor encryptor) {
         final ControllerServiceDTO dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
-        node.setAnnotationData(dto.getAnnotationData());
-        node.setProperties(dto.getProperties());
+        node.pauseValidationTrigger();
+        try {
+            node.setAnnotationData(dto.getAnnotationData());
+            node.setProperties(dto.getProperties());
+        } finally {
+            node.resumeValidationTrigger();
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb0b4283/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 74050e2..df17a8d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -222,6 +222,7 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
         writeLock.lock();
         try {
             this.processGroup = group;
+            LOG.debug("Resetting Validation State of {} due to setting process 
group", this);
             resetValidationState();
         } finally {
             writeLock.unlock();

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb0b4283/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 3660b99..e77be2a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -4043,18 +4043,23 @@ public final class StandardProcessGroup implements 
ProcessGroup {
     }
 
     private void updateControllerService(final ControllerServiceNode service, 
final VersionedControllerService proposed) {
-        service.setAnnotationData(proposed.getAnnotationData());
-        service.setComments(proposed.getComments());
-        service.setName(proposed.getName());
+        service.pauseValidationTrigger();
+        try {
+            service.setAnnotationData(proposed.getAnnotationData());
+            service.setComments(proposed.getComments());
+            service.setName(proposed.getName());
 
-        final Map<String, String> properties = 
populatePropertiesMap(service.getProperties(), proposed.getProperties(), 
proposed.getPropertyDescriptors(), service.getProcessGroup());
-        service.setProperties(properties, true);
+            final Map<String, String> properties = 
populatePropertiesMap(service.getProperties(), proposed.getProperties(), 
proposed.getPropertyDescriptors(), service.getProcessGroup());
+            service.setProperties(properties, true);
 
-        if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) {
-            final BundleCoordinate newBundleCoordinate = 
toCoordinate(proposed.getBundle());
-            final List<PropertyDescriptor> descriptors = new 
ArrayList<>(service.getProperties().keySet());
-            final Set<URL> additionalUrls = 
service.getAdditionalClasspathResources(descriptors);
-            flowController.reload(service, proposed.getType(), 
newBundleCoordinate, additionalUrls);
+            if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) 
{
+                final BundleCoordinate newBundleCoordinate = 
toCoordinate(proposed.getBundle());
+                final List<PropertyDescriptor> descriptors = new 
ArrayList<>(service.getProperties().keySet());
+                final Set<URL> additionalUrls = 
service.getAdditionalClasspathResources(descriptors);
+                flowController.reload(service, proposed.getType(), 
newBundleCoordinate, additionalUrls);
+            }
+        } finally {
+            service.resumeValidationTrigger();
         }
     }
 
@@ -4161,28 +4166,33 @@ public final class StandardProcessGroup implements 
ProcessGroup {
     }
 
     private void updateProcessor(final ProcessorNode processor, final 
VersionedProcessor proposed) throws ProcessorInstantiationException {
-        processor.setAnnotationData(proposed.getAnnotationData());
-        
processor.setBulletinLevel(LogLevel.valueOf(proposed.getBulletinLevel()));
-        processor.setComments(proposed.getComments());
-        processor.setName(proposed.getName());
-        processor.setPenalizationPeriod(proposed.getPenaltyDuration());
-
-        final Map<String, String> properties = 
populatePropertiesMap(processor.getProperties(), proposed.getProperties(), 
proposed.getPropertyDescriptors(), processor.getProcessGroup());
-        processor.setProperties(properties, true);
-        processor.setRunDuration(proposed.getRunDurationMillis(), 
TimeUnit.MILLISECONDS);
-        
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
-        processor.setScheduldingPeriod(proposed.getSchedulingPeriod());
-        
processor.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
-        
processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode()));
-        processor.setStyle(proposed.getStyle());
-        processor.setYieldPeriod(proposed.getYieldDuration());
-        processor.setPosition(new Position(proposed.getPosition().getX(), 
proposed.getPosition().getY()));
-
-        if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
-            final BundleCoordinate newBundleCoordinate = 
toCoordinate(proposed.getBundle());
-            final List<PropertyDescriptor> descriptors = new 
ArrayList<>(processor.getProperties().keySet());
-            final Set<URL> additionalUrls = 
processor.getAdditionalClasspathResources(descriptors);
-            flowController.reload(processor, proposed.getType(), 
newBundleCoordinate, additionalUrls);
+        processor.pauseValidationTrigger();
+        try {
+            processor.setAnnotationData(proposed.getAnnotationData());
+            
processor.setBulletinLevel(LogLevel.valueOf(proposed.getBulletinLevel()));
+            processor.setComments(proposed.getComments());
+            processor.setName(proposed.getName());
+            processor.setPenalizationPeriod(proposed.getPenaltyDuration());
+
+            final Map<String, String> properties = 
populatePropertiesMap(processor.getProperties(), proposed.getProperties(), 
proposed.getPropertyDescriptors(), processor.getProcessGroup());
+            processor.setProperties(properties, true);
+            processor.setRunDuration(proposed.getRunDurationMillis(), 
TimeUnit.MILLISECONDS);
+            
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
+            processor.setScheduldingPeriod(proposed.getSchedulingPeriod());
+            
processor.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
+            
processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode()));
+            processor.setStyle(proposed.getStyle());
+            processor.setYieldPeriod(proposed.getYieldDuration());
+            processor.setPosition(new Position(proposed.getPosition().getX(), 
proposed.getPosition().getY()));
+
+            if (!isEqual(processor.getBundleCoordinate(), 
proposed.getBundle())) {
+                final BundleCoordinate newBundleCoordinate = 
toCoordinate(proposed.getBundle());
+                final List<PropertyDescriptor> descriptors = new 
ArrayList<>(processor.getProperties().keySet());
+                final Set<URL> additionalUrls = 
processor.getAdditionalClasspathResources(descriptors);
+                flowController.reload(processor, proposed.getType(), 
newBundleCoordinate, additionalUrls);
+            }
+        } finally {
+            processor.resumeValidationTrigger();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb0b4283/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy
deleted file mode 100644
index 897d77e..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller
-
-import groovy.xml.XmlUtil
-import org.apache.nifi.authorization.Authorizer
-import org.apache.nifi.bundle.BundleCoordinate
-import org.apache.nifi.cluster.protocol.DataFlow
-import org.apache.nifi.connectable.*
-import org.apache.nifi.controller.label.Label
-import org.apache.nifi.controller.queue.FlowFileQueue
-import org.apache.nifi.groups.ProcessGroup
-import org.apache.nifi.groups.RemoteProcessGroup
-import org.apache.nifi.nar.ExtensionManager
-import org.apache.nifi.nar.SystemBundle
-import org.apache.nifi.processor.Relationship
-import org.apache.nifi.reporting.BulletinRepository
-import org.apache.nifi.util.NiFiProperties
-import spock.lang.Shared
-import spock.lang.Specification
-import spock.lang.Unroll
-
-class StandardFlowSynchronizerSpec extends Specification {
-
-    @Shared
-    def systemBundle
-    @Shared
-    def nifiProperties
-
-    def setupSpec() {
-        def propFile = 
StandardFlowSynchronizerSpec.class.getResource("/standardflowsynchronizerspec.nifi.properties").getFile()
-
-        nifiProperties = NiFiProperties.createBasicNiFiProperties(propFile, 
null)
-        systemBundle = SystemBundle.create(nifiProperties)
-        ExtensionManager.discoverExtensions(systemBundle, 
Collections.emptySet())
-    }
-
-    def teardownSpec() {
-
-    }
-
-    @Unroll
-    def "scaling of #filename with encoding version 
\"#flowEncodingVersion\""() {
-        given: "a StandardFlowSynchronizer with mocked collaborators"
-        def controller = Mock FlowController
-        def proposedFlow = Mock DataFlow
-        def snippetManager = Mock SnippetManager
-        def bulletinRepository = Mock BulletinRepository
-        def flowFileQueue = Mock FlowFileQueue
-        def authorizer = Mock Authorizer
-        def flowFile = new 
File(StandardFlowSynchronizerSpec.getResource(filename).toURI())
-        def flowControllerXml = new XmlSlurper().parse(flowFile)
-        def Map<String, Position> originalPositionablePositionsById = 
flowControllerXml.rootGroup.'**'
-                .findAll { !it.name().equals('connection') && it.id.size() == 
1 && it.position.size() == 1 }
-                .collectEntries { [it.id.text(), new 
Position(it.position.@x.toDouble(), it.position.@y.toDouble())] }
-        def Map<String, List<Position>> originalBendPointsByConnectionId = 
flowControllerXml.rootGroup.'**'
-                .findAll { it.name().equals('connection') && 
it.bendPoints.size() > 0 }
-                .collectEntries { [it.id.text(), 
it.bendPoints.children().collect { new Position(it.@x.toDouble(), 
it.@y.toDouble()) }] }
-        flowControllerXml.@'encoding-version' = flowEncodingVersion
-        def testFlowBytes = XmlUtil.serialize(flowControllerXml).bytes
-        def Map<String, Position> positionablePositionsById = [:]
-        def Map<String, Positionable> positionableMocksById = [:]
-        def Map<String, Connection> connectionMocksById = [:]
-        def Map<String, List<Position>> bendPointPositionsByConnectionId = [:]
-        // the unit under test
-        def flowSynchronizer = new StandardFlowSynchronizer(null, 
nifiProperties)
-        def firstRootGroup = Mock ProcessGroup
-
-        when: "the flow is synchronized with the current state of the 
controller"
-        flowSynchronizer.sync controller, proposedFlow, null
-
-        then: "establish interactions for the mocked collaborators of 
StandardFlowSynchronizer to store the ending positions of components"
-        1 * firstRootGroup.findAllProcessors() >> []
-        1 * controller.isFlowSynchronized() >> false
-        _ * controller.rootGroupId >> flowControllerXml.rootGroup.id.text()
-        _ * controller.getGroup(_) >> { String id -> 
positionableMocksById.get(id) }
-        _ * controller.snippetManager >> snippetManager
-        _ * controller.bulletinRepository >> bulletinRepository
-        _ * controller.authorizer >> authorizer
-        _ * controller./set.*/(*_)
-        _ * controller.getAllControllerServices() >> []
-        _ * controller.getAllReportingTasks() >> []
-        _ * controller.getRootGroup() >>> [
-            firstRootGroup,
-            positionableMocksById.get(controller.rootGroupId)
-        ]
-        _ * controller.createProcessGroup(_) >> { String pgId ->
-            def processGroup = Mock(ProcessGroup)
-            _ * processGroup.getIdentifier() >> pgId
-            _ * processGroup.getPosition() >> { 
positionablePositionsById.get(pgId) }
-            _ * processGroup.setPosition(_) >> { Position pos ->
-                positionablePositionsById.put pgId, pos
-            }
-            _ * processGroup./(add|set).*/(*_)
-            _ * processGroup.isEmpty() >> true
-            _ * processGroup.isRootGroup() >> { pgId == 
flowControllerXml.rootGroup.id }
-            _ * processGroup.getConnectable(_) >> { String connId -> 
positionableMocksById.get(connId) }
-            _ * processGroup.findAllPositionables() >> {
-                def foundProcessGroup = flowControllerXml.rootGroup.'**'.find 
{ it.id == pgId }
-                def idsUnderPg = foundProcessGroup.'**'.findAll { it.name() == 
'id' }.collect { it.text() }
-                positionableMocksById.entrySet().collect {
-                    if (idsUnderPg.contains(it.key)) {
-                        it.value
-                    }
-                }
-            }
-            _ * processGroup.findAllConnections() >> {
-                def foundProcessGroup = flowControllerXml.rootGroup.'**'.find 
{ it.id == pgId }
-                def foundConnections = foundProcessGroup.'**'.findAll { 
it.name() == 'connection' }.collect { it.id.text() }
-                connectionMocksById.entrySet().collect {
-                    if (foundConnections.contains(it.key)) {
-                        it.value
-                    }
-                }
-            }
-                       _ * processGroup.findAllRemoteProcessGroups() >> []
-            
-            positionableMocksById.put(pgId, processGroup)
-            return processGroup
-        }
-
-        _ * controller.createProcessor(_, _, _, _) >> { String type, String 
id, BundleCoordinate coordinate, boolean firstTimeAdded ->
-            def processor = Mock(ProcessorNode)
-            _ * processor.getPosition() >> { positionablePositionsById.get(id) 
}
-            _ * processor.setPosition(_) >> { Position pos ->
-                positionablePositionsById.put id, pos
-            }
-            _ * processor./(add|set).*/(*_)
-            _ * processor.getIdentifier() >> id
-            _ * processor.getBundleCoordinate() >> coordinate
-            _ * processor.getRelationship(_) >> { String n -> new 
Relationship.Builder().name(n).build() }
-            positionableMocksById.put(id, processor)
-            return processor
-        }
-        _ * controller.createFunnel(_) >> { String id ->
-            def funnel = Mock(Funnel)
-            _ * funnel.getPosition() >> { positionablePositionsById.get(id) }
-            _ * funnel.setPosition(_) >> { Position pos ->
-                positionablePositionsById.put id, pos
-            }
-            _ * funnel./(add|set).*/(*_)
-            positionableMocksById.put id, funnel
-            return funnel
-        }
-        _ * controller.createLabel(_, _) >> { String id, String text ->
-            def l = Mock(Label)
-            _ * l.getPosition() >> { positionablePositionsById.get(id) }
-            _ * l.setPosition(_) >> { Position pos ->
-                positionablePositionsById.put id, pos
-            }
-            _ * l./(add|set).*/(*_)
-            positionableMocksById.put(id, l)
-            return l
-        }
-        _ * controller./create.*Port/(_, _) >> { String id, String text ->
-            def port = Mock(Port)
-            _ * port.getPosition() >> { positionablePositionsById.get(id) }
-            _ * port.setPosition(_) >> { Position pos ->
-                positionablePositionsById.put id, pos
-            }
-            _ * port./(add|set).*/(*_)
-            positionableMocksById.put(id, port)
-            return port
-        }
-        _ * controller.createRemoteProcessGroup(_, _) >> { String id, String 
uri ->
-            def rpg = Mock(RemoteProcessGroup)
-            _ * rpg.getPosition() >> { positionablePositionsById.get(id) }
-            _ * rpg.setPosition(_) >> { Position pos ->
-                positionablePositionsById.put id, pos
-            }
-            _ * rpg./(add|set).*/(*_)
-            _ * rpg.getOutputPort(_) >> { String rpgId -> 
positionableMocksById.get(rpgId) }
-            _ * rpg.getIdentifier() >> id
-            positionableMocksById.put(id, rpg)
-            return rpg
-        }
-        _ * controller.createConnection(_, _, _, _, _) >> { String id, String 
name, Connectable source, Connectable destination, Collection<String> 
relationshipNames ->
-            def connection = Mock(Connection)
-            _ * connection.getIdentifier() >> id
-            _ * connection.getBendPoints() >> {
-                def bendpoints = bendPointPositionsByConnectionId.get(id)
-                return bendpoints
-            }
-            _ * connection.setBendPoints(_) >> {
-                // There seems to be a bug in Spock method matching where a 
list of arguments to a method
-                // is being coerced into an Arrays$ArrayList with the actual 
list of bend points as an
-                // ArrayList in the 0th element.
-                // Need to keep an eye on this...
-                bendPointPositionsByConnectionId.put id, it[0]
-            }
-            _ * connection./set.*/(*_)
-            _ * connection.flowFileQueue >> flowFileQueue
-            connectionMocksById.put(id, connection)
-            return connection
-        }
-        _ * controller.startProcessor(*_)
-        _ * controller.startConnectable(_)
-        _ * controller.enableControllerServices(_)
-        _ * snippetManager.export() >> {
-            [] as byte[]
-        }
-        _ * snippetManager.clear()
-        1 * proposedFlow.flow >> testFlowBytes
-        _ * proposedFlow.snippets >> {
-            [] as byte[]
-        }
-        _ * proposedFlow.authorizerFingerprint >> null
-        _ * proposedFlow.missingComponents >> []
-
-        _ * flowFileQueue./set.*/(*_)
-        _ * _.hashCode() >> 1
-        0 * _ // no other mock calls allowed
-
-        then: "verify that the flow was scaled properly"
-        originalPositionablePositionsById.entrySet().forEach { entry ->
-            assert positionablePositionsById.containsKey(entry.key)
-            def originalPosition = entry.value
-            def position = positionablePositionsById.get(entry.key)
-            compareOriginalPointToScaledPoint(originalPosition, position, 
isSyncedPositionGreater)
-        }
-        originalBendPointsByConnectionId.entrySet().forEach { entry ->
-            assert bendPointPositionsByConnectionId.containsKey(entry.key)
-            def originalBendPoints = entry.value
-            def sortedBendPoints = 
bendPointPositionsByConnectionId.get(entry.key).sort { it.x }
-            def sortedOriginalBendPoints = originalBendPoints.sort { it.x }
-            assert sortedOriginalBendPoints.size() == sortedBendPoints.size()
-            [sortedOriginalBendPoints, sortedBendPoints].transpose().forEach { 
Position originalPosition, Position position ->
-                compareOriginalPointToScaledPoint(originalPosition, position, 
isSyncedPositionGreater)
-            }
-        }
-
-        where: "the each flowfile and flow encoding version is run through the 
StandardFlowSynchronizer"
-        filename                               | flowEncodingVersion | 
isSyncedPositionGreater
-        '/conf/scale-positions-flow-0.7.0.xml' | null                | true
-        '/conf/scale-positions-flow-0.7.0.xml' | '0.7'               | true
-        '/conf/scale-positions-flow-0.7.0.xml' | '1.0'               | false
-        '/conf/scale-positions-flow-0.7.0.xml' | '99.0'              | false
-    }
-
-    private void compareOriginalPointToScaledPoint(Position originalPosition, 
Position position, boolean isSyncedPositionGreater) {
-        if (originalPosition.x == 0) {
-            assert position.x == 0
-        }
-        if (originalPosition.y == 0) {
-            assert position.y == 0
-        }
-        if (originalPosition.x > 0) {
-            assert isSyncedPositionGreater == position.x > originalPosition.x
-        }
-        if (originalPosition.y > 0) {
-            assert isSyncedPositionGreater == position.y > originalPosition.y
-        }
-        if (originalPosition.x < 0) {
-            assert isSyncedPositionGreater == position.x < originalPosition.x
-        }
-        if (originalPosition.y < 0) {
-            assert isSyncedPositionGreater == position.y < originalPosition.y
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb0b4283/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index 0c889f5..953e03a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -333,17 +333,22 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
         final String comments = controllerServiceDTO.getComments();
         final Map<String, String> properties = controllerServiceDTO.getProperties();
 
-        if (isNotNull(name)) {
-            controllerService.setName(name);
-        }
-        if (isNotNull(annotationData)) {
-            controllerService.setAnnotationData(annotationData);
-        }
-        if (isNotNull(comments)) {
-            controllerService.setComments(comments);
-        }
-        if (isNotNull(properties)) {
-            controllerService.setProperties(properties);
+        controllerService.pauseValidationTrigger(); // avoid causing validation to be triggered multiple times
+        try {
+            if (isNotNull(name)) {
+                controllerService.setName(name);
+            }
+            if (isNotNull(annotationData)) {
+                controllerService.setAnnotationData(annotationData);
+            }
+            if (isNotNull(comments)) {
+                controllerService.setComments(comments);
+            }
+            if (isNotNull(properties)) {
+                controllerService.setProperties(properties);
+            }
+        } finally {
+            controllerService.resumeValidationTrigger();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb0b4283/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index e9a665a..5a1f3bb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -142,51 +142,56 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
             final String bulletinLevel = config.getBulletinLevel();
             final Set<String> undefinedRelationshipsToTerminate = config.getAutoTerminatedRelationships();
 
-            // ensure scheduling strategy is set first
-            if (isNotNull(schedulingStrategy)) {
-                processor.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy));
-            }
+            processor.pauseValidationTrigger(); // ensure that we don't trigger many validations to occur
+            try {
+                // ensure scheduling strategy is set first
+                if (isNotNull(schedulingStrategy)) {
+                    processor.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy));
+                }
 
-            if (isNotNull(executionNode)) {
-                processor.setExecutionNode(ExecutionNode.valueOf(executionNode));
-            }
-            if (isNotNull(comments)) {
-                processor.setComments(comments);
-            }
-            if (isNotNull(annotationData)) {
-                processor.setAnnotationData(annotationData);
-            }
-            if (isNotNull(maxTasks)) {
-                processor.setMaxConcurrentTasks(maxTasks);
-            }
-            if (isNotNull(schedulingPeriod)) {
-                processor.setScheduldingPeriod(schedulingPeriod);
-            }
-            if (isNotNull(penaltyDuration)) {
-                processor.setPenalizationPeriod(penaltyDuration);
-            }
-            if (isNotNull(yieldDuration)) {
-                processor.setYieldPeriod(yieldDuration);
-            }
-            if (isNotNull(runDurationMillis)) {
-                processor.setRunDuration(runDurationMillis, TimeUnit.MILLISECONDS);
-            }
-            if (isNotNull(bulletinLevel)) {
-                processor.setBulletinLevel(LogLevel.valueOf(bulletinLevel));
-            }
-            if (isNotNull(config.isLossTolerant())) {
-                processor.setLossTolerant(config.isLossTolerant());
-            }
-            if (isNotNull(configProperties)) {
-                processor.setProperties(configProperties);
-            }
+                if (isNotNull(executionNode)) {
+                    processor.setExecutionNode(ExecutionNode.valueOf(executionNode));
+                }
+                if (isNotNull(comments)) {
+                    processor.setComments(comments);
+                }
+                if (isNotNull(annotationData)) {
+                    processor.setAnnotationData(annotationData);
+                }
+                if (isNotNull(maxTasks)) {
+                    processor.setMaxConcurrentTasks(maxTasks);
+                }
+                if (isNotNull(schedulingPeriod)) {
+                    processor.setScheduldingPeriod(schedulingPeriod);
+                }
+                if (isNotNull(penaltyDuration)) {
+                    processor.setPenalizationPeriod(penaltyDuration);
+                }
+                if (isNotNull(yieldDuration)) {
+                    processor.setYieldPeriod(yieldDuration);
+                }
+                if (isNotNull(runDurationMillis)) {
+                    processor.setRunDuration(runDurationMillis, TimeUnit.MILLISECONDS);
+                }
+                if (isNotNull(bulletinLevel)) {
+                    processor.setBulletinLevel(LogLevel.valueOf(bulletinLevel));
+                }
+                if (isNotNull(config.isLossTolerant())) {
+                    processor.setLossTolerant(config.isLossTolerant());
+                }
+                if (isNotNull(configProperties)) {
+                    processor.setProperties(configProperties);
+                }
 
-            if (isNotNull(undefinedRelationshipsToTerminate)) {
-                final Set<Relationship> relationships = new HashSet<>();
-                for (final String relName : undefinedRelationshipsToTerminate) {
-                    relationships.add(new Relationship.Builder().name(relName).build());
+                if (isNotNull(undefinedRelationshipsToTerminate)) {
+                    final Set<Relationship> relationships = new HashSet<>();
+                    for (final String relName : undefinedRelationshipsToTerminate) {
+                        relationships.add(new Relationship.Builder().name(relName).build());
+                    }
+                    processor.setAutoTerminatedRelationships(relationships);
                 }
-                processor.setAutoTerminatedRelationships(relationships);
+            } finally {
+                processor.resumeValidationTrigger();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb0b4283/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
index 9c6cc0b..5210a08 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
@@ -313,25 +313,30 @@ public class StandardReportingTaskDAO extends ComponentDAO implements ReportingT
         final String comments = reportingTaskDTO.getComments();
         final Map<String, String> properties = reportingTaskDTO.getProperties();
 
-        // ensure scheduling strategy is set first
-        if (isNotNull(schedulingStrategy)) {
-            reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy));
-        }
+        reportingTask.pauseValidationTrigger(); // avoid triggering validation multiple times
+        try {
+            // ensure scheduling strategy is set first
+            if (isNotNull(schedulingStrategy)) {
+                reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy));
+            }
 
-        if (isNotNull(name)) {
-            reportingTask.setName(name);
-        }
-        if (isNotNull(schedulingPeriod)) {
-            reportingTask.setSchedulingPeriod(schedulingPeriod);
-        }
-        if (isNotNull(annotationData)) {
-            reportingTask.setAnnotationData(annotationData);
-        }
-        if (isNotNull(comments)) {
-            reportingTask.setComments(comments);
-        }
-        if (isNotNull(properties)) {
-            reportingTask.setProperties(properties);
+            if (isNotNull(name)) {
+                reportingTask.setName(name);
+            }
+            if (isNotNull(schedulingPeriod)) {
+                reportingTask.setSchedulingPeriod(schedulingPeriod);
+            }
+            if (isNotNull(annotationData)) {
+                reportingTask.setAnnotationData(annotationData);
+            }
+            if (isNotNull(comments)) {
+                reportingTask.setComments(comments);
+            }
+            if (isNotNull(properties)) {
+                reportingTask.setProperties(properties);
+            }
+        } finally {
+            reportingTask.resumeValidationTrigger();
         }
     }
 

Reply via email to