This is an automated email from the ASF dual-hosted git repository.

mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 706cf7d  NIFI-5944: When components are started on NiFi startup, if 
they are invalid, don't fail immediately and give up. Instead, keep attempting 
to start the component until it becomes valid.
706cf7d is described below

commit 706cf7dcff1eddaf86e6dd7496734ec637d31810
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jan 10 09:53:05 2019 -0500

    NIFI-5944: When components are started on NiFi startup, if they are 
invalid, don't fail immediately and give up. Instead, keep attempting to start 
the component until it becomes valid.
    
    This closes #3259
---
 .../nifi/controller/AbstractComponentNode.java     | 12 ++--
 .../org/apache/nifi/controller/ComponentNode.java  |  2 +-
 .../org/apache/nifi/controller/ProcessorNode.java  | 17 ++++-
 .../org/apache/nifi/controller/FlowController.java |  4 +-
 .../nifi/controller/StandardProcessorNode.java     | 33 ++++++---
 .../reporting/AbstractReportingTaskNode.java       | 15 +++--
 .../scheduling/StandardProcessScheduler.java       | 22 +++---
 .../serialization/ScheduledStateLookup.java        |  2 +-
 .../service/StandardControllerServiceNode.java     | 78 +++++++++++-----------
 .../scheduling/TestStandardProcessScheduler.java   | 43 ++++++++----
 .../TestStandardControllerServiceProvider.java     | 21 ++++--
 11 files changed, 158 insertions(+), 91 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index f3ae41f..e17682e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -373,9 +373,8 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
     }
 
     @Override
-    public final void performValidation() {
-        boolean replaced = false;
-        do {
+    public final ValidationStatus performValidation() {
+        while (true) {
             final ValidationState validationState = getValidationState();
 
             final ValidationContext validationContext = getValidationContext();
@@ -391,8 +390,11 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
 
             final ValidationStatus status = results.isEmpty() ? 
ValidationStatus.VALID : ValidationStatus.INVALID;
             final ValidationState updatedState = new ValidationState(status, 
results);
-            replaced = replaceValidationState(validationState, updatedState);
-        } while (!replaced);
+            final boolean replaced = replaceValidationState(validationState, 
updatedState);
+            if (replaced) {
+                return status;
+            }
+        }
     }
 
     protected Collection<ValidationResult> computeValidationErrors(final 
ValidationContext validationContext) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
index d0ed572..2357d41 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
@@ -179,7 +179,7 @@ public interface ComponentNode extends 
ComponentAuthorizable {
     /**
      * Asynchronously begins the validation process
      */
-    public abstract void performValidation();
+    public abstract ValidationStatus performValidation();
 
     /**
      * Returns a {@link List} of all {@link PropertyDescriptor}s that this
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 6e8206e..12eeb88 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller;
 
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.scheduling.LifecycleState;
@@ -144,7 +145,13 @@ public abstract class ProcessorNode extends 
AbstractComponentNode implements Con
     public ScheduledState getScheduledState() {
         ScheduledState sc = this.scheduledState.get();
         if (sc == ScheduledState.STARTING) {
-            return ScheduledState.RUNNING;
+            final ValidationStatus validationStatus = getValidationStatus();
+
+            if (validationStatus == ValidationStatus.INVALID) {
+                return ScheduledState.STOPPED;
+            } else {
+                return ScheduledState.RUNNING;
+            }
         } else if (sc == ScheduledState.STOPPING) {
             return ScheduledState.STOPPED;
         }
@@ -240,4 +247,12 @@ public abstract class ProcessorNode extends 
AbstractComponentNode implements Con
      * will result in the WARN message if processor can not be enabled.
      */
     public abstract void disable();
+
+    /**
+     * Returns the Scheduled State that is desired for this Processor. This 
may vary from the current state if the Processor is not
+     * currently valid, is in the process of stopping but should then 
transition to Running, etc.
+     *
+     * @return the desired state for this Processor
+     */
+    public abstract ScheduledState getDesiredState();
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index dad01b8..8ab7e69 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -912,7 +912,6 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
 
                     try {
                         if (connectable instanceof ProcessorNode) {
-                            ((ProcessorNode) 
connectable).getValidationStatus(5, TimeUnit.SECONDS);
                             
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
                         } else {
                             startConnectable(connectable);
@@ -1242,7 +1241,7 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
                         return ScheduledState.RUNNING;
                     }
 
-                    return procNode.getScheduledState();
+                    return procNode.getDesiredState();
                 }
 
                 @Override
@@ -1699,7 +1698,6 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
             throw new IllegalStateException("Cannot start reporting task " + 
reportingTaskNode.getIdentifier() + " because the controller is terminated");
         }
 
-        reportingTaskNode.performValidation(); // ensure that the reporting 
task has completed its validation before attempting to start it
         reportingTaskNode.verifyCanStart();
         reportingTaskNode.reloadAdditionalResourcesIfNecessary();
         processScheduler.schedule(reportingTaskNode);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 4adfad4..8ca5173 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -38,6 +38,7 @@ import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
@@ -139,7 +140,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     private final ProcessScheduler processScheduler;
     private long runNanos = 0L;
     private volatile long yieldNanos;
-    private volatile ScheduledState desiredState;
+    private volatile ScheduledState desiredState = ScheduledState.STOPPED;
     private volatile LogLevel bulletinLevel = LogLevel.WARN;
 
     private SchedulingStrategy schedulingStrategy; // guarded by read/write 
lock
@@ -1343,13 +1344,6 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     public void start(final ScheduledExecutorService taskScheduler, final long 
administrativeYieldMillis, final long timeoutMillis, final ProcessContext 
processContext,
             final SchedulingAgentCallback schedulingAgentCallback, final 
boolean failIfStopping) {
 
-        switch (getValidationStatus()) {
-            case INVALID:
-                throw new IllegalStateException("Processor " + this.getName() 
+ " is not in a valid state due to " + this.getValidationErrors());
-            case VALIDATING:
-                throw new IllegalStateException("Processor " + this.getName() 
+ " cannot be started because its validation is still being performed");
-        }
-
         final Processor processor = processorRef.get().getProcessor();
         final ComponentLog procLog = new 
SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
 
@@ -1487,6 +1481,25 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
         // Create a task to invoke the @OnScheduled annotation of the processor
         final Callable<Void> startupTask = () -> {
+            final ScheduledState currentScheduleState = scheduledState.get();
+            if (currentScheduleState == ScheduledState.STOPPING || 
currentScheduleState == ScheduledState.STOPPED) {
+                LOG.debug("{} is stopped. Will not call @OnScheduled lifecycle 
methods or begin trigger onTrigger() method", StandardProcessorNode.this);
+                schedulingAgentCallback.onTaskComplete();
+                return null;
+            }
+
+            final ValidationStatus validationStatus = getValidationStatus();
+            if (validationStatus != ValidationStatus.VALID) {
+                LOG.debug("Cannot start {} because Processor is currently not 
valid; will try again after 5 seconds", StandardProcessorNode.this);
+
+                // re-initiate the entire process
+                final Runnable initiateStartTask = () -> 
initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, 
processContext, schedulingAgentCallback);
+                taskScheduler.schedule(initiateStartTask, 5, TimeUnit.SECONDS);
+
+                schedulingAgentCallback.onTaskComplete();
+                return null;
+            }
+
             LOG.debug("Invoking @OnScheduled methods of {}", processor);
 
             // Now that the task has been scheduled, set the timeout
@@ -1696,6 +1709,10 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         return future;
     }
 
+    @Override
+    public ScheduledState getDesiredState() {
+        return desiredState;
+    }
 
     private void monitorAsyncTask(final Future<?> taskFuture, final Future<?> 
monitoringFuture, final long completionTimestamp) {
         if (taskFuture.isDone()) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index bce85f8..f1b585e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -16,16 +16,11 @@
  */
 package org.apache.nifi.controller.reporting;
 
-import java.net.URL;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.AbstractComponentNode;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -51,6 +46,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.AnnotationUtils;
 
+import java.net.URL;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 public abstract class AbstractReportingTaskNode extends AbstractComponentNode 
implements ReportingTaskNode {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractReportingTaskNode.class);
@@ -176,7 +177,7 @@ public abstract class AbstractReportingTaskNode extends 
AbstractComponentNode im
 
     @Override
     public boolean isValidationNecessary() {
-        return !processScheduler.isScheduled(this);
+        return !processScheduler.isScheduled(this) || getValidationStatus() != 
ValidationStatus.VALID;
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 2ff3307..a7d5fd8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -21,6 +21,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
@@ -186,13 +187,6 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
             throw new IllegalStateException("Reporting Task " + 
taskNode.getName() + " cannot be started because it has " + activeThreadCount + 
" threads still running");
         }
 
-        switch (taskNode.getValidationStatus()) {
-            case INVALID:
-                throw new IllegalStateException("Reporting Task " + 
taskNode.getName() + " is not in a valid state for the following reasons: " + 
taskNode.getValidationErrors());
-            case VALIDATING:
-                throw new IllegalStateException("Reporting Task " + 
taskNode.getName() + " cannot be scheduled because it is in the process of 
validating its configuration");
-        }
-
         final SchedulingAgent agent = 
getSchedulingAgent(taskNode.getSchedulingStrategy());
         lifecycleState.setScheduled(true);
 
@@ -216,6 +210,13 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
                             return;
                         }
 
+                        final ValidationStatus validationStatus = 
taskNode.getValidationStatus();
+                        if (validationStatus != ValidationStatus.VALID) {
+                            LOG.debug("Cannot schedule {} to run because it is 
currently invalid. Will try again in 5 seconds", taskNode);
+                            componentLifeCycleThreadPool.schedule(this, 5, 
TimeUnit.SECONDS);
+                            return;
+                        }
+
                         try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), 
reportingTask.getClass(), reportingTask.getIdentifier())) {
                             
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, 
taskNode.getConfigurationContext());
                         }
@@ -231,8 +232,11 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
                             + "ReportingTask and will attempt to schedule it 
again after {}",
                             new Object[]{reportingTask, e.toString(), 
administrativeYieldDuration}, e);
 
-                    
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
reportingTask, taskNode.getConfigurationContext());
-                    
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
reportingTask, taskNode.getConfigurationContext());
+
+                    try (final NarCloseable x = 
NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), 
reportingTask.getClass(), reportingTask.getIdentifier())) {
+                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
reportingTask, taskNode.getConfigurationContext());
+                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
reportingTask, taskNode.getConfigurationContext());
+                    }
 
                     componentLifeCycleThreadPool.schedule(this, 
administrativeYieldMillis, TimeUnit.MILLISECONDS);
                 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
index 39693b8..4a6b420 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
@@ -30,7 +30,7 @@ public interface ScheduledStateLookup {
     public static final ScheduledStateLookup IDENTITY_LOOKUP = new 
ScheduledStateLookup() {
         @Override
         public ScheduledState getScheduledState(final ProcessorNode procNode) {
-            return procNode.getScheduledState();
+            return procNode.getDesiredState();
         }
 
         @Override
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 795fc8c..3d6329f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,25 +16,6 @@
  */
 package org.apache.nifi.controller.service;
 
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.documentation.DeprecationNotice;
@@ -47,8 +28,7 @@ import org.apache.nifi.authorization.resource.ResourceType;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.AbstractComponentNode;
 import org.apache.nifi.controller.ComponentNode;
@@ -71,6 +51,24 @@ import 
org.apache.nifi.util.file.classloader.ClassLoaderUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 public class StandardControllerServiceNode extends AbstractComponentNode 
implements ControllerServiceNode {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StandardControllerServiceNode.class);
@@ -319,14 +317,6 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
         if (getState() != ControllerServiceState.DISABLED) {
             throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be enabled because it is not disabled");
         }
-
-        final ValidationState validationState = getValidationState();
-        switch (validationState.getStatus()) {
-            case INVALID:
-                throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be enabled because it is not valid: " + 
validationState.getValidationErrors());
-            case VALIDATING:
-                throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be enabled because its validation has not yet completed");
-        }
     }
 
     @Override
@@ -334,11 +324,6 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
         if (getState() != ControllerServiceState.DISABLED) {
             throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " 
cannot be enabled because it is not disabled");
         }
-
-        final Collection<ValidationResult> validationErrors = 
getValidationErrors(ignoredReferences);
-        if (ignoredReferences != null && !validationErrors.isEmpty()) {
-            throw new IllegalStateException("Controller Service with ID " + 
getIdentifier() + " cannot be enabled because it is not currently valid: " + 
validationErrors);
-        }
     }
 
     @Override
@@ -389,8 +374,11 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
             case DISABLED:
             case DISABLING:
                 return true;
-            case ENABLED:
             case ENABLING:
+                // If enabling and currently not valid, then we must trigger 
validation to occur. This allows the #enable method
+                // to continue running in the background and complete enabling 
when the service becomes valid.
+                return getValidationStatus() != ValidationStatus.VALID;
+            case ENABLED:
             default:
                 return false;
         }
@@ -398,7 +386,7 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
 
     /**
      * Will atomically enable this service by invoking its @OnEnabled 
operation.
-     * It uses CAS operation on {@link #stateRef} to transition this service
+     * It uses CAS operation on {@link #stateTransition} to transition this 
service
      * from DISABLED to ENABLING state. If such transition succeeds the service
      * will be marked as 'active' (see {@link 
ControllerServiceNode#isActive()}).
      * If such transition doesn't succeed then no enabling logic will be
@@ -429,6 +417,20 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
             scheduler.execute(new Runnable() {
                 @Override
                 public void run() {
+                    if (!isActive()) {
+                        LOG.debug("{} is no longer active so will not attempt 
to enable it", StandardControllerServiceNode.this);
+                        stateTransition.disable();
+                        return;
+                    }
+
+                    final ValidationStatus validationStatus = 
getValidationStatus();
+                    if (validationStatus != ValidationStatus.VALID) {
+                        LOG.debug("Cannot enable {} because it is not 
currently valid. Will try again in 5 seconds", 
StandardControllerServiceNode.this);
+                        scheduler.schedule(this, 5, TimeUnit.SECONDS);
+                        future.completeExceptionally(new RuntimeException(this 
+ " cannot be enabled because it is not currently valid. Will try again in 5 
seconds."));
+                        return;
+                    }
+
                     try {
                         try (final NarCloseable nc = 
NarCloseable.withComponentNarLoader(getExtensionManager(), 
getControllerServiceImplementation().getClass(), getIdentifier())) {
                             
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, 
getControllerServiceImplementation(), configContext);
@@ -446,7 +448,7 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
                             invokeDisable(configContext);
                             stateTransition.disable();
                         } else {
-                            LOG.debug("Successfully enabled {}", service);
+                            LOG.info("Successfully enabled {}", service);
                         }
                     } catch (Exception e) {
                         future.completeExceptionally(e);
@@ -478,7 +480,7 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
 
     /**
      * Will atomically disable this service by invoking its @OnDisabled 
operation.
-     * It uses CAS operation on {@link #stateRef} to transition this service
+     * It uses CAS operation on {@link #stateTransition} to transition this 
service
      * from ENABLED to DISABLING state. If such transition succeeds the service
      * will be de-activated (see {@link ControllerServiceNode#isActive()}).
      * If such transition doesn't succeed (the service is still in ENABLING 
state)
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 920999e..242c0ad 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -24,6 +24,7 @@ import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -92,6 +93,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -342,6 +344,8 @@ public class TestStandardProcessScheduler {
         final ControllerServiceNode serviceNode = 
flowManager.createControllerService(SimpleTestService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, 
false, true);
 
+        serviceNode.performValidation();
+
         assertFalse(serviceNode.isActive());
         final SimpleTestService ts = (SimpleTestService) 
serviceNode.getControllerServiceImplementation();
         final ExecutorService executor = Executors.newCachedThreadPool();
@@ -361,10 +365,10 @@ public class TestStandardProcessScheduler {
                 }
             });
         }
-        // need to sleep a while since we are emulating async invocations on
-        // method that is also internally async
-        Thread.sleep(500);
+
         executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
+
         assertFalse(asyncFailed.get());
         assertEquals(1, ts.enableInvocationCount());
     }
@@ -399,10 +403,9 @@ public class TestStandardProcessScheduler {
                 }
             });
         }
-        // need to sleep a while since we are emulating async invocations on
-        // method that is also internally async
-        Thread.sleep(500);
+
         executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
         assertFalse(asyncFailed.get());
         assertEquals(0, ts.disableInvocationCount());
     }
@@ -419,8 +422,10 @@ public class TestStandardProcessScheduler {
         final ControllerServiceNode serviceNode = 
flowManager.createControllerService(SimpleTestService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, 
false, true);
 
+        assertSame(ValidationStatus.VALID, serviceNode.performValidation());
+
         final SimpleTestService ts = (SimpleTestService) 
serviceNode.getControllerServiceImplementation();
-        scheduler.enableControllerService(serviceNode);
+        scheduler.enableControllerService(serviceNode).get();
         assertTrue(serviceNode.isActive());
         final ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -441,8 +446,8 @@ public class TestStandardProcessScheduler {
         }
         // need to sleep a while since we are emulating async invocations on
         // method that is also internally async
-        Thread.sleep(500);
         executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS); // wait up to 10 seconds for the submitted tasks to finish
         assertFalse(asyncFailed.get());
         assertEquals(1, ts.disableInvocationCount());
     }
@@ -453,9 +458,17 @@ public class TestStandardProcessScheduler {
 
         final ControllerServiceNode serviceNode = 
flowManager.createControllerService(FailingService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, 
false, true);
-        scheduler.enableControllerService(serviceNode);
-        Thread.sleep(1000);
+        serviceNode.performValidation();
+
+        final Future<?> future = 
scheduler.enableControllerService(serviceNode);
+        try {
+            future.get();
+        } catch (final Exception e) {
+            // Expected behavior because the FailingService throws Exception when attempting to enable
+        }
+
         scheduler.shutdown();
+
         /*
          * Because it was never disabled it will remain active since its
          * enabling is being retried. This may actually be a bug in the
@@ -528,14 +541,20 @@ public class TestStandardProcessScheduler {
 
         final ControllerServiceNode serviceNode = 
flowManager.createControllerService(LongEnablingService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, 
false, true);
+
         final LongEnablingService ts = (LongEnablingService) 
serviceNode.getControllerServiceImplementation();
         ts.setLimit(Long.MAX_VALUE);
+
+        serviceNode.performValidation();
         scheduler.enableControllerService(serviceNode);
-        Thread.sleep(100);
+
         assertTrue(serviceNode.isActive());
+        final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+        while (ts.enableInvocationCount() != 1 && System.nanoTime() <= 
maxTime) {
+            Thread.sleep(1L);
+        }
         assertEquals(1, ts.enableInvocationCount());
 
-        Thread.sleep(1000);
         scheduler.disableControllerService(serviceNode);
         assertFalse(serviceNode.isActive());
         assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index daee3c8..ebeb916 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -21,6 +21,7 @@ import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.ExtensionBuilder;
 import org.apache.nifi.controller.FlowController;
@@ -73,6 +74,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 public class TestStandardControllerServiceProvider {
@@ -197,8 +199,8 @@ public class TestStandardControllerServiceProvider {
         provider.disableControllerService(serviceNode);
     }
 
-    @Test(timeout = 1000000)
-    public void testEnableDisableWithReference() {
+    @Test(timeout = 10000)
+    public void testEnableDisableWithReference() throws InterruptedException {
         final ProcessGroup group = new MockProcessGroup(controller);
         final FlowController controller = Mockito.mock(FlowController.class);
         final FlowManager flowManager = Mockito.mock(FlowManager.class);
@@ -221,17 +223,24 @@ public class TestStandardControllerServiceProvider {
 
         try {
             provider.enableControllerService(serviceNodeA);
-            Assert.fail("Was able to enable Service A but Service B is disabled.");
         } catch (final IllegalStateException expected) {
         }
 
+        assertSame(ControllerServiceState.ENABLING, serviceNodeA.getState());
+
         serviceNodeB.performValidation();
-        serviceNodeB.getValidationStatus(5, TimeUnit.SECONDS);
+        assertSame(ValidationStatus.VALID, serviceNodeB.getValidationStatus(5, 
TimeUnit.SECONDS));
         provider.enableControllerService(serviceNodeB);
 
         serviceNodeA.performValidation();
-        serviceNodeA.getValidationStatus(5, TimeUnit.SECONDS);
-        provider.enableControllerService(serviceNodeA);
+        assertSame(ValidationStatus.VALID, serviceNodeA.getValidationStatus(5, 
TimeUnit.SECONDS));
+
+        final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+        // Wait for Service A to become ENABLED. This will happen in a background thread after approximately 5 seconds, now that Service A is valid.
+        while (serviceNodeA.getState() != ControllerServiceState.ENABLED && 
System.nanoTime() <= maxTime) {
+            Thread.sleep(5L);
+        }
+        assertSame(ControllerServiceState.ENABLED, serviceNodeA.getState());
 
         try {
             provider.disableControllerService(serviceNodeB);

Reply via email to