Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/141#discussion_r48486134
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---
    @@ -229,7 +244,106 @@ public ControllerServiceState getState() {
         }
     
         @Override
    -    public void setState(final ControllerServiceState state) {
    -        this.stateRef.set(state);
    +    public boolean isActive() {
    +        return this.active.get();
    +    }
    +
    +    /**
    +     * Will atomically enable this service by invoking its @OnEnabled operation.
    +     * It uses CAS operation on {@link #stateRef} to transition this service
    +     * from DISABLED to ENABLING state. If such transition succeeds the service
    +     * will be marked as 'active' (see {@link ControllerServiceNode#isActive()}).
    +     * If such transition doesn't succeed then no enabling logic will be
    +     * performed and the method will exit. In other words it is safe to invoke
    +     * this operation multiple times and from multiple threads.
    +     * <br>
    +     * This operation will also perform re-try of service enabling in the event
    +     * of exception being thrown by previous invocation of @OnEnabled.
    +     * <br>
    +     * Upon successful invocation of @OnEnabled this service will be transitioned to
    +     * ENABLED state.
    +     * <br>
    +     * In the event where enabling took longer then expected by the user and such user
    +     * initiated disable operation, this service will be automatically disabled as soon
    +     * as it reached ENABLED state.
    +     */
    +    @Override
    +    public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis, final Heartbeater heartbeater) {
    +        if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)){
    +            this.active.set(true);
    +            final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
    +            scheduler.execute(new Runnable() {
    +                @Override
    +                public void run() {
    +                    try {
    +                        ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
    +                        if (stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED)) {
    +                            heartbeater.heartbeat();
    +                        } else {
    +                            LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated.");
    +                            // Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be
    +                            // set to DISABLING (see disable() operation)
    +                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
    +                            stateRef.set(ControllerServiceState.DISABLED);
    +                        }
    +                    } catch (Exception e) {
    +                        final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
    +                        final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
    +                        componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
    +                        LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
    +                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
    +                        if (isActive()) {
    +                            scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
    +                        }
    +                        else {
    +                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
    +                            stateRef.set(ControllerServiceState.DISABLED);
    +                        }
    +                    }
    +                }
    +            });
    +        }
    +    }
    +
    +    /**
    +     * Will atomically disable this service by invoking its @OnDisabled operation.
    +     * It uses CAS operation on {@link #stateRef} to transition this service
    +     * from ENABLED to DISABLING state. If such transition succeeds the service
    +     * will be de-activated (see {@link ControllerServiceNode#isActive()}).
    +     * If such transition doesn't succeed (the service is still in ENABLING state)
    +     * then the service will still be transitioned to DISABLING state to ensure that
    +     * no other transition could happen on this service. However in such event
    +     * (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long, Heartbeater)}
    +     * operation will initiate service disabling javadoc for (see {@link #enable(ScheduledExecutorService, long, Heartbeater)}
    +     * <br>
    +     * Upon successful invocation of @OnDisabled this service will be transitioned to
    +     * DISABLED state.
    +     */
    +    @Override
    +    public void disable(final ScheduledExecutorService scheduler, final Heartbeater heartbeater) {
    +        this.active.set(false); // de-activating regardless of CAS operation
    +                                // that follows since this operation will always result in service state being DISABLING
    +        if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) {
    +            final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
    +            scheduler.execute(new Runnable() {
    +                @Override
    +                public void run() {
    +                    try {
    +                        ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
    +                    } catch (Exception e) {
    +                        final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
    +                        final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
    +                        componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
    +                        LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
    +                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
    +                    } finally {
    +                        stateRef.set(ControllerServiceState.DISABLED);
    +                        heartbeater.heartbeat();
    +                    }
    +                }
    +            });
    +        } else {
    +            this.stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.DISABLING);
    --- End diff --
    
    @olegz I think a case was missed here. Assume we have two threads: E at
    line 280 and D at line 326. If D executes first, its compareAndSet returns
    false because the state is still ENABLING. If thread E then executes, it
    changes the state from ENABLING to ENABLED. Thread D then executes line 346,
    and that compareAndSet also returns false because the state is no longer
    ENABLING. At this point thread D returns from the disable() method without
    ever having disabled the service. This appears to be a race condition that
    we need to address.
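
    To make the interleaving concrete, here is a minimal standalone sketch. It
    is not the NiFi code: DisableRaceSketch, State, replayRacyInterleaving and
    moveToDisabling are hypothetical names, and the enum only mirrors the four
    ControllerServiceState values used in the diff. The first method replays
    the three compareAndSet calls in the order described above on a single
    thread; the second shows one possible way to close the gap by retrying
    until one of the two transitions wins, which is an assumption on my part
    and not necessarily how the PR should resolve it.

```java
import java.util.concurrent.atomic.AtomicReference;

public class DisableRaceSketch {

    // Hypothetical stand-in for ControllerServiceState.
    enum State { DISABLED, ENABLING, ENABLED, DISABLING }

    // Replays the interleaving from the review comment deterministically.
    static State replayRacyInterleaving() {
        final AtomicReference<State> stateRef = new AtomicReference<>(State.ENABLING);

        // Thread D, first CAS in disable(): fails, state is still ENABLING.
        final boolean dFirst = stateRef.compareAndSet(State.ENABLED, State.DISABLING);
        // Thread E, CAS after @OnEnabled returned: succeeds.
        final boolean eCas = stateRef.compareAndSet(State.ENABLING, State.ENABLED);
        // Thread D, fallback CAS in the else branch: fails, state is now ENABLED.
        final boolean dSecond = stateRef.compareAndSet(State.ENABLING, State.DISABLING);

        System.out.println("D first CAS=" + dFirst + ", E CAS=" + eCas + ", D second CAS=" + dSecond);
        return stateRef.get(); // service was never moved to DISABLING
    }

    // One possible fix (an assumption, not the PR's code): retry until a
    // transition to DISABLING succeeds or the service is already on its way down.
    // Returns true when the caller is responsible for invoking @OnDisabled.
    static boolean moveToDisabling(final AtomicReference<State> stateRef) {
        while (true) {
            if (stateRef.compareAndSet(State.ENABLED, State.DISABLING)) {
                return true;  // caller runs the disable logic
            }
            if (stateRef.compareAndSet(State.ENABLING, State.DISABLING)) {
                return false; // the enabling task sees DISABLING and cleans up
            }
            final State current = stateRef.get();
            if (current == State.DISABLED || current == State.DISABLING) {
                return false; // nothing left to do
            }
            // Otherwise the state changed between the two CAS attempts; try again.
        }
    }

    public static void main(String[] args) {
        System.out.println("racy final state: " + replayRacyInterleaving());

        final AtomicReference<State> stateRef = new AtomicReference<>(State.ENABLED);
        final boolean callerDisables = moveToDisabling(stateRef);
        System.out.println("caller runs @OnDisabled: " + callerDisables + ", state: " + stateRef.get());
    }
}
```

    The retry loop relies on the state machine only moving ENABLING to ENABLED
    once per enable attempt, so in practice it should need at most one extra
    pass; the real fix would also have to account for the retry scheduling in
    enable().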

