Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1278#discussion_r90308237
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---
    @@ -1385,15 +1387,32 @@ public void run() {
                     }
                 });
             } else {
    -            /*
    -             * We do compareAndSet() instead of set() to ensure that Processor
    -             * stoppage is handled consistently including a condition where
    -             * Processor never got a chance to transition to RUNNING state
    -             * before stop() was called. If that happens the stop processor
    -             * routine will be initiated in start() method, otherwise the IF
    -             * part will handle the stop processor routine.
    -             */
    -            this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING);
    +            synchronized (this) {
    +                if (onScheduleFuture != null) { // can only be null if stop was called before start
    +                    if (onScheduleFuture.cancel(false)) { // 'false' ensures we are not canceling the one in progress, only the scheduled one
    +                        /*
    +                         * We only want to transition to STOPPED stated in the
    +                         * event when stop was called by the user
    +                         * when @OnSchedule results in exception and is
    +                         * scheduled for re-try. However if @OnSchedule is
    +                         * currently executing we don't want to interrupt it,
    +                         * hence 'false' in the above 'cancel' call
    +                         */
    +                        this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPED);
    +                    } else {
    +                        /*
    +                         * We do compareAndSet() instead of set() to ensure that
    +                         * Processor stoppage is handled consistently including
    +                         * a condition where Processor never got a chance to
    +                         * transition to RUNNING state before stop() was called.
    +                         * If that happens the stop processor routine will be
    +                         * initiated in start() method, otherwise the IF part
    +                         * will handle the stop processor routine.
    +                         */
    +                        this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING);
    --- End diff --
    
    I think we have a threading issue here. I will try to explain what I think 
can happen in chronological order, but these things are always difficult to 
explain. Suppose that we have two threads, T1 and T2. T1 is running in the 
start() method, while T2 is running in the stop() method. We could have the 
following series of events:
    
    T1: At line 1281, is calling the Processor's @OnScheduled method. It has 
already set Scheduled State to STARTING.
    T2: Enters stop() method and line 1354 returns false because Scheduled 
State is STARTING.
    T2: At line 1391 it sees that onScheduleFuture is not null.
    T2: At line 1392, calls onScheduleFuture.cancel(false). However, T1 is 
still at line 1281. As a result, the call to Future.cancel() returns false 
because the future is currently executing and false indicates that the active 
task should not be canceled. So execution drops to the 'else' clause on line 
1402.
    T1: Finishes invoking the @OnScheduled method and proceeds to line 1287. 
The call to compareAndSet returns true, changing the Scheduled State to RUNNING.
    T2: At line 1412, the call to scheduledState.compareAndSet returns false, 
because the Scheduled State is no longer STARTING but is now RUNNING. This 
means that Scheduled State remains RUNNING.
    T2: Returns from method.
    
    At this point, the Scheduled State does accurately represent the state that 
the Processor is operating in. However, the state is still RUNNING. This means 
that the call to stop() did not actually stop the Processor but in fact had no 
effect.
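
    To make the sequence above easier to follow, here is a minimal sketch that 
replays it step by step on a single thread. It is not the NiFi code: the class 
name StopRaceSketch and the trimmed-down ScheduledState enum are made up for 
illustration, the check at line 1354 is assumed to be a compareAndSet from 
RUNNING to STOPPING, and the result of cancel(false) is simply taken from the 
description above rather than reproduced with a real Future.

```java
import java.util.concurrent.atomic.AtomicReference;

public class StopRaceSketch {

    // Simplified stand-in for NiFi's ScheduledState (illustration only).
    enum ScheduledState { STOPPED, STARTING, RUNNING, STOPPING }

    /** Replays the T1/T2 interleaving in order and returns the final state. */
    static ScheduledState replayInterleaving() {
        AtomicReference<ScheduledState> scheduledState =
                new AtomicReference<>(ScheduledState.STOPPED);

        // T1 (start): moves to STARTING, then begins invoking @OnScheduled.
        scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING);

        // T2 (stop): assumed shape of the check at line 1354. It fails
        // because the state is STARTING, not RUNNING.
        boolean wasRunning = scheduledState.compareAndSet(
                ScheduledState.RUNNING, ScheduledState.STOPPING);

        // T2: cancel(false) is described as returning false while
        // @OnScheduled is in progress. Modelled here as a constant.
        boolean cancelled = false;

        // T1: @OnScheduled returns; STARTING -> RUNNING succeeds.
        boolean nowRunning = scheduledState.compareAndSet(
                ScheduledState.STARTING, ScheduledState.RUNNING);

        // T2: 'else' branch. STARTING -> STOPPING fails, since the state is
        // already RUNNING, and the return value is ignored.
        boolean stopRequested = false;
        if (!cancelled) {
            stopRequested = scheduledState.compareAndSet(
                    ScheduledState.STARTING, ScheduledState.STOPPING);
        }

        System.out.println("wasRunning=" + wasRunning
                + " nowRunning=" + nowRunning
                + " stopRequested=" + stopRequested);
        return scheduledState.get();
    }

    public static void main(String[] args) {
        // stop() has returned, yet the state is still RUNNING.
        System.out.println("final state: " + replayInterleaving());
    }
}
```

    Both of T2's compareAndSet calls fail here, one before and one after T1's 
transition, which is why stop() ends up with no effect.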
    
    Now I realize that this is very much a corner case, where the user would 
have to click Stop on the Processor at exactly the right time for this to 
occur. However, if this does happen, the stop() method will return without 
stopping the Processor. This is a bug.
    
    One simple solution may be to check the return value on line 1412 and, if it 
is false, invoke the stop() method again with the same parameters (after 
exiting the synchronized block). Or, perhaps more cleanly, wrap the entire 
outer-most if/else block in a loop that continues whenever line 1412 returns 
false.
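
    A rough sketch of the loop variant, under the same simplifying assumptions 
as before: StopRetrySketch, its ScheduledState enum, finishStarting() and the 
betweenAttempts hook are all hypothetical names that exist only so the race can 
be simulated on one thread. The onScheduleFuture/cancel branch and the actual 
stop routine are left out.

```java
import java.util.concurrent.atomic.AtomicReference;

public class StopRetrySketch {

    // Simplified stand-in for NiFi's ScheduledState (illustration only).
    enum ScheduledState { STOPPED, STARTING, RUNNING, STOPPING }

    private final AtomicReference<ScheduledState> scheduledState;

    StopRetrySketch(ScheduledState initial) {
        this.scheduledState = new AtomicReference<>(initial);
    }

    /** Simulates T1 completing @OnScheduled: STARTING -> RUNNING. */
    void finishStarting() {
        scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING);
    }

    /**
     * Retries until one of the transitions succeeds or the processor is
     * found to be neither RUNNING nor STARTING. The betweenAttempts hook is
     * a test-only device that runs between the two compareAndSet calls.
     */
    ScheduledState stop(Runnable betweenAttempts) {
        while (true) {
            if (scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) {
                // The real code would perform the stop routine here.
                return scheduledState.get();
            }
            betweenAttempts.run();
            if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING)) {
                // start() is expected to notice STOPPING and clean up.
                return scheduledState.get();
            }
            ScheduledState current = scheduledState.get();
            if (current != ScheduledState.RUNNING && current != ScheduledState.STARTING) {
                return current; // already stopped or stopping; nothing to do
            }
            // The state changed between the two checks; go around again.
        }
    }

    /** Replays the problematic interleaving against the retrying stop(). */
    static ScheduledState demo() {
        StopRetrySketch node = new StopRetrySketch(ScheduledState.STARTING);
        return node.stop(node::finishStarting);
    }

    public static void main(String[] args) {
        System.out.println("final state: " + demo());
    }
}
```

    In this sketch the second pass through the loop sees RUNNING and moves it 
to STOPPING, so the stop request is no longer lost.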

