Repository: nifi
Updated Branches:
  refs/heads/master f5108ea83 -> 2afbf9638


NIFI-5204: Ensure that verifyCanStop throws ISE if component is disabled
NIFI-5204: If processor joins cluster and inherits 'disabled' state but is 
still stopping, ensure that the state becomes disabled when the processor 
finishes stopping
This closes #2713


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2afbf963
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2afbf963
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2afbf963

Branch: refs/heads/master
Commit: 2afbf963812bccdec8c997e6c63a93c644016769
Parents: f5108ea
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed May 16 16:06:00 2018 -0400
Committer: Matt Gilman <matt.c.gil...@gmail.com>
Committed: Thu May 17 13:44:52 2018 -0400

----------------------------------------------------------------------
 .../apache/nifi/controller/ProcessorNode.java   | 12 +----
 .../nifi/controller/StandardProcessorNode.java  | 50 ++++++++++++++++++--
 .../nifi/groups/StandardProcessGroup.java       |  4 ++
 .../nifi/processors/standard/DebugFlow.java     |  7 +--
 4 files changed, 57 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2afbf963/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index a5fe9b1..12c5ffc 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -229,20 +229,12 @@ public abstract class ProcessorNode extends 
AbstractComponentNode implements Con
      * that this processor can be started. This is idempotent operation and 
will
      * result in the WARN message if processor can not be enabled.
      */
-    public void enable() {
-        if (!this.scheduledState.compareAndSet(ScheduledState.DISABLED, 
ScheduledState.STOPPED)) {
-            logger.warn("Processor cannot be enabled because it is not 
disabled");
-        }
-    }
+    public abstract void enable();
 
     /**
      * Will set the state of the processor to DISABLED which essentially 
implies
      * that this processor can NOT be started. This is idempotent operation and
      * will result in the WARN message if processor can not be disabled.
      */
-    public void disable() {
-        if (!this.scheduledState.compareAndSet(ScheduledState.STOPPED, 
ScheduledState.DISABLED)) {
-            logger.warn("Processor cannot be disabled because its state is set 
to " + this.scheduledState);
-        }
-    }
+    public abstract void disable();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2afbf963/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index afdb3e1..c8e0570 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1268,6 +1268,31 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         }
     }
 
+    @Override
+    public void enable() {
+        desiredState = ScheduledState.STOPPED;
+        final boolean updated = 
scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED);
+
+        if (updated) {
+            LOG.info("{} enabled so ScheduledState transitioned from DISABLED 
to STOPPED", this);
+        } else {
+            LOG.info("{} enabled but not currently DISABLED so set desired 
state to STOPPED; current state is {}", this, scheduledState.get());
+        }
+    }
+
+    @Override
+    public void disable() {
+        desiredState = ScheduledState.DISABLED;
+        final boolean updated = 
scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
+
+        if (updated) {
+            LOG.info("{} disabled so ScheduledState transitioned from STOPPED 
to DISABLED", this);
+        } else {
+            LOG.info("{} disabled but not currently STOPPED so set desired 
state to DISABLED; current state is {}", this, scheduledState.get());
+        }
+    }
+
+
     /**
      * Will idempotently start the processor using the following sequence: <i>
      * <ul>
@@ -1453,11 +1478,12 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
                         deactivateThread();
                     }
 
-                    if (scheduledState.compareAndSet(ScheduledState.STARTING, 
ScheduledState.RUNNING)) {
+                    if (desiredState == ScheduledState.RUNNING && 
scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
                         LOG.debug("Successfully completed the @OnScheduled 
methods of {}; will now start triggering processor to run", processor);
                         schedulingAgentCallback.trigger(); // callback 
provided by StandardProcessScheduler to essentially initiate component's 
onTrigger() cycle
                     } else {
-                        LOG.debug("Successfully invoked @OnScheduled methods 
of {} but scheduled state is no longer STARTING so will stop processor now", 
processor);
+                        LOG.info("Successfully invoked @OnScheduled methods of 
{} but scheduled state is no longer STARTING so will stop processor now; 
current state = {}, desired state = {}",
+                            processor, scheduledState.get(), desiredState);
 
                         // can only happen if stopProcessor was called before 
service was transitioned to RUNNING state
                         activateThread();
@@ -1470,6 +1496,13 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
                         }
 
                         scheduledState.set(ScheduledState.STOPPED);
+
+                        if (desiredState == ScheduledState.DISABLED) {
+                            final boolean disabled = 
scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
+                            if (disabled) {
+                                LOG.info("After stopping {}, determined that 
Desired State is DISABLED so disabled processor", processor);
+                            }
+                        }
                     }
                 } finally {
                     schedulingAgentCallback.onTaskComplete();
@@ -1603,8 +1636,19 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
                             // the Processor is to be running. However, if the 
Processor is already in the process of stopping, we cannot immediately
                             // start running the Processor. As a result, we 
check here, since the Processor is stopped, and then immediately start the
                             // Processor if need be.
-                            if (desiredState == ScheduledState.RUNNING) {
+                            final ScheduledState desired = 
StandardProcessorNode.this.desiredState;
+                            if (desired == ScheduledState.RUNNING) {
+                                LOG.info("Finished stopping {} but desired 
state is now RUNNING so will start processor", this);
                                 
processScheduler.startProcessor(StandardProcessorNode.this, true);
+                            } else if (desired == ScheduledState.DISABLED) {
+                                final boolean updated = 
scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
+
+                                if (updated) {
+                                    LOG.info("Finished stopping {} but desired 
state is now DISABLED so disabled processor", this);
+                                } else {
+                                    LOG.info("Finished stopping {} but desired 
state is now DISABLED. Scheduled State could not be transitioned from STOPPED 
to DISABLED, "
+                                        + "though, so will allow the other 
thread to finish state transition. Current state is {}", this, 
scheduledState.get());
+                                }
                             }
                         } else {
                             // Not all of the active threads have finished. 
Try again in 100 milliseconds.

http://git-wip-us.apache.org/repos/asf/nifi/blob/2afbf963/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 96bd65e..3660b99 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -2671,6 +2671,10 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
     @Override
     public void verifyCanStop(Connectable connectable) {
+        final ScheduledState state = connectable.getScheduledState();
+        if (state == ScheduledState.DISABLED) {
+            throw new IllegalStateException("Cannot stop component with id " + 
connectable + " because it is currently disabled.");
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/2afbf963/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
index 86e5080..dba192c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
@@ -33,6 +33,8 @@ import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -41,8 +43,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
@@ -200,6 +200,7 @@ public class DebugFlow extends AbstractProcessor {
         .required(true)
         .defaultValue("0 sec")
         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .build();
     static final PropertyDescriptor ON_STOPPED_FAIL = new 
PropertyDescriptor.Builder()
         .name("Fail When @OnStopped called")
@@ -339,7 +340,7 @@ public class DebugFlow extends AbstractProcessor {
 
     @OnStopped
     public void onStopped(final ProcessContext context) throws 
InterruptedException {
-        
sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS),
+        
sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS),
             context.getProperty(IGNORE_INTERRUPTS).asBoolean());
 
         fail(context.getProperty(ON_STOPPED_FAIL).asBoolean(), 
OnStopped.class);

Reply via email to