Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1278#discussion_r90308237
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
---
@@ -1385,15 +1387,32 @@ public void run() {
                     }
                 });
             } else {
-                /*
-                 * We do compareAndSet() instead of set() to ensure that Processor
-                 * stoppage is handled consistently including a condition where
-                 * Processor never got a chance to transition to RUNNING state
-                 * before stop() was called. If that happens the stop processor
-                 * routine will be initiated in start() method, otherwise the IF
-                 * part will handle the stop processor routine.
-                 */
-                this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING);
+                synchronized (this) {
+                    if (onScheduleFuture != null) { // can only be null if stop was called before start
+                        if (onScheduleFuture.cancel(false)) { // 'false' ensures we are not canceling the one in progress, only the scheduled one
+                            /*
+                             * We only want to transition to STOPPED stated in the
+                             * event when stop was called by the user
+                             * when @OnSchedule results in exception and is
+                             * scheduled for re-try. However if @OnSchedule is
+                             * currently executing we don't want to interrupt it,
+                             * hence 'false' in the above 'cancel' call
+                             */
+                            this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPED);
+                        } else {
+                            /*
+                             * We do compareAndSet() instead of set() to ensure that
+                             * Processor stoppage is handled consistently including
+                             * a condition where Processor never got a chance to
+                             * transition to RUNNING state before stop() was called.
+                             * If that happens the stop processor routine will be
+                             * initiated in start() method, otherwise the IF part
+                             * will handle the stop processor routine.
+                             */
+                            this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING);
--- End diff --
I think we have a threading issue here. I will try to explain what I think
can happen in chronological order, but these things are always difficult to
explain. Suppose that we have two threads, T1 and T2. T1 is running in the
start() method, while T2 is running in the stop() method. We could have the
following series of events:
T1: At line 1281, is invoking the Processor's @OnScheduled method. It has
already set the Scheduled State to STARTING.
T2: Enters stop() method and line 1354 returns false because Scheduled
State is STARTING.
T2: At line 1391 it sees that onScheduleFuture is not null.
T2: At line 1392, calls onScheduleFuture.cancel(false). However, T1 is
still at line 1281. As a result, the call to Future.cancel() returns false
because the future is currently executing and the 'false' argument indicates
that the active task should not be canceled. So execution drops to the 'else'
clause on line 1402.
T1: Finishes invoking the @OnScheduled method and proceeds to line 1287.
The call to compareAndSet returns true, changing the Scheduled State to RUNNING.
T2: At line 1412, the call to scheduledState.compareAndSet returns false,
because the Scheduled State is no longer STARTING but is now RUNNING. This
means that Scheduled State remains RUNNING.
T2: Returns from method.
At this point, the Scheduled State does accurately represent the state that
the Processor is operating in. However, the state is still RUNNING. This means
that the call to stop() did not actually stop the Processor but in fact had no
effect.
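
To make the sequence above concrete, here is a minimal single-threaded replay of
the same steps. It is only a sketch: StopRaceReplay and its nested enum are
hypothetical stand-ins rather than the NiFi classes, the check at line 1354 is
assumed to be a compareAndSet from RUNNING to STOPPING, and the result of
onScheduleFuture.cancel(false) is hard-coded to the value assumed in the
scenario instead of coming from a real Future.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the interleaving described above; not NiFi code.
class StopRaceReplay {
    enum ScheduledState { STOPPED, STARTING, RUNNING, STOPPING }

    /** Replays the T1/T2 steps in order and returns the final state. */
    static ScheduledState replay() {
        AtomicReference<ScheduledState> state =
                new AtomicReference<>(ScheduledState.STOPPED);

        // T1: start() moves STOPPED -> STARTING, then begins @OnScheduled.
        state.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING);

        // T2: stop() tries RUNNING -> STOPPING (assumed shape of line 1354).
        boolean wasRunning =
                state.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING);

        // T2: outcome of cancel(false) as assumed in the scenario (hard-coded).
        boolean cancelled = false;

        // T1: @OnScheduled returns and STARTING -> RUNNING succeeds.
        boolean nowRunning =
                state.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING);

        // T2: 'else' branch; STARTING -> STOPPING fails, state is RUNNING.
        boolean movedToStopping =
                state.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING);

        if (wasRunning || cancelled || !nowRunning || movedToStopping) {
            throw new IllegalStateException("replay diverged from the scenario");
        }
        return state.get();
    }

    public static void main(String[] args) {
        System.out.println("final state: " + replay()); // prints "final state: RUNNING"
    }
}
```

The last compareAndSet is the one whose return value is currently ignored, which
is why stop() returns while the state is still RUNNING.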
Now I realize that this is very much a corner case, where the user would
have to click Stop on the Processor at exactly the right time for this to
occur. However, if this does happen, the stop() method will return without
stopping the Processor. This is a bug.
One simple solution may be to check the return value on line 1412 and, if it
is false, recursively invoke the stop() method again with the same parameters
(after exiting the synchronized block). Or, perhaps more cleanly, wrap the
entire outer-most if/else block in a loop that continues whenever line 1412
returns false.
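
A rough sketch of the loop variant, reduced to the state transitions only. The
class StopWithRetry, its field, and the boolean return value are hypothetical
and exist just for illustration; the real stop() also handles onScheduleFuture,
synchronization, and the actual stop routine, all of which are left out here.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the retry idea; not the NiFi implementation.
class StopWithRetry {
    enum ScheduledState { STOPPED, STARTING, RUNNING, STOPPING }

    final AtomicReference<ScheduledState> scheduledState =
            new AtomicReference<>(ScheduledState.STOPPED);

    /** Returns true if this call moved the state to STOPPING. */
    boolean stop() {
        while (true) {
            // Normal case: the Processor is running.
            if (scheduledState.compareAndSet(ScheduledState.RUNNING,
                                             ScheduledState.STOPPING)) {
                return true;
            }
            // Processor has not reached RUNNING yet.
            if (scheduledState.compareAndSet(ScheduledState.STARTING,
                                             ScheduledState.STOPPING)) {
                return true;
            }
            // Both attempts failed: either there is nothing to stop, or the
            // state moved from STARTING to RUNNING in between. Retry only
            // in the latter case.
            ScheduledState seen = scheduledState.get();
            if (seen != ScheduledState.RUNNING && seen != ScheduledState.STARTING) {
                return false;
            }
        }
    }

    public static void main(String[] args) {
        StopWithRetry node = new StopWithRetry();
        node.scheduledState.set(ScheduledState.RUNNING);
        System.out.println(node.stop() + " " + node.scheduledState.get());
    }
}
```

With this shape, a stop() that loses the race against the STARTING to RUNNING
transition goes around the loop once more and takes the RUNNING branch instead
of returning silently.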