http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 2219d6d..bd0f0ab 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -16,19 +16,18 @@
  */
 package org.apache.nifi.controller.service;
 
-import org.apache.nifi.components.ConfigurableComponent;
-import org.apache.nifi.components.VersionedComponent;
-import org.apache.nifi.controller.ConfiguredComponent;
-import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.LoggableComponent;
-import org.apache.nifi.groups.ProcessGroup;
-
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 
-public interface ControllerServiceNode extends ConfiguredComponent, 
ConfigurableComponent, VersionedComponent {
+import org.apache.nifi.components.VersionedComponent;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.LoggableComponent;
+import org.apache.nifi.groups.ProcessGroup;
+
+public interface ControllerServiceNode extends ComponentNode, 
VersionedComponent {
 
     /**
      * @return the Process Group that this Controller Service belongs to, or 
<code>null</code> if the Controller Service
@@ -121,13 +120,13 @@ public interface ControllerServiceNode extends 
ConfiguredComponent, Configurable
      * Indicates that the given component is now referencing this Controller 
Service
      * @param referringComponent the component referencing this service
      */
-    void addReference(ConfiguredComponent referringComponent);
+    void addReference(ComponentNode referringComponent);
 
     /**
      * Indicates that the given component is no longer referencing this 
Controller Service
      * @param referringComponent the component that is no longer referencing 
this service
      */
-    void removeReference(ConfiguredComponent referringComponent);
+    void removeReference(ComponentNode referringComponent);
 
     void setComments(String comment);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index ae5416c..56276f4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -24,7 +24,7 @@ import java.util.concurrent.Future;
 
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.bundle.BundleCoordinate;
-import org.apache.nifi.controller.ConfiguredComponent;
+import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
 
@@ -138,7 +138,7 @@ public interface ControllerServiceProvider extends 
ControllerServiceLookup {
      *
      * @param serviceNode the node
      */
-    Set<ConfiguredComponent> 
unscheduleReferencingComponents(ControllerServiceNode serviceNode);
+    Set<ComponentNode> unscheduleReferencingComponents(ControllerServiceNode 
serviceNode);
 
     /**
      * Verifies that all Controller Services referencing the provided 
Controller
@@ -159,7 +159,7 @@ public interface ControllerServiceProvider extends 
ControllerServiceLookup {
      *
      * @param serviceNode the node
      */
-    Set<ConfiguredComponent> disableReferencingServices(ControllerServiceNode 
serviceNode);
+    Set<ComponentNode> disableReferencingServices(ControllerServiceNode 
serviceNode);
 
     /**
      * Verifies that all Controller Services referencing the provided
@@ -181,7 +181,7 @@ public interface ControllerServiceProvider extends 
ControllerServiceLookup {
      *
      * @return the set of all components that were updated as a result of this 
action
      */
-    Set<ConfiguredComponent> enableReferencingServices(ControllerServiceNode 
serviceNode);
+    Set<ComponentNode> enableReferencingServices(ControllerServiceNode 
serviceNode);
 
     /**
      * Verifies that all enabled Processors referencing the ControllerService
@@ -203,7 +203,7 @@ public interface ControllerServiceProvider extends 
ControllerServiceLookup {
      *
      * @param serviceNode the node
      */
-    Set<ConfiguredComponent> 
scheduleReferencingComponents(ControllerServiceNode serviceNode);
+    Set<ComponentNode> scheduleReferencingComponents(ControllerServiceNode 
serviceNode);
 
     /**
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
index 13c5844..056bcbb 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
@@ -19,7 +19,7 @@ package org.apache.nifi.controller.service;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.nifi.controller.ConfiguredComponent;
+import org.apache.nifi.controller.ComponentNode;
 
 /**
  * Provides a collection of components that are referencing a Controller 
Service
@@ -35,7 +35,7 @@ public interface ControllerServiceReference {
      * @return a {@link Set} of all components that are referencing this
      * Controller Service
      */
-    Set<ConfiguredComponent> getReferencingComponents();
+    Set<ComponentNode> getReferencingComponents();
 
     /**
      * @return a {@link Set} of all Processors, Reporting Tasks, and Controller
@@ -43,7 +43,7 @@ public interface ControllerServiceReference {
      * the case of Processors and Reporting Tasks) or enabled (in the case of
      * Controller Services)
      */
-    Set<ConfiguredComponent> getActiveReferences();
+    Set<ComponentNode> getActiveReferences();
 
     /**
      * Returns a List of all components that reference this Controller Service 
(recursively) that

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index da9374a..c266c64 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -25,12 +25,13 @@ import java.util.function.Predicate;
 
 import org.apache.nifi.authorization.resource.ComponentAuthorizable;
 import org.apache.nifi.components.VersionedComponent;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Positionable;
-import org.apache.nifi.controller.ConfiguredComponent;
+import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.Snippet;
@@ -59,7 +60,7 @@ public interface ProcessGroup extends ComponentAuthorizable, 
Positionable, Versi
     /**
      * Predicate for starting eligible Processors.
      */
-    Predicate<ProcessorNode> START_PROCESSORS_FILTER = node -> 
!node.isRunning() && !ScheduledState.DISABLED.equals(node.getScheduledState()) 
&& node.isValid();
+    Predicate<ProcessorNode> START_PROCESSORS_FILTER = node -> 
!node.isRunning() && !ScheduledState.DISABLED.equals(node.getScheduledState()) 
&& node.getValidationStatus() == ValidationStatus.VALID;
 
     /**
      * Predicate for stopping eligible Processors.
@@ -399,7 +400,7 @@ public interface ProcessGroup extends 
ComponentAuthorizable, Positionable, Versi
      * @return a {@link Collection} of all FlowFileProcessors that are 
contained
      * within this.
      */
-    Set<ProcessorNode> getProcessors();
+    Collection<ProcessorNode> getProcessors();
 
     /**
      * Returns the FlowFileProcessor with the given ID.
@@ -972,7 +973,7 @@ public interface ProcessGroup extends 
ComponentAuthorizable, Positionable, Versi
      * @param variableName the name of the variable
      * @return a set of all components that are affected by the variable with 
the given name
      */
-    Set<ConfiguredComponent> getComponentsAffectedByVariable(String 
variableName);
+    Set<ComponentNode> getComponentsAffectedByVariable(String variableName);
 
     /**
      * @return the version control information that indicates where this flow 
is stored in a Flow Registry,

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
new file mode 100644
index 0000000..4102eca
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestAbstractComponentNode.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URL;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.authorization.Resource;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.validation.ValidationStatus;
+import org.apache.nifi.components.validation.ValidationTrigger;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestAbstractComponentNode {
+
+    @Test(timeout = 5000)
+    public void testGetValidationStatusWithTimeout() {
+        final ValidationControlledAbstractComponentNode node = new 
ValidationControlledAbstractComponentNode();
+        final ValidationStatus status = node.getValidationStatus(1, 
TimeUnit.MILLISECONDS);
+        assertEquals(ValidationStatus.VALIDATING, status);
+    }
+
+
+    private static class ValidationControlledAbstractComponentNode extends 
AbstractComponentNode {
+
+        public ValidationControlledAbstractComponentNode() {
+            super("id", Mockito.mock(ValidationContextFactory.class), 
Mockito.mock(ControllerServiceProvider.class), "unit test component",
+                
ValidationControlledAbstractComponentNode.class.getCanonicalName(), 
Mockito.mock(ComponentVariableRegistry.class), 
Mockito.mock(ReloadComponent.class),
+                Mockito.mock(ValidationTrigger.class), false);
+        }
+
+        @Override
+        protected Collection<ValidationResult> 
computeValidationErrors(ValidationContext context) {
+            try {
+                Thread.sleep(5000L);
+            } catch (final InterruptedException ie) {
+            }
+
+            return null;
+        }
+
+        @Override
+        public void reload(Set<URL> additionalUrls) throws Exception {
+        }
+
+        @Override
+        public BundleCoordinate getBundleCoordinate() {
+            return null;
+        }
+
+        @Override
+        public ConfigurableComponent getComponent() {
+            return null;
+        }
+
+        @Override
+        public TerminationAwareLogger getLogger() {
+            return null;
+        }
+
+        @Override
+        public Class<?> getComponentClass() {
+            return ValidationControlledAbstractComponentNode.class;
+        }
+
+        @Override
+        public boolean isRestricted() {
+            return false;
+        }
+
+        @Override
+        public boolean isDeprecated() {
+            return false;
+        }
+
+        @Override
+        public boolean isValidationNecessary() {
+            return true;
+        }
+
+        @Override
+        public String getProcessGroupIdentifier() {
+            return "1234";
+        }
+
+        @Override
+        public Authorizable getParentAuthorizable() {
+            return null;
+        }
+
+        @Override
+        public Resource getResource() {
+            return null;
+        }
+
+        @Override
+        public void verifyModifiable() throws IllegalStateException {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/StandardValidationTrigger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/StandardValidationTrigger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/StandardValidationTrigger.java
new file mode 100644
index 0000000..4f1f7dc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/StandardValidationTrigger.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.validation;
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.BooleanSupplier;
+
+import org.apache.nifi.controller.ComponentNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StandardValidationTrigger implements ValidationTrigger {
+    private static final Logger logger = 
LoggerFactory.getLogger(StandardValidationTrigger.class);
+
+    private final ExecutorService threadPool;
+    private final BooleanSupplier flowInitialized;
+
+    public StandardValidationTrigger(final ExecutorService threadPool, final 
BooleanSupplier flowInitialized) {
+        this.threadPool = threadPool;
+        this.flowInitialized = flowInitialized;
+    }
+
+    @Override
+    public void triggerAsync(final ComponentNode component) {
+        if (!flowInitialized.getAsBoolean()) {
+            logger.debug("Triggered to perform validation on {} asynchronously 
but flow is not yet initialized so will ignore validation", component);
+            return;
+        }
+
+        threadPool.submit(() -> trigger(component));
+    }
+
+    @Override
+    public void trigger(final ComponentNode component) {
+        try {
+            if (component.isValidationNecessary()) {
+                component.performValidation();
+            }
+        } catch (final Throwable t) {
+            component.getLogger().error("Failed to perform validation due to " 
+ t, t);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java
new file mode 100644
index 0000000..0665dd2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/validation/TriggerValidationTask.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.validation;
+
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.FlowController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TriggerValidationTask implements Runnable {
+    private static final Logger logger = 
LoggerFactory.getLogger(TriggerValidationTask.class);
+
+    private final FlowController controller;
+    private final ValidationTrigger validationTrigger;
+
+    public TriggerValidationTask(final FlowController controller, final 
ValidationTrigger validationTrigger) {
+        this.controller = controller;
+        this.validationTrigger = validationTrigger;
+    }
+
+    @Override
+    public void run() {
+        try {
+            logger.debug("Triggering validation of all components");
+
+            for (final ComponentNode node : 
controller.getAllControllerServices()) {
+                validationTrigger.trigger(node);
+            }
+
+            for (final ComponentNode node : controller.getAllReportingTasks()) 
{
+                validationTrigger.trigger(node);
+            }
+
+            for (final ComponentNode node : 
controller.getRootGroup().findAllProcessors()) {
+                validationTrigger.trigger(node);
+            }
+        } catch (final Throwable t) {
+            logger.error("Encountered unexpected error when attempting to 
validate components", t);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 92ab7b3..091f0fb 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,6 +16,41 @@
  */
 package org.apache.nifi.controller;
 
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.collections4.Predicate;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
@@ -52,6 +87,10 @@ import 
org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.StandardValidationTrigger;
+import org.apache.nifi.components.validation.TriggerValidationTask;
+import org.apache.nifi.components.validation.ValidationStatus;
+import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
@@ -99,8 +138,8 @@ import 
org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
-import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
 import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
+import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
 import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
 import org.apache.nifi.controller.serialization.FlowSerializationException;
@@ -206,6 +245,7 @@ import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.util.SnippetUtils;
+import org.apache.nifi.util.concurrency.TimedLock;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.api.dto.BatchSettingsDTO;
 import org.apache.nifi.web.api.dto.BundleDTO;
@@ -229,41 +269,6 @@ import 
org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;
-
 public class FlowController implements EventAccess, ControllerServiceProvider, 
ReportingTaskProvider,
     QueueProvider, Authorizable, ProvenanceAuthorizableFactory, 
NodeTypeProvider, IdentifierLookup, ReloadComponent {
 
@@ -343,6 +348,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private final LeaderElectionManager leaderElectionManager;
     private final ClusterCoordinator clusterCoordinator;
     private final FlowRegistryClient flowRegistryClient;
+    private final FlowEngine validationThreadPool;
+    private final ValidationTrigger validationTrigger;
 
     /**
      * true if controller is configured to operate in a clustered environment
@@ -397,8 +404,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private volatile boolean shutdown = false;
 
     private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
-    private final Lock readLock = rwLock.readLock();
-    private final Lock writeLock = rwLock.writeLock();
+    private final TimedLock readLock = new TimedLock(rwLock.readLock(), 
"FlowControllerReadLock", 1);
+    private final TimedLock writeLock = new TimedLock(rwLock.writeLock(), 
"FlowControllerWriteLock", 1);
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FlowController.class);
 
@@ -460,6 +467,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         return flowController;
     }
 
+    @SuppressWarnings("deprecation")
     private FlowController(
             final FlowFileEventRepository flowFileEventRepo,
             final NiFiProperties nifiProperties,
@@ -568,7 +576,11 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         setRootGroup(rootGroup);
         instanceId = ComponentIdGenerator.generateId().toString();
 
-        controllerServiceProvider = new 
StandardControllerServiceProvider(this, processScheduler, bulletinRepository, 
stateManagerProvider, this.variableRegistry, this.nifiProperties);
+        this.validationThreadPool = new FlowEngine(5, "Validate Components", 
true);
+        this.validationTrigger = new 
StandardValidationTrigger(validationThreadPool, this::isInitialized);
+
+        controllerServiceProvider = new 
StandardControllerServiceProvider(this, processScheduler, bulletinRepository, 
stateManagerProvider,
+            this.variableRegistry, this.nifiProperties, validationTrigger);
 
         if (remoteInputSocketPort == null) {
             LOG.info("Not enabling RAW Socket Site-to-Site functionality 
because nifi.remote.input.socket.port is not set");
@@ -793,7 +805,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
             initialized.set(true);
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("initializeFlow");
         }
     }
 
@@ -833,6 +845,16 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     public void onFlowInitialized(final boolean startDelayedComponents) {
         writeLock.lock();
         try {
+            // Perform validation of all components before attempting to start 
them.
+            LOG.debug("Triggering initial validation of all components");
+            final long start = System.nanoTime();
+            new TriggerValidationTask(this, validationTrigger).run();
+            final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            LOG.info("Performed initial validation of all components in {} 
milliseconds", millis);
+
+            // Trigger component validation to occur every 5 seconds.
+            validationThreadPool.scheduleWithFixedDelay(new 
TriggerValidationTask(this, validationTrigger), 5, 5, TimeUnit.SECONDS);
+
             if (startDelayedComponents) {
                 LOG.info("Starting {} processors/ports/funnels", 
startConnectablesAfterInitialization.size() + 
startRemoteGroupPortsAfterInitialization.size());
                 for (final Connectable connectable : 
startConnectablesAfterInitialization) {
@@ -842,6 +864,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
                     try {
                         if (connectable instanceof ProcessorNode) {
+                            ((ProcessorNode) 
connectable).getValidationStatus(5, TimeUnit.SECONDS);
                             
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
                         } else {
                             startConnectable(connectable);
@@ -885,7 +908,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 startRemoteGroupPortsAfterInitialization.clear();
             }
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("onFlowInitialized");
         }
     }
 
@@ -1150,12 +1173,12 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         final ProcessorNode procNode;
         if (creationSuccessful) {
             procNode = new StandardProcessorNode(processor, id, 
validationContextFactory, processScheduler, controllerServiceProvider,
-                nifiProperties, componentVarRegistry, this);
+                nifiProperties, componentVarRegistry, this, validationTrigger);
         } else {
             final String simpleClassName = type.contains(".") ? 
StringUtils.substringAfterLast(type, ".") : type;
             final String componentType = "(Missing) " + simpleClassName;
             procNode = new StandardProcessorNode(processor, id, 
validationContextFactory, processScheduler, controllerServiceProvider,
-                componentType, type, nifiProperties, componentVarRegistry, 
this, true);
+                componentType, type, nifiProperties, componentVarRegistry, 
this, validationTrigger, true);
         }
 
         if (registerLogObserver) {
@@ -1209,6 +1232,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             }
         }
 
+        validationTrigger.triggerAsync(procNode);
         return procNode;
     }
 
@@ -1288,6 +1312,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         // need to refresh the properties in case we are changing from ghost 
component to real component
         existingNode.refreshProperties();
+
+        validationTrigger.triggerAsync(existingNode);
     }
 
     /**
@@ -1303,7 +1329,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         try {
             return instanceId;
         } finally {
-            readLock.unlock();
+            readLock.unlock("getInstanceId");
         }
     }
 
@@ -1451,7 +1477,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         try {
             return null == this.timerDrivenEngineRef.get() || 
this.timerDrivenEngineRef.get().isTerminated();
         } finally {
-            this.readLock.unlock();
+            this.readLock.unlock("isTerminated");
         }
     }
 
@@ -1495,6 +1521,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 LOG.info("Initiated graceful shutdown of flow 
controller...waiting up to " + gracefulShutdownSeconds + " seconds");
             }
 
+            validationThreadPool.shutdown();
             clusterTaskExecutor.shutdownNow();
 
             if (zooKeeperStateServer != null) {
@@ -1565,7 +1592,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 }
             }
         } finally {
-            readLock.unlock();
+            readLock.unlock("shutdown");
         }
     }
 
@@ -1577,7 +1604,9 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      * @throws FlowSerializationException if serialization of the flow fails 
for
      * any reason
      */
-    public void serialize(final FlowSerializer serializer, final OutputStream 
os) throws FlowSerializationException {
+    public synchronized <T> void serialize(final FlowSerializer<T> serializer, 
final OutputStream os) throws FlowSerializationException {
+        T flowConfiguration;
+
         readLock.lock();
         try {
             final ScheduledStateLookup scheduledStateLookup = new 
ScheduledStateLookup() {
@@ -1603,10 +1632,12 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 }
             };
 
-            serializer.serialize(this, os, scheduledStateLookup);
+            flowConfiguration = serializer.transform(this, 
scheduledStateLookup);
         } finally {
-            readLock.unlock();
+            readLock.unlock("serialize");
         }
+
+        serializer.serialize(flowConfiguration, os);
     }
 
     /**
@@ -1639,7 +1670,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             flowSynchronized.set(true);
             LOG.info("Successfully synchronized controller with proposed 
flow");
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("synchronize");
         }
     }
 
@@ -1668,7 +1699,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         try {
             setMaxThreadCount(maxThreadCount, this.timerDrivenEngineRef.get(), 
this.maxTimerDrivenThreads);
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("setMaxTimerDrivenThreadCount");
         }
     }
 
@@ -1678,7 +1709,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             setMaxThreadCount(maxThreadCount, this.eventDrivenEngineRef.get(), 
this.maxEventDrivenThreads);
             
processScheduler.setMaxThreadCount(SchedulingStrategy.EVENT_DRIVEN, 
maxThreadCount);
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("setMaxEventDrivenThreadCount");
         }
     }
 
@@ -1732,7 +1763,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             allProcessGroups.put(group.getIdentifier(), group);
             allProcessGroups.put(ROOT_GROUP_ID_ALIAS, group);
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("setRootGroup");
         }
     }
 
@@ -1819,16 +1850,15 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     }
 
     private void instantiateSnippet(final ProcessGroup group, final 
FlowSnippetDTO dto, final boolean topLevel) throws 
ProcessorInstantiationException {
+        validateSnippetContents(requireNonNull(group), dto);
         writeLock.lock();
         try {
-            validateSnippetContents(requireNonNull(group), dto);
-
             //
             // Instantiate Controller Services
             //
             for (final ControllerServiceDTO controllerServiceDTO : 
dto.getControllerServices()) {
                 final BundleCoordinate bundleCoordinate = 
BundleUtils.getBundle(controllerServiceDTO.getType(), 
controllerServiceDTO.getBundle());
-                final ControllerServiceNode serviceNode = 
createControllerService(controllerServiceDTO.getType(), 
controllerServiceDTO.getId(), bundleCoordinate, Collections.emptySet(),true);
+                final ControllerServiceNode serviceNode = 
createControllerService(controllerServiceDTO.getType(), 
controllerServiceDTO.getId(), bundleCoordinate, Collections.emptySet(), true);
 
                 
serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
                 serviceNode.setComments(controllerServiceDTO.getComments());
@@ -2155,7 +2185,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 group.addConnection(connection);
             }
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("instantiateSnippet");
         }
     }
 
@@ -2732,6 +2762,18 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     }
 
     /**
+     * Returns the status for components in the specified group. This request 
is
+     * made by the specified user so the results will be filtered accordingly.
+     *
+     * @param groupId group id
+     * @param user user making request
+     * @return the component status
+     */
+    public ProcessGroupStatus getGroupStatus(final String groupId, final 
NiFiUser user, final int recursiveStatusDepth) {
+        return getGroupStatus(groupId, getProcessorStats(), user, 
recursiveStatusDepth);
+    }
+
+    /**
      * Returns the status for the components in the specified group with the
      * specified report. This request is not in the context of a user so the
      * results will be unfiltered.
@@ -2744,7 +2786,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         final ProcessGroup group = getGroup(groupId);
 
         // this was invoked with no user context so the results will be 
unfiltered... necessary for aggregating status history
-        return getGroupStatus(group, statusReport, authorizable -> true);
+        return getGroupStatus(group, statusReport, authorizable -> true, 
Integer.MAX_VALUE, 1);
     }
 
     /**
@@ -2761,7 +2803,26 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         final ProcessGroup group = getGroup(groupId);
 
         // on demand status request for a specific user... require 
authorization per component and filter results as appropriate
-        return getGroupStatus(group, statusReport, authorizable -> 
authorizable.isAuthorized(authorizer, RequestAction.READ, user));
+        return getGroupStatus(group, statusReport, authorizable -> 
authorizable.isAuthorized(authorizer, RequestAction.READ, user), 
Integer.MAX_VALUE, 1);
+    }
+
+
+    /**
+     * Returns the status for the components in the specified group with the
+     * specified report. This request is made by the specified user so the
+     * results will be filtered accordingly.
+     *
+     * @param groupId group id
+     * @param statusReport report
+     * @param user user making request
+     * @param recursiveStatusDepth the number of levels deep we should recurse 
and still include the the processors' statuses, the groups' statuses, etc. in 
the returned ProcessGroupStatus
+     * @return the component status
+     */
+    public ProcessGroupStatus getGroupStatus(final String groupId, final 
RepositoryStatusReport statusReport, final NiFiUser user, final int 
recursiveStatusDepth) {
+        final ProcessGroup group = getGroup(groupId);
+
+        // on demand status request for a specific user... require 
authorization per component and filter results as appropriate
+        return getGroupStatus(group, statusReport, authorizable -> 
authorizable.isAuthorized(authorizer, RequestAction.READ, user), 
recursiveStatusDepth, 1);
     }
 
     /**
@@ -2772,9 +2833,12 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      * @param group group id
      * @param statusReport report
      * @param isAuthorized is authorized check
+     * @param recursiveStatusDepth the number of levels deep we should recurse 
and still include the the processors' statuses, the groups' statuses, etc. in 
the returned ProcessGroupStatus
+     * @param currentDepth the current number of levels deep that we have 
recursed
      * @return the component status
      */
-    public ProcessGroupStatus getGroupStatus(final ProcessGroup group, final 
RepositoryStatusReport statusReport, final Predicate<Authorizable> 
isAuthorized) {
+    private ProcessGroupStatus getGroupStatus(final ProcessGroup group, final 
RepositoryStatusReport statusReport, final Predicate<Authorizable> isAuthorized,
+            final int recursiveStatusDepth, final int currentDepth) {
         if (group == null) {
             return null;
         }
@@ -2799,12 +2863,16 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         int flowFilesTransferred = 0;
         long bytesTransferred = 0;
 
+        final boolean populateChildStatuses = currentDepth <= 
recursiveStatusDepth;
+
         // set status for processors
         final Collection<ProcessorStatus> processorStatusCollection = new 
ArrayList<>();
         status.setProcessorStatus(processorStatusCollection);
         for (final ProcessorNode procNode : group.getProcessors()) {
             final ProcessorStatus procStat = getProcessorStatus(statusReport, 
procNode, isAuthorized);
-            processorStatusCollection.add(procStat);
+            if (populateChildStatuses) {
+                processorStatusCollection.add(procStat);
+            }
             activeGroupThreads += procStat.getActiveThreadCount();
             terminatedGroupThreads += procStat.getTerminatedThreadCount();
             bytesRead += procStat.getBytesRead();
@@ -2820,8 +2888,18 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         final Collection<ProcessGroupStatus> localChildGroupStatusCollection = 
new ArrayList<>();
         status.setProcessGroupStatus(localChildGroupStatusCollection);
         for (final ProcessGroup childGroup : group.getProcessGroups()) {
-            final ProcessGroupStatus childGroupStatus = 
getGroupStatus(childGroup, statusReport, isAuthorized);
-            localChildGroupStatusCollection.add(childGroupStatus);
+            final ProcessGroupStatus childGroupStatus;
+            if (populateChildStatuses) {
+                childGroupStatus = getGroupStatus(childGroup, statusReport, 
isAuthorized, recursiveStatusDepth, currentDepth + 1);
+                localChildGroupStatusCollection.add(childGroupStatus);
+            } else {
+                // In this case, we don't want to include any of the recursive 
components' individual statuses. As a result, we can
+                // avoid performing any sort of authorizations. Because we 
only care about the numbers that come back, we can just indicate
+                // that the user is not authorized. This allows us to avoid 
the expense of both performing the authorization and calculating
+                // things that we would otherwise need to calculate if the 
user were in fact authorized.
+                childGroupStatus = getGroupStatus(childGroup, statusReport, 
authorizable -> false, recursiveStatusDepth, currentDepth + 1);
+            }
+
             activeGroupThreads += childGroupStatus.getActiveThreadCount();
             terminatedGroupThreads += 
childGroupStatus.getTerminatedThreadCount();
             bytesRead += childGroupStatus.getBytesRead();
@@ -2844,7 +2922,9 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         for (final RemoteProcessGroup remoteGroup : 
group.getRemoteProcessGroups()) {
             final RemoteProcessGroupStatus remoteStatus = 
createRemoteGroupStatus(remoteGroup, statusReport, isAuthorized);
             if (remoteStatus != null) {
-                remoteProcessGroupStatusCollection.add(remoteStatus);
+                if (populateChildStatuses) {
+                    remoteProcessGroupStatusCollection.add(remoteStatus);
+                }
 
                 flowFilesReceived += remoteStatus.getReceivedCount();
                 bytesReceived += remoteStatus.getReceivedContentSize();
@@ -2905,7 +2985,11 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 connStatus.setQueuedBytes(connectionQueuedBytes);
                 connStatus.setQueuedCount(connectionQueuedCount);
             }
-            connectionStatusCollection.add(connStatus);
+
+            if (populateChildStatuses) {
+                connectionStatusCollection.add(connStatus);
+            }
+
             queuedCount += connectionQueuedCount;
             queuedContentSize += connectionQueuedBytes;
 
@@ -2978,7 +3062,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 bytesReceived += entry.getBytesReceived();
             }
 
-            inputPortStatusCollection.add(portStatus);
+            if (populateChildStatuses) {
+                inputPortStatusCollection.add(portStatus);
+            }
+
             activeGroupThreads += portStatus.getActiveThreadCount();
         }
 
@@ -3039,7 +3126,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 bytesSent += entry.getBytesSent();
             }
 
-            outputPortStatusCollection.add(portStatus);
+            if (populateChildStatuses) {
+                outputPortStatusCollection.add(portStatus);
+            }
+
             activeGroupThreads += portStatus.getActiveThreadCount();
         }
 
@@ -3216,7 +3306,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             status.setRunStatus(RunStatus.Disabled);
         } else if 
(ScheduledState.RUNNING.equals(procNode.getScheduledState())) {
             status.setRunStatus(RunStatus.Running);
-        } else if (!procNode.isValid()) {
+        } else if (procNode.getValidationStatus() == ValidationStatus.INVALID) 
{
             status.setRunStatus(RunStatus.Invalid);
         } else {
             status.setRunStatus(RunStatus.Stopped);
@@ -3248,7 +3338,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 startConnectablesAfterInitialization.add(node);
             }
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("startProcessor");
         }
     }
 
@@ -3285,7 +3375,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 startConnectablesAfterInitialization.add(connectable);
             }
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("startConnectable");
         }
     }
 
@@ -3312,7 +3402,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                     throw new IllegalArgumentException();
             }
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("stopConnectable");
         }
     }
 
@@ -3325,7 +3415,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 startRemoteGroupPortsAfterInitialization.add(remoteGroupPort);
             }
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("startTransmitting");
         }
     }
 
@@ -3393,12 +3483,12 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(controllerServiceProvider, 
componentVarRegistry);
         final ReportingTaskNode taskNode;
         if (creationSuccessful) {
-            taskNode = new StandardReportingTaskNode(task, id, this, 
processScheduler, validationContextFactory, componentVarRegistry, this);
+            taskNode = new StandardReportingTaskNode(task, id, this, 
processScheduler, validationContextFactory, componentVarRegistry, this, 
validationTrigger);
         } else {
             final String simpleClassName = type.contains(".") ? 
StringUtils.substringAfterLast(type, ".") : type;
             final String componentType = "(Missing) " + simpleClassName;
 
-            taskNode = new StandardReportingTaskNode(task, id, this, 
processScheduler, validationContextFactory, componentType, type, 
componentVarRegistry, this, true);
+            taskNode = new StandardReportingTaskNode(task, id, this, 
processScheduler, validationContextFactory, componentType, type, 
componentVarRegistry, this, validationTrigger, true);
         }
 
         
taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName());
@@ -3429,6 +3519,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                     new ReportingTaskLogObserver(getBulletinRepository(), 
taskNode));
         }
 
+        validationTrigger.triggerAsync(taskNode);
         return taskNode;
     }
 
@@ -3503,6 +3594,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         // need to refresh the properties in case we are changing from ghost 
component to real component
         existingNode.refreshProperties();
+
+        validationTrigger.triggerAsync(existingNode);
     }
 
     @Override
@@ -3515,6 +3608,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         if (isTerminated()) {
             throw new IllegalStateException("Cannot start reporting task " + 
reportingTaskNode.getIdentifier() + " because the controller is terminated");
         }
+
+        reportingTaskNode.performValidation(); // ensure that the reporting 
task has completed its validation before attempting to start it
         reportingTaskNode.verifyCanStart();
         reportingTaskNode.reloadAdditionalResourcesIfNecessary();
         processScheduler.schedule(reportingTaskNode);
@@ -3558,6 +3653,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         reportingTasks.remove(reportingTaskNode.getIdentifier());
         
LogRepositoryFactory.removeRepository(reportingTaskNode.getIdentifier());
+        processScheduler.onReportingTaskRemoved(reportingTaskNode);
+
         
ExtensionManager.removeInstanceClassLoader(reportingTaskNode.getIdentifier());
     }
 
@@ -3589,6 +3686,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             }
         }
 
+        validationTrigger.triggerAsync(serviceNode);
         return serviceNode;
     }
 
@@ -3641,6 +3739,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         // need to refresh the properties in case we are changing from ghost 
component to real component
         existingNode.refreshProperties();
+
+        validationTrigger.triggerAsync(existingNode);
     }
 
     @Override
@@ -3657,22 +3757,22 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     }
 
     @Override
-    public Set<ConfiguredComponent> disableReferencingServices(final 
ControllerServiceNode serviceNode) {
+    public Set<ComponentNode> disableReferencingServices(final 
ControllerServiceNode serviceNode) {
         return 
controllerServiceProvider.disableReferencingServices(serviceNode);
     }
 
     @Override
-    public Set<ConfiguredComponent> enableReferencingServices(final 
ControllerServiceNode serviceNode) {
+    public Set<ComponentNode> enableReferencingServices(final 
ControllerServiceNode serviceNode) {
         return 
controllerServiceProvider.enableReferencingServices(serviceNode);
     }
 
     @Override
-    public Set<ConfiguredComponent> scheduleReferencingComponents(final 
ControllerServiceNode serviceNode) {
+    public Set<ComponentNode> scheduleReferencingComponents(final 
ControllerServiceNode serviceNode) {
         return 
controllerServiceProvider.scheduleReferencingComponents(serviceNode);
     }
 
     @Override
-    public Set<ConfiguredComponent> unscheduleReferencingComponents(final 
ControllerServiceNode serviceNode) {
+    public Set<ComponentNode> unscheduleReferencingComponents(final 
ControllerServiceNode serviceNode) {
         return 
controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
     }
 
@@ -3966,7 +4066,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             this.heartbeatSendTask.set(sendTask);
             heartbeatSenderFuture = 
clusterTaskExecutor.scheduleWithFixedDelay(sendTask, 0, heartbeatDelaySeconds, 
TimeUnit.SECONDS);
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("startHeartbeating");
         }
     }
 
@@ -4014,7 +4114,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 heartbeatSenderFuture.cancel(false);
             }
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("stopHeartbeating");
         }
 
     }
@@ -4027,7 +4127,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         try {
             return heartbeatSenderFuture != null && 
!heartbeatSenderFuture.isCancelled();
         } finally {
-            readLock.unlock();
+            readLock.unlock("isHeartbeating");
         }
     }
 
@@ -4039,7 +4139,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         try {
             return heartbeatDelaySeconds;
         } finally {
-            readLock.unlock();
+            readLock.unlock("getHeartbeatDelaySeconds");
         }
     }
 
@@ -4072,7 +4172,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         try {
             return clustered;
         } finally {
-            readLock.unlock();
+            readLock.unlock("isClustered");
         }
     }
 
@@ -4087,6 +4187,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             @Override
             public synchronized void onLeaderRelinquish() {
                 LOG.info("This node is no longer the elected Active Cluster 
Coordinator");
+                
bulletinRepository.addBulletin(BulletinFactory.createBulletin("Cluster 
Coordinator", Severity.INFO.name(), participantId + " is no longer the Cluster 
Coordinator"));
 
                 // We do not want to stop the heartbeat monitor. This is 
because even though ZooKeeper offers guarantees
                 // that watchers will see changes on a ZNode in the order they 
happened, there does not seem to be any
@@ -4101,6 +4202,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             @Override
             public synchronized void onLeaderElection() {
                 LOG.info("This node elected Active Cluster Coordinator");
+                
bulletinRepository.addBulletin(BulletinFactory.createBulletin("Cluster 
Coordinator", Severity.INFO.name(), participantId + " has been elected the 
Cluster Coordinator"));
 
                 // Purge any heartbeats that we already have. If we don't do 
this, we can have a scenario where we receive heartbeats
                 // from a node, and then another node becomes Cluster 
Coordinator. As a result, we stop receiving heartbeats. Now that
@@ -4192,7 +4294,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             // update the heartbeat bean
             this.heartbeatBeanRef.set(new HeartbeatBean(getRootGroup(), 
isPrimary()));
         } finally {
-            writeLock.unlock();
+            writeLock.unlock("setClustered");
         }
     }
 
@@ -4713,7 +4815,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 try {
                     bean = new HeartbeatBean(getGroup(getRootGroupId()), 
isPrimary());
                 } finally {
-                    readLock.unlock();
+                    readLock.unlock("createHeartbeatMessage");
                 }
             }
 

Reply via email to