This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 706cf7d NIFI-5944: When components are started on NiFi startup, if
they are invalid, don't fail immediately and give up. Instead, keep attempting
to start the component when it becomes valid.
706cf7d is described below
commit 706cf7dcff1eddaf86e6dd7496734ec637d31810
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jan 10 09:53:05 2019 -0500
NIFI-5944: When components are started on NiFi startup, if they are
invalid, don't fail immediately and give up. Instead, keep attempting to start
the component when it becomes valid.
This closes #3259
---
.../nifi/controller/AbstractComponentNode.java | 12 ++--
.../org/apache/nifi/controller/ComponentNode.java | 2 +-
.../org/apache/nifi/controller/ProcessorNode.java | 17 ++++-
.../org/apache/nifi/controller/FlowController.java | 4 +-
.../nifi/controller/StandardProcessorNode.java | 33 ++++++---
.../reporting/AbstractReportingTaskNode.java | 15 +++--
.../scheduling/StandardProcessScheduler.java | 22 +++---
.../serialization/ScheduledStateLookup.java | 2 +-
.../service/StandardControllerServiceNode.java | 78 +++++++++++-----------
.../scheduling/TestStandardProcessScheduler.java | 43 ++++++++----
.../TestStandardControllerServiceProvider.java | 21 ++++--
11 files changed, 158 insertions(+), 91 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index f3ae41f..e17682e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -373,9 +373,8 @@ public abstract class AbstractComponentNode implements
ComponentNode {
}
@Override
- public final void performValidation() {
- boolean replaced = false;
- do {
+ public final ValidationStatus performValidation() {
+ while (true) {
final ValidationState validationState = getValidationState();
final ValidationContext validationContext = getValidationContext();
@@ -391,8 +390,11 @@ public abstract class AbstractComponentNode implements
ComponentNode {
final ValidationStatus status = results.isEmpty() ?
ValidationStatus.VALID : ValidationStatus.INVALID;
final ValidationState updatedState = new ValidationState(status,
results);
- replaced = replaceValidationState(validationState, updatedState);
- } while (!replaced);
+ final boolean replaced = replaceValidationState(validationState,
updatedState);
+ if (replaced) {
+ return status;
+ }
+ }
}
protected Collection<ValidationResult> computeValidationErrors(final
ValidationContext validationContext) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
index d0ed572..2357d41 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
@@ -179,7 +179,7 @@ public interface ComponentNode extends
ComponentAuthorizable {
/**
* Asynchronously begins the validation process
*/
- public abstract void performValidation();
+ public abstract ValidationStatus performValidation();
/**
* Returns a {@link List} of all {@link PropertyDescriptor}s that this
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 6e8206e..12eeb88 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.scheduling.LifecycleState;
@@ -144,7 +145,13 @@ public abstract class ProcessorNode extends
AbstractComponentNode implements Con
public ScheduledState getScheduledState() {
ScheduledState sc = this.scheduledState.get();
if (sc == ScheduledState.STARTING) {
- return ScheduledState.RUNNING;
+ final ValidationStatus validationStatus = getValidationStatus();
+
+ if (validationStatus == ValidationStatus.INVALID) {
+ return ScheduledState.STOPPED;
+ } else {
+ return ScheduledState.RUNNING;
+ }
} else if (sc == ScheduledState.STOPPING) {
return ScheduledState.STOPPED;
}
@@ -240,4 +247,12 @@ public abstract class ProcessorNode extends
AbstractComponentNode implements Con
* will result in the WARN message if processor can not be enabled.
*/
public abstract void disable();
+
+ /**
+ * Returns the Scheduled State that is desired for this Processor. This
may vary from the current state if the Processor is not
+ * currently valid, is in the process of stopping but should then
transition to Running, etc.
+ *
+ * @return the desired state for this Processor
+ */
+ public abstract ScheduledState getDesiredState();
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index dad01b8..8ab7e69 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -912,7 +912,6 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
try {
if (connectable instanceof ProcessorNode) {
- ((ProcessorNode)
connectable).getValidationStatus(5, TimeUnit.SECONDS);
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
} else {
startConnectable(connectable);
@@ -1242,7 +1241,7 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
return ScheduledState.RUNNING;
}
- return procNode.getScheduledState();
+ return procNode.getDesiredState();
}
@Override
@@ -1699,7 +1698,6 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
throw new IllegalStateException("Cannot start reporting task " +
reportingTaskNode.getIdentifier() + " because the controller is terminated");
}
- reportingTaskNode.performValidation(); // ensure that the reporting
task has completed its validation before attempting to start it
reportingTaskNode.verifyCanStart();
reportingTaskNode.reloadAdditionalResourcesIfNecessary();
processScheduler.schedule(reportingTaskNode);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 4adfad4..8ca5173 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -38,6 +38,7 @@ import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
@@ -139,7 +140,7 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
private final ProcessScheduler processScheduler;
private long runNanos = 0L;
private volatile long yieldNanos;
- private volatile ScheduledState desiredState;
+ private volatile ScheduledState desiredState = ScheduledState.STOPPED;
private volatile LogLevel bulletinLevel = LogLevel.WARN;
private SchedulingStrategy schedulingStrategy; // guarded by read/write
lock
@@ -1343,13 +1344,6 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
public void start(final ScheduledExecutorService taskScheduler, final long
administrativeYieldMillis, final long timeoutMillis, final ProcessContext
processContext,
final SchedulingAgentCallback schedulingAgentCallback, final
boolean failIfStopping) {
- switch (getValidationStatus()) {
- case INVALID:
- throw new IllegalStateException("Processor " + this.getName()
+ " is not in a valid state due to " + this.getValidationErrors());
- case VALIDATING:
- throw new IllegalStateException("Processor " + this.getName()
+ " cannot be started because its validation is still being performed");
- }
-
final Processor processor = processorRef.get().getProcessor();
final ComponentLog procLog = new
SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
@@ -1487,6 +1481,25 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
// Create a task to invoke the @OnScheduled annotation of the processor
final Callable<Void> startupTask = () -> {
+ final ScheduledState currentScheduleState = scheduledState.get();
+ if (currentScheduleState == ScheduledState.STOPPING ||
currentScheduleState == ScheduledState.STOPPED) {
+ LOG.debug("{} is stopped. Will not call @OnScheduled lifecycle
methods or begin trigger onTrigger() method", StandardProcessorNode.this);
+ schedulingAgentCallback.onTaskComplete();
+ return null;
+ }
+
+ final ValidationStatus validationStatus = getValidationStatus();
+ if (validationStatus != ValidationStatus.VALID) {
+ LOG.debug("Cannot start {} because Processor is currently not
valid; will try again after 5 seconds", StandardProcessorNode.this);
+
+ // re-initiate the entire process
+ final Runnable initiateStartTask = () ->
initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis,
processContext, schedulingAgentCallback);
+ taskScheduler.schedule(initiateStartTask, 5, TimeUnit.SECONDS);
+
+ schedulingAgentCallback.onTaskComplete();
+ return null;
+ }
+
LOG.debug("Invoking @OnScheduled methods of {}", processor);
// Now that the task has been scheduled, set the timeout
@@ -1696,6 +1709,10 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
return future;
}
+ @Override
+ public ScheduledState getDesiredState() {
+ return desiredState;
+ }
private void monitorAsyncTask(final Future<?> taskFuture, final Future<?>
monitoringFuture, final long completionTimestamp) {
if (taskFuture.isDone()) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index bce85f8..f1b585e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -16,16 +16,11 @@
*/
package org.apache.nifi.controller.reporting;
-import java.net.URL;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.AbstractComponentNode;
import org.apache.nifi.controller.ConfigurationContext;
@@ -51,6 +46,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.AnnotationUtils;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
public abstract class AbstractReportingTaskNode extends AbstractComponentNode
implements ReportingTaskNode {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractReportingTaskNode.class);
@@ -176,7 +177,7 @@ public abstract class AbstractReportingTaskNode extends
AbstractComponentNode im
@Override
public boolean isValidationNecessary() {
- return !processScheduler.isScheduled(this);
+ return !processScheduler.isScheduled(this) || getValidationStatus() !=
ValidationStatus.VALID;
}
@Override
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 2ff3307..a7d5fd8 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -21,6 +21,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
@@ -186,13 +187,6 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
throw new IllegalStateException("Reporting Task " +
taskNode.getName() + " cannot be started because it has " + activeThreadCount +
" threads still running");
}
- switch (taskNode.getValidationStatus()) {
- case INVALID:
- throw new IllegalStateException("Reporting Task " +
taskNode.getName() + " is not in a valid state for the following reasons: " +
taskNode.getValidationErrors());
- case VALIDATING:
- throw new IllegalStateException("Reporting Task " +
taskNode.getName() + " cannot be scheduled because it is in the process of
validating its configuration");
- }
-
final SchedulingAgent agent =
getSchedulingAgent(taskNode.getSchedulingStrategy());
lifecycleState.setScheduled(true);
@@ -216,6 +210,13 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
return;
}
+ final ValidationStatus validationStatus =
taskNode.getValidationStatus();
+ if (validationStatus != ValidationStatus.VALID) {
+ LOG.debug("Cannot schedule {} to run because it is
currently invalid. Will try again in 5 seconds", taskNode);
+ componentLifeCycleThreadPool.schedule(this, 5,
TimeUnit.SECONDS);
+ return;
+ }
+
try (final NarCloseable x =
NarCloseable.withComponentNarLoader(flowController.getExtensionManager(),
reportingTask.getClass(), reportingTask.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask,
taskNode.getConfigurationContext());
}
@@ -231,8 +232,11 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
+ "ReportingTask and will attempt to schedule it
again after {}",
new Object[]{reportingTask, e.toString(),
administrativeYieldDuration}, e);
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class,
reportingTask, taskNode.getConfigurationContext());
-
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class,
reportingTask, taskNode.getConfigurationContext());
+
+ try (final NarCloseable x =
NarCloseable.withComponentNarLoader(flowController.getExtensionManager(),
reportingTask.getClass(), reportingTask.getIdentifier())) {
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class,
reportingTask, taskNode.getConfigurationContext());
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class,
reportingTask, taskNode.getConfigurationContext());
+ }
componentLifeCycleThreadPool.schedule(this,
administrativeYieldMillis, TimeUnit.MILLISECONDS);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
index 39693b8..4a6b420 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
@@ -30,7 +30,7 @@ public interface ScheduledStateLookup {
public static final ScheduledStateLookup IDENTITY_LOOKUP = new
ScheduledStateLookup() {
@Override
public ScheduledState getScheduledState(final ProcessorNode procNode) {
- return procNode.getScheduledState();
+ return procNode.getDesiredState();
}
@Override
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 795fc8c..3d6329f 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,25 +16,6 @@
*/
package org.apache.nifi.controller.service;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
@@ -47,8 +28,7 @@ import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.AbstractComponentNode;
import org.apache.nifi.controller.ComponentNode;
@@ -71,6 +51,24 @@ import
org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
public class StandardControllerServiceNode extends AbstractComponentNode
implements ControllerServiceNode {
private static final Logger LOG =
LoggerFactory.getLogger(StandardControllerServiceNode.class);
@@ -319,14 +317,6 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
if (getState() != ControllerServiceState.DISABLED) {
throw new
IllegalStateException(getControllerServiceImplementation().getIdentifier() + "
cannot be enabled because it is not disabled");
}
-
- final ValidationState validationState = getValidationState();
- switch (validationState.getStatus()) {
- case INVALID:
- throw new
IllegalStateException(getControllerServiceImplementation().getIdentifier() + "
cannot be enabled because it is not valid: " +
validationState.getValidationErrors());
- case VALIDATING:
- throw new
IllegalStateException(getControllerServiceImplementation().getIdentifier() + "
cannot be enabled because its validation has not yet completed");
- }
}
@Override
@@ -334,11 +324,6 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
if (getState() != ControllerServiceState.DISABLED) {
throw new
IllegalStateException(getControllerServiceImplementation().getIdentifier() + "
cannot be enabled because it is not disabled");
}
-
- final Collection<ValidationResult> validationErrors =
getValidationErrors(ignoredReferences);
- if (ignoredReferences != null && !validationErrors.isEmpty()) {
- throw new IllegalStateException("Controller Service with ID " +
getIdentifier() + " cannot be enabled because it is not currently valid: " +
validationErrors);
- }
}
@Override
@@ -389,8 +374,11 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
case DISABLED:
case DISABLING:
return true;
- case ENABLED:
case ENABLING:
+ // If enabling and currently not valid, then we must trigger
validation to occur. This allows the #enable method
+ // to continue running in the background and complete enabling
when the service becomes valid.
+ return getValidationStatus() != ValidationStatus.VALID;
+ case ENABLED:
default:
return false;
}
@@ -398,7 +386,7 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
/**
* Will atomically enable this service by invoking its @OnEnabled
operation.
- * It uses CAS operation on {@link #stateRef} to transition this service
+ * It uses CAS operation on {@link #stateTransition} to transition this
service
* from DISABLED to ENABLING state. If such transition succeeds the service
* will be marked as 'active' (see {@link
ControllerServiceNode#isActive()}).
* If such transition doesn't succeed then no enabling logic will be
@@ -429,6 +417,20 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
scheduler.execute(new Runnable() {
@Override
public void run() {
+ if (!isActive()) {
+ LOG.debug("{} is no longer active so will not attempt
to enable it", StandardControllerServiceNode.this);
+ stateTransition.disable();
+ return;
+ }
+
+ final ValidationStatus validationStatus =
getValidationStatus();
+ if (validationStatus != ValidationStatus.VALID) {
+ LOG.debug("Cannot enable {} because it is not
currently valid. Will try again in 5 seconds",
StandardControllerServiceNode.this);
+ scheduler.schedule(this, 5, TimeUnit.SECONDS);
+ future.completeExceptionally(new RuntimeException(this
+ " cannot be enabled because it is not currently valid. Will try again in 5
seconds."));
+ return;
+ }
+
try {
try (final NarCloseable nc =
NarCloseable.withComponentNarLoader(getExtensionManager(),
getControllerServiceImplementation().getClass(), getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class,
getControllerServiceImplementation(), configContext);
@@ -446,7 +448,7 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
invokeDisable(configContext);
stateTransition.disable();
} else {
- LOG.debug("Successfully enabled {}", service);
+ LOG.info("Successfully enabled {}", service);
}
} catch (Exception e) {
future.completeExceptionally(e);
@@ -478,7 +480,7 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
/**
* Will atomically disable this service by invoking its @OnDisabled
operation.
- * It uses CAS operation on {@link #stateRef} to transition this service
+ * It uses CAS operation on {@link #stateTransition} to transition this
service
* from ENABLED to DISABLING state. If such transition succeeds the service
* will be de-activated (see {@link ControllerServiceNode#isActive()}).
* If such transition doesn't succeed (the service is still in ENABLING
state)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 920999e..242c0ad 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -24,6 +24,7 @@ import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
@@ -92,6 +93,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -342,6 +344,8 @@ public class TestStandardProcessScheduler {
final ControllerServiceNode serviceNode =
flowManager.createControllerService(SimpleTestService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null,
false, true);
+ serviceNode.performValidation();
+
assertFalse(serviceNode.isActive());
final SimpleTestService ts = (SimpleTestService)
serviceNode.getControllerServiceImplementation();
final ExecutorService executor = Executors.newCachedThreadPool();
@@ -361,10 +365,10 @@ public class TestStandardProcessScheduler {
}
});
}
- // need to sleep a while since we are emulating async invocations on
- // method that is also internally async
- Thread.sleep(500);
+
executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+
assertFalse(asyncFailed.get());
assertEquals(1, ts.enableInvocationCount());
}
@@ -399,10 +403,9 @@ public class TestStandardProcessScheduler {
}
});
}
- // need to sleep a while since we are emulating async invocations on
- // method that is also internally async
- Thread.sleep(500);
+
executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
assertFalse(asyncFailed.get());
assertEquals(0, ts.disableInvocationCount());
}
@@ -419,8 +422,10 @@ public class TestStandardProcessScheduler {
final ControllerServiceNode serviceNode =
flowManager.createControllerService(SimpleTestService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null,
false, true);
+ assertSame(ValidationStatus.VALID, serviceNode.performValidation());
+
final SimpleTestService ts = (SimpleTestService)
serviceNode.getControllerServiceImplementation();
- scheduler.enableControllerService(serviceNode);
+ scheduler.enableControllerService(serviceNode).get();
assertTrue(serviceNode.isActive());
final ExecutorService executor = Executors.newCachedThreadPool();
@@ -441,8 +446,8 @@ public class TestStandardProcessScheduler {
}
// need to sleep a while since we are emulating async invocations on
// method that is also internally async
- Thread.sleep(500);
executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS); // change to seconds.
assertFalse(asyncFailed.get());
assertEquals(1, ts.disableInvocationCount());
}
@@ -453,9 +458,17 @@ public class TestStandardProcessScheduler {
final ControllerServiceNode serviceNode =
flowManager.createControllerService(FailingService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null,
false, true);
- scheduler.enableControllerService(serviceNode);
- Thread.sleep(1000);
+ serviceNode.performValidation();
+
+ final Future<?> future =
scheduler.enableControllerService(serviceNode);
+ try {
+ future.get();
+ } catch (final Exception e) {
+ // Expected behavior because the FailingService throws Exception
when attempting to enable
+ }
+
scheduler.shutdown();
+
/*
* Because it was never disabled it will remain active since its
* enabling is being retried. This may actually be a bug in the
@@ -528,14 +541,20 @@ public class TestStandardProcessScheduler {
final ControllerServiceNode serviceNode =
flowManager.createControllerService(LongEnablingService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null,
false, true);
+
final LongEnablingService ts = (LongEnablingService)
serviceNode.getControllerServiceImplementation();
ts.setLimit(Long.MAX_VALUE);
+
+ serviceNode.performValidation();
scheduler.enableControllerService(serviceNode);
- Thread.sleep(100);
+
assertTrue(serviceNode.isActive());
+ final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+ while (ts.enableInvocationCount() != 1 && System.nanoTime() <=
maxTime) {
+ Thread.sleep(1L);
+ }
assertEquals(1, ts.enableInvocationCount());
- Thread.sleep(1000);
scheduler.disableControllerService(serviceNode);
assertFalse(serviceNode.isActive());
assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index daee3c8..ebeb916 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -21,6 +21,7 @@ import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.ExtensionBuilder;
import org.apache.nifi.controller.FlowController;
@@ -73,6 +74,7 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
public class TestStandardControllerServiceProvider {
@@ -197,8 +199,8 @@ public class TestStandardControllerServiceProvider {
provider.disableControllerService(serviceNode);
}
- @Test(timeout = 1000000)
- public void testEnableDisableWithReference() {
+ @Test(timeout = 10000)
+ public void testEnableDisableWithReference() throws InterruptedException {
final ProcessGroup group = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
final FlowManager flowManager = Mockito.mock(FlowManager.class);
@@ -221,17 +223,24 @@ public class TestStandardControllerServiceProvider {
try {
provider.enableControllerService(serviceNodeA);
- Assert.fail("Was able to enable Service A but Service B is
disabled.");
} catch (final IllegalStateException expected) {
}
+ assertSame(ControllerServiceState.ENABLING, serviceNodeA.getState());
+
serviceNodeB.performValidation();
- serviceNodeB.getValidationStatus(5, TimeUnit.SECONDS);
+ assertSame(ValidationStatus.VALID, serviceNodeB.getValidationStatus(5,
TimeUnit.SECONDS));
provider.enableControllerService(serviceNodeB);
serviceNodeA.performValidation();
- serviceNodeA.getValidationStatus(5, TimeUnit.SECONDS);
- provider.enableControllerService(serviceNodeA);
+ assertSame(ValidationStatus.VALID, serviceNodeA.getValidationStatus(5,
TimeUnit.SECONDS));
+
+ final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+ // Wait for Service A to become ENABLED. This will happen in a
background thread after approximately 5 seconds, now that Service A is valid.
+ while (serviceNodeA.getState() != ControllerServiceState.ENABLED &&
System.nanoTime() <= maxTime) {
+ Thread.sleep(5L);
+ }
+ assertSame(ControllerServiceState.ENABLED, serviceNodeA.getState());
try {
provider.disableControllerService(serviceNodeB);