Repository: nifi Updated Branches: refs/heads/master f5108ea83 -> 2afbf9638
NIFI-5204: Ensure that verifyCanStop throws ISE if component is disabled NIFI-5204: If processor joins cluster and inherits 'disabled' state but is still stopping, ensure that the state becomes disabled when the processor finishes stopping This closes #2713 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2afbf963 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2afbf963 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2afbf963 Branch: refs/heads/master Commit: 2afbf963812bccdec8c997e6c63a93c644016769 Parents: f5108ea Author: Mark Payne <[email protected]> Authored: Wed May 16 16:06:00 2018 -0400 Committer: Matt Gilman <[email protected]> Committed: Thu May 17 13:44:52 2018 -0400 ---------------------------------------------------------------------- .../apache/nifi/controller/ProcessorNode.java | 12 +---- .../nifi/controller/StandardProcessorNode.java | 50 ++++++++++++++++++-- .../nifi/groups/StandardProcessGroup.java | 4 ++ .../nifi/processors/standard/DebugFlow.java | 7 +-- 4 files changed, 57 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/2afbf963/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index a5fe9b1..12c5ffc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -229,20 +229,12 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con * that this processor can be started. This is idempotent operation and will * result in the WARN message if processor can not be enabled. */ - public void enable() { - if (!this.scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED)) { - logger.warn("Processor cannot be enabled because it is not disabled"); - } - } + public abstract void enable(); /** * Will set the state of the processor to DISABLED which essentially implies * that this processor can NOT be started. This is idempotent operation and * will result in the WARN message if processor can not be enabled. */ - public void disable() { - if (!this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED)) { - logger.warn("Processor cannot be disabled because its state is set to " + this.scheduledState); - } - } + public abstract void disable(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/2afbf963/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index afdb3e1..c8e0570 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1268,6 +1268,31 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } } + @Override + public void enable() { + desiredState = ScheduledState.STOPPED; + final boolean updated = scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED); + + if (updated) { + LOG.info("{} enabled so ScheduledState transitioned from DISABLED to STOPPED", this); + } else { + LOG.info("{} enabled but not currently DISABLED so set desired state to STOPPED; current state is {}", this, scheduledState.get()); + } + } + + @Override + public void disable() { + desiredState = ScheduledState.DISABLED; + final boolean updated = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED); + + if (updated) { + LOG.info("{} disabled so ScheduledState transitioned from STOPPED to DISABLED", this); + } else { + LOG.info("{} disabled but not currently STOPPED so set desired state to DISABLED; current state is {}", this, scheduledState.get()); + } + } + + /** * Will idempotently start the processor using the following sequence: <i> * <ul> @@ -1453,11 +1478,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable deactivateThread(); } - if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { + if (desiredState == ScheduledState.RUNNING && scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { LOG.debug("Successfully completed the @OnScheduled methods of {}; will now start triggering processor to run", processor); schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle } else { - LOG.debug("Successfully invoked @OnScheduled methods of {} but scheduled state is no longer STARTING so will stop processor now", processor); + LOG.info("Successfully invoked @OnScheduled methods of {} but scheduled state is no longer STARTING so will stop processor now; current state = {}, desired state = {}", + processor, scheduledState.get(), desiredState); // can only happen if stopProcessor was called before service was transitioned to RUNNING state activateThread(); @@ -1470,6 +1496,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } scheduledState.set(ScheduledState.STOPPED); + + if (desiredState == ScheduledState.DISABLED) { + final boolean disabled = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED); + if (disabled) { + LOG.info("After stopping {}, determined that Desired State is DISABLED so disabled processor", processor); + } + } } } finally { schedulingAgentCallback.onTaskComplete(); @@ -1603,8 +1636,19 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable // the Processor is to be running. However, if the Processor is already in the process of stopping, we cannot immediately // start running the Processor. As a result, we check here, since the Processor is stopped, and then immediately start the // Processor if need be. - if (desiredState == ScheduledState.RUNNING) { + final ScheduledState desired = StandardProcessorNode.this.desiredState; + if (desired == ScheduledState.RUNNING) { + LOG.info("Finished stopping {} but desired state is now RUNNING so will start processor", this); processScheduler.startProcessor(StandardProcessorNode.this, true); + } else if (desired == ScheduledState.DISABLED) { + final boolean updated = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED); + + if (updated) { + LOG.info("Finished stopping {} but desired state is now DISABLED so disabled processor", this); + } else { + LOG.info("Finished stopping {} but desired state is now DISABLED. Scheduled State could not be transitioned from STOPPED to DISABLED, " + + "though, so will allow the other thread to finish state transition. Current state is {}", this, scheduledState.get()); + } } } else { // Not all of the active threads have finished. Try again in 100 milliseconds. http://git-wip-us.apache.org/repos/asf/nifi/blob/2afbf963/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 96bd65e..3660b99 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -2671,6 +2671,10 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public void verifyCanStop(Connectable connectable) { + final ScheduledState state = connectable.getScheduledState(); + if (state == ScheduledState.DISABLED) { + throw new IllegalStateException("Cannot stop component with id " + connectable + " because it is currently disabled."); + } } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/2afbf963/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java index 86e5080..dba192c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java @@ -33,6 +33,8 @@ import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -41,8 +43,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; @@ -200,6 +200,7 @@ public class DebugFlow extends AbstractProcessor { .required(true) .defaultValue("0 sec") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor ON_STOPPED_FAIL = new PropertyDescriptor.Builder() .name("Fail When @OnStopped called") @@ -339,7 +340,7 @@ public class DebugFlow extends AbstractProcessor { @OnStopped public void onStopped(final ProcessContext context) throws InterruptedException { - sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS), + sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS), context.getProperty(IGNORE_INTERRUPTS).asBoolean()); fail(context.getProperty(ON_STOPPED_FAIL).asBoolean(), OnStopped.class);
