http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 0000000,7fc65f9..0653b03 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@@ -1,0 -1,569 +1,640 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.controller.scheduling; + + import static java.util.Objects.requireNonNull; + + import java.lang.reflect.InvocationTargetException; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; + ++import org.apache.nifi.annotation.lifecycle.OnDisabled; ++import org.apache.nifi.annotation.lifecycle.OnEnabled; ++import org.apache.nifi.annotation.lifecycle.OnScheduled; ++import org.apache.nifi.annotation.lifecycle.OnStopped; ++import org.apache.nifi.annotation.lifecycle.OnUnscheduled; + import org.apache.nifi.connectable.Connectable; + import org.apache.nifi.connectable.Funnel; + import org.apache.nifi.connectable.Port; + import org.apache.nifi.controller.AbstractPort; + import org.apache.nifi.controller.ConfigurationContext; + import org.apache.nifi.controller.Heartbeater; + import org.apache.nifi.controller.ProcessScheduler; + import org.apache.nifi.controller.ProcessorNode; + import org.apache.nifi.controller.ReportingTaskNode; + import org.apache.nifi.controller.ScheduledState; + import org.apache.nifi.controller.annotation.OnConfigured; ++import org.apache.nifi.controller.service.ControllerServiceNode; + import org.apache.nifi.controller.service.ControllerServiceProvider; + import org.apache.nifi.encrypt.StringEncryptor; + import org.apache.nifi.engine.FlowEngine; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.nar.NarCloseable; + import org.apache.nifi.processor.SchedulingContext; + import org.apache.nifi.processor.SimpleProcessLogger; + import org.apache.nifi.processor.StandardProcessContext; + import org.apache.nifi.processor.StandardSchedulingContext; -import org.apache.nifi.processor.annotation.OnScheduled; -import 
org.apache.nifi.processor.annotation.OnStopped; -import org.apache.nifi.processor.annotation.OnUnscheduled; + import org.apache.nifi.reporting.ReportingTask; + import org.apache.nifi.scheduling.SchedulingStrategy; + import org.apache.nifi.util.FormatUtils; + import org.apache.nifi.util.NiFiProperties; + import org.apache.nifi.util.ReflectionUtils; - + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * Responsible for scheduling Processors, Ports, and Funnels to run at regular + * intervals + */ + public final class StandardProcessScheduler implements ProcessScheduler { + + private static final Logger LOG = LoggerFactory.getLogger(StandardProcessScheduler.class); + + private final ControllerServiceProvider controllerServiceProvider; + private final Heartbeater heartbeater; + private final long administrativeYieldMillis; + private final String administrativeYieldDuration; + + private final ConcurrentMap<Object, ScheduleState> scheduleStates = new ConcurrentHashMap<>(); + private final ScheduledExecutorService frameworkTaskExecutor; + private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> strategyAgentMap = new ConcurrentHashMap<>(); + // thread pool for starting/stopping components + private final ExecutorService componentLifeCycleThreadPool = new ThreadPoolExecutor(25, 50, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5000)); + private final StringEncryptor encryptor; + + public StandardProcessScheduler(final Heartbeater heartbeater, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor) { + this.heartbeater = heartbeater; + this.controllerServiceProvider = controllerServiceProvider; + this.encryptor = encryptor; + + administrativeYieldDuration = NiFiProperties.getInstance().getAdministrativeYieldDuration(); + administrativeYieldMillis = FormatUtils.getTimeDuration(administrativeYieldDuration, TimeUnit.MILLISECONDS); + + frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread"); + } 
+ + public void scheduleFrameworkTask(final Runnable command, final String taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) { + frameworkTaskExecutor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + command.run(); + } catch (final Throwable t) { + LOG.error("Failed to run Framework Task {} due to {}", command, t.toString()); + if (LOG.isDebugEnabled()) { + LOG.error("", t); + } + } + } + }, initialDelay, delay, timeUnit); + } + + @Override + public void setMaxThreadCount(final SchedulingStrategy schedulingStrategy, final int maxThreadCount) { + final SchedulingAgent agent = getSchedulingAgent(schedulingStrategy); + if (agent == null) { + return; + } + + agent.setMaxThreadCount(maxThreadCount); + } + + public void setSchedulingAgent(final SchedulingStrategy strategy, final SchedulingAgent agent) { + strategyAgentMap.put(strategy, agent); + } + + public SchedulingAgent getSchedulingAgent(final SchedulingStrategy strategy) { + return strategyAgentMap.get(strategy); + } + + private SchedulingAgent getSchedulingAgent(final Connectable connectable) { + return getSchedulingAgent(connectable.getSchedulingStrategy()); + } + + @Override + public void shutdown() { + for (final SchedulingAgent schedulingAgent : strategyAgentMap.values()) { + try { + schedulingAgent.shutdown(); + } catch (final Throwable t) { + LOG.error("Failed to shutdown Scheduling Agent {} due to {}", schedulingAgent, t.toString()); + LOG.error("", t); + } + } + + frameworkTaskExecutor.shutdown(); + componentLifeCycleThreadPool.shutdown(); + } + + public void schedule(final ReportingTaskNode taskNode) { + final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode)); + if (scheduleState.isScheduled()) { + return; + } + + final int activeThreadCount = scheduleState.getActiveThreadCount(); + if (activeThreadCount > 0) { + throw new IllegalStateException("Reporting Task " + taskNode.getName() + " cannot be started because it has " 
+ activeThreadCount + " threads still running"); + } + + if (!taskNode.isValid()) { + throw new IllegalStateException("Reporting Task " + taskNode.getName() + " is not in a valid state for the following reasons: " + taskNode.getValidationErrors()); + } + + final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy()); + scheduleState.setScheduled(true); + + final Runnable startReportingTaskRunnable = new Runnable() { ++ @SuppressWarnings("deprecation") + @Override + public void run() { ++ // Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time. + while (true) { + final ReportingTask reportingTask = taskNode.getReportingTask(); + + try { + try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, taskNode.getConfigurationContext()); ++ ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, OnScheduled.class, reportingTask, taskNode.getConfigurationContext()); + } ++ + break; + } catch (final InvocationTargetException ite) { - LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", ++ LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", + new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration}); + LOG.error("", ite.getTargetException()); + + try { + Thread.sleep(administrativeYieldMillis); + } catch (final InterruptedException ie) { + } + } catch (final Exception e) { - LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", ++ LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will 
attempt to schedule it again after {}", + new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e); + try { + Thread.sleep(administrativeYieldMillis); + } catch (final InterruptedException ie) { + } + } + } + + agent.schedule(taskNode, scheduleState); + } + }; + + componentLifeCycleThreadPool.execute(startReportingTaskRunnable); + } + + public void unschedule(final ReportingTaskNode taskNode) { + final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode)); + if (!scheduleState.isScheduled()) { + return; + } + + final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy()); + final ReportingTask reportingTask = taskNode.getReportingTask(); + scheduleState.setScheduled(false); + + final Runnable unscheduleReportingTaskRunnable = new Runnable() { ++ @SuppressWarnings("deprecation") + @Override + public void run() { + final ConfigurationContext configurationContext = taskNode.getConfigurationContext(); + - while (true) { - try { - try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext); - } - break; - } catch (final InvocationTargetException ite) { - LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", - new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration}); - LOG.error("", ite.getTargetException()); ++ try { ++ try (final NarCloseable x = NarCloseable.withNarLoader()) { ++ ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext); ++ } ++ } catch (final InvocationTargetException ite) { ++ LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", ++ new 
Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration}); ++ LOG.error("", ite.getTargetException()); + - try { - Thread.sleep(administrativeYieldMillis); - } catch (final InterruptedException ie) { - } - } catch (final Exception e) { - LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", - new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e); - try { - Thread.sleep(administrativeYieldMillis); - } catch (final InterruptedException ie) { - } ++ try { ++ Thread.sleep(administrativeYieldMillis); ++ } catch (final InterruptedException ie) { ++ } ++ } catch (final Exception e) { ++ LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", ++ new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e); ++ try { ++ Thread.sleep(administrativeYieldMillis); ++ } catch (final InterruptedException ie) { + } + } + + agent.unschedule(taskNode, scheduleState); + - if (scheduleState.getActiveThreadCount() == 0) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext); ++ if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) { ++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext); + } + } + }; + + componentLifeCycleThreadPool.execute(unscheduleReportingTaskRunnable); + } + + /** + * Starts scheduling the given processor to run after invoking all methods + * on the underlying {@link nifi.processor.Processor + * FlowFileProcessor} that are annotated with the {@link OnScheduled} + * annotation. 
+ */ + @Override + public synchronized void startProcessor(final ProcessorNode procNode) { + if (procNode.getScheduledState() == ScheduledState.DISABLED) { + throw new IllegalStateException(procNode + " is disabled, so it cannot be started"); + } + final ScheduleState scheduleState = getScheduleState(requireNonNull(procNode)); + + if (scheduleState.isScheduled()) { + return; + } + + final int activeThreadCount = scheduleState.getActiveThreadCount(); + if (activeThreadCount > 0) { + throw new IllegalStateException("Processor " + procNode.getName() + " cannot be started because it has " + activeThreadCount + " threads still running"); + } + + if (!procNode.isValid()) { + throw new IllegalStateException("Processor " + procNode.getName() + " is not in a valid state"); + } + + final Runnable startProcRunnable = new Runnable() { ++ @SuppressWarnings("deprecation") + @Override + public void run() { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + long lastStopTime = scheduleState.getLastStopTime(); + final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor); + + while (true) { + try { + synchronized (scheduleState) { + // if no longer scheduled to run, then we're finished. This can happen, for example, + // if the @OnScheduled method throws an Exception and the user stops the processor + // while we're administratively yielded. + // + // we also check if the schedule state's last start time is equal to what it was before. + // if not, then means that the processor has been stopped and started again, so we should just + // bail; another thread will be responsible for invoking the @OnScheduled methods. 
+ if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) { + return; + } + + final SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, controllerServiceProvider, procNode); - ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, procNode.getProcessor(), schedulingContext); ++ ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, procNode.getProcessor(), schedulingContext); + + getSchedulingAgent(procNode).schedule(procNode, scheduleState); + + heartbeater.heartbeat(); + return; + } + } catch (final Exception e) { + final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor()); + + procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}", + new Object[]{procNode.getProcessor(), e.getCause(), administrativeYieldDuration}, e.getCause()); + LOG.error("Failed to invoke @OnScheduled method due to {}", e.getCause().toString(), e.getCause()); + + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, procNode.getProcessor(), processContext); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext); + + Thread.sleep(administrativeYieldMillis); + continue; + } + } + } catch (final Throwable t) { + final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor()); + procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run", new Object[]{procNode.getProcessor(), t}); + LOG.error("Failed to invoke @OnScheduled method due to {}", t.toString(), t); + } + } + }; + + scheduleState.setScheduled(true); + procNode.setScheduledState(ScheduledState.RUNNING); + + componentLifeCycleThreadPool.execute(startProcRunnable); + } + + /** + * Used to delay scheduling the given Processor to run until its yield + 
* duration expires. + * + * @param procNode + */ + @Override + public void yield(final ProcessorNode procNode) { + // This exists in the ProcessScheduler so that the scheduler can take advantage of the fact that + // the Processor was yielded and, as a result, avoid scheduling the Processor to potentially run + // (thereby skipping the overhead of the Context Switches) if nothing can be done. + // + // We used to implement this feature by canceling all futures for the given Processor and + // re-submitting them with a delay. However, this became problematic, because we have situations where + // a Processor will wait several seconds (often 30 seconds in the case of a network timeout), and then yield + // the context. If this Processor has X number of threads, we end up submitting X new tasks while the previous + // X-1 tasks are still running. At this point, another thread could finish and do the same thing, resulting in + // an additional X-1 extra tasks being submitted. + // + // As a result, we simply removed this buggy implementation, as it was a very minor performance optimization + // that gave very bad results. + } + + /** + * Stops scheduling the given processor to run and invokes all methods on + * the underlying {@link nifi.processor.Processor FlowFileProcessor} that + * are annotated with the {@link OnUnscheduled} annotation. 
+ */ + @Override + public synchronized void stopProcessor(final ProcessorNode procNode) { + final ScheduleState state = getScheduleState(requireNonNull(procNode)); + + synchronized (state) { + if (!state.isScheduled()) { + procNode.setScheduledState(ScheduledState.STOPPED); + return; + } + + getSchedulingAgent(procNode).unschedule(procNode, state); + procNode.setScheduledState(ScheduledState.STOPPED); + state.setScheduled(false); + } + + final Runnable stopProcRunnable = new Runnable() { + @Override + public void run() { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor); + + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, procNode.getProcessor(), processContext); + + // If no threads currently running, call the OnStopped methods + if (state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext); + heartbeater.heartbeat(); + } + } + } + }; + + componentLifeCycleThreadPool.execute(stopProcRunnable); + } + + @Override + public void registerEvent(final Connectable worker) { + getSchedulingAgent(worker).onEvent(worker); + } + + /** + * Returns the number of threads that are currently active for the given + * <code>Connectable</code>. + * + * @return + */ + @Override + public int getActiveThreadCount(final Object scheduled) { + return getScheduleState(scheduled).getActiveThreadCount(); + } + + /** + * Begins scheduling the given port to run. 
+ * + * @throws NullPointerException if the Port is null + * @throws IllegalStateException if the Port is already scheduled to run or + * has threads running + */ + @Override + public void startPort(final Port port) { + if (!port.isValid()) { + throw new IllegalStateException("Port " + port.getName() + " is not in a valid state"); + } + + port.onSchedulingStart(); + startConnectable(port); + } + + @Override + public void startFunnel(final Funnel funnel) { + startConnectable(funnel); + funnel.setScheduledState(ScheduledState.RUNNING); + } + + @Override + public void stopPort(final Port port) { + stopConnectable(port); + port.shutdown(); + } + + @Override + public void stopFunnel(final Funnel funnel) { + stopConnectable(funnel); + funnel.setScheduledState(ScheduledState.STOPPED); + } + + private synchronized void startConnectable(final Connectable connectable) { + if (connectable.getScheduledState() == ScheduledState.DISABLED) { + throw new IllegalStateException(connectable + " is disabled, so it cannot be started"); + } + + final ScheduleState scheduleState = getScheduleState(requireNonNull(connectable)); + if (scheduleState.isScheduled()) { + return; + } + + final int activeThreads = scheduleState.getActiveThreadCount(); + if (activeThreads > 0) { + throw new IllegalStateException("Port cannot be scheduled to run until its last " + activeThreads + " threads finish"); + } + + getSchedulingAgent(connectable).schedule(connectable, scheduleState); + scheduleState.setScheduled(true); + } + + private synchronized void stopConnectable(final Connectable connectable) { + final ScheduleState state = getScheduleState(requireNonNull(connectable)); + if (!state.isScheduled()) { + return; + } + state.setScheduled(false); + + getSchedulingAgent(connectable).unschedule(connectable, state); + + if (!state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) { + final ConnectableProcessContext processContext = new 
ConnectableProcessContext(connectable, encryptor); + try (final NarCloseable x = NarCloseable.withNarLoader()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext); + heartbeater.heartbeat(); + } + } + } + + @Override + public synchronized void enableFunnel(final Funnel funnel) { + if (funnel.getScheduledState() != ScheduledState.DISABLED) { + throw new IllegalStateException("Funnel cannot be enabled because it is not disabled"); + } + funnel.setScheduledState(ScheduledState.STOPPED); + } + + @Override + public synchronized void disableFunnel(final Funnel funnel) { + if (funnel.getScheduledState() != ScheduledState.STOPPED) { + throw new IllegalStateException("Funnel cannot be disabled because its state its state is set to " + funnel.getScheduledState()); + } + funnel.setScheduledState(ScheduledState.DISABLED); + } + + @Override + public synchronized void disablePort(final Port port) { + if (port.getScheduledState() != ScheduledState.STOPPED) { + throw new IllegalStateException("Port cannot be disabled because its state is set to " + port.getScheduledState()); + } + + if (!(port instanceof AbstractPort)) { + throw new IllegalArgumentException(); + } + + ((AbstractPort) port).disable(); + } + + @Override - public synchronized void disableProcessor(final ProcessorNode procNode) { - if (procNode.getScheduledState() != ScheduledState.STOPPED) { - throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState()); - } - procNode.setScheduledState(ScheduledState.DISABLED); - } - - @Override + public synchronized void enablePort(final Port port) { + if (port.getScheduledState() != ScheduledState.DISABLED) { + throw new IllegalStateException("Funnel cannot be enabled because it is not disabled"); + } + + if (!(port instanceof AbstractPort)) { + throw new IllegalArgumentException(); + } + + ((AbstractPort) port).enable(); + } + + @Override + public synchronized void 
enableProcessor(final ProcessorNode procNode) { + if (procNode.getScheduledState() != ScheduledState.DISABLED) { + throw new IllegalStateException("Processor cannot be enabled because it is not disabled"); + } ++ + procNode.setScheduledState(ScheduledState.STOPPED); ++ ++ try (final NarCloseable x = NarCloseable.withNarLoader()) { ++ final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor()); ++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, procNode.getProcessor(), processorLog); ++ } + } + + @Override ++ public synchronized void disableProcessor(final ProcessorNode procNode) { ++ if (procNode.getScheduledState() != ScheduledState.STOPPED) { ++ throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState()); ++ } ++ ++ procNode.setScheduledState(ScheduledState.DISABLED); ++ ++ try (final NarCloseable x = NarCloseable.withNarLoader()) { ++ final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor()); ++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, procNode.getProcessor(), processorLog); ++ } ++ } ++ ++ public synchronized void enableReportingTask(final ReportingTaskNode taskNode) { ++ if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) { ++ throw new IllegalStateException("Reporting Task cannot be enabled because it is not disabled"); ++ } ++ ++ taskNode.setScheduledState(ScheduledState.STOPPED); ++ ++ try (final NarCloseable x = NarCloseable.withNarLoader()) { ++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, taskNode.getReportingTask()); ++ } ++ } ++ ++ public synchronized void disableReportingTask(final ReportingTaskNode taskNode) { ++ if ( taskNode.getScheduledState() != ScheduledState.STOPPED ) { ++ throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState() + " 
but transition to DISABLED state is allowed only from the STOPPED state"); ++ } ++ ++ taskNode.setScheduledState(ScheduledState.DISABLED); ++ ++ try (final NarCloseable x = NarCloseable.withNarLoader()) { ++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, taskNode.getReportingTask()); ++ } ++ } ++ ++ public synchronized void enableControllerService(final ControllerServiceNode serviceNode) { ++ if ( !serviceNode.isDisabled() ) { ++ throw new IllegalStateException("Controller Service cannot be enabled because it is not disabled"); ++ } ++ ++ // we set the service to enabled before invoking the @OnEnabled methods. We do this because it must be ++ // done in this order for disabling (serviceNode.setDisabled(true) will throw Exceptions if the service ++ // is currently known to be in use) and we want to be consistent with the ordering of calling setDisabled ++ // before annotated methods. ++ serviceNode.setDisabled(false); ++ ++ try (final NarCloseable x = NarCloseable.withNarLoader()) { ++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation()); ++ } ++ } ++ ++ public synchronized void disableControllerService(final ControllerServiceNode serviceNode) { ++ if ( serviceNode.isDisabled() ) { ++ throw new IllegalStateException("Controller Service cannot be disabled because it is already disabled"); ++ } ++ ++ // We must set the service to disabled before we invoke the OnDisabled methods because the service node ++ // can throw Exceptions if we attempt to disable the service while it's known to be in use. 
++ serviceNode.setDisabled(true); ++ ++ try (final NarCloseable x = NarCloseable.withNarLoader()) { ++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation()); ++ } ++ } ++ ++ ++ @Override + public boolean isScheduled(final Object scheduled) { + final ScheduleState scheduleState = scheduleStates.get(scheduled); + return (scheduleState == null) ? false : scheduleState.isScheduled(); + } + + /** - * Returns the ScheduleState that is registered for the given ProcessorNode; ++ * Returns the ScheduleState that is registered for the given component; + * if no ScheduleState current is registered, one is created and registered + * atomically, and then that value is returned. + * + * @param schedulable + * @return + */ + private ScheduleState getScheduleState(final Object schedulable) { + ScheduleState scheduleState = scheduleStates.get(schedulable); + if (scheduleState == null) { + scheduleState = new ScheduleState(); + ScheduleState previous = scheduleStates.putIfAbsent(schedulable, scheduleState); + if (previous != null) { + scheduleState = previous; + } + } + return scheduleState; + } + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index 0000000,42bd55f..9fec307 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@@ -1,0 -1,156 +1,154 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.controller.service; + + import java.io.BufferedInputStream; + import java.io.File; + import java.io.IOException; + import java.io.InputStream; + import java.net.URL; + import java.nio.file.Files; + import java.nio.file.Path; + import java.nio.file.StandardOpenOption; + import java.util.ArrayList; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + + import javax.xml.XMLConstants; + import javax.xml.parsers.DocumentBuilder; + import javax.xml.parsers.DocumentBuilderFactory; + import javax.xml.parsers.ParserConfigurationException; + import javax.xml.validation.Schema; + import javax.xml.validation.SchemaFactory; + + import org.apache.nifi.util.file.FileUtils; + import org.apache.nifi.util.DomUtils; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; + import org.w3c.dom.Document; + import org.w3c.dom.Element; + import org.w3c.dom.NodeList; + import org.xml.sax.SAXException; + import org.xml.sax.SAXParseException; + + /** + * + */ + public class ControllerServiceLoader { + + private static final Log logger = LogFactory.getLog(ControllerServiceLoader.class); + + private final Path serviceConfigXmlPath; + + public ControllerServiceLoader(final Path serviceConfigXmlPath) throws IOException { + final File serviceConfigXmlFile = serviceConfigXmlPath.toFile(); + if (!serviceConfigXmlFile.exists() || !serviceConfigXmlFile.canRead()) { + throw new IOException(serviceConfigXmlPath + " does not appear to exist or cannot be read. 
Cannot load configuration."); + } + + this.serviceConfigXmlPath = serviceConfigXmlPath; + } + + public List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider) throws IOException { + final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance(); + InputStream fis = null; + BufferedInputStream bis = null; + documentBuilderFactory.setNamespaceAware(true); + + final List<ControllerServiceNode> services = new ArrayList<>(); + + try { + final URL configurationResource = this.getClass().getResource("/ControllerServiceConfiguration.xsd"); + if (configurationResource == null) { + throw new NullPointerException("Unable to load XML Schema for ControllerServiceConfiguration"); + } + final Schema schema = schemaFactory.newSchema(configurationResource); + documentBuilderFactory.setSchema(schema); + final DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder(); + + builder.setErrorHandler(new org.xml.sax.ErrorHandler() { + + @Override + public void fatalError(final SAXParseException err) throws SAXException { + logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage()); + if (logger.isDebugEnabled()) { + logger.error("Error Stack Dump", err); + } + throw err; + } + + @Override + public void error(final SAXParseException err) throws SAXParseException { + logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage()); + if (logger.isDebugEnabled()) { + logger.error("Error Stack Dump", err); + } + throw err; + } + + @Override + public void warning(final SAXParseException err) throws SAXParseException { + logger.warn(" Config file line " + err.getLineNumber() + ", uri " + err.getSystemId() + " : message : " + 
err.getMessage()); + if (logger.isDebugEnabled()) { + logger.warn("Warning stack dump", err); + } + throw err; + } + }); + + //if controllerService.xml does not exist, create an empty file... + fis = Files.newInputStream(this.serviceConfigXmlPath, StandardOpenOption.READ); + bis = new BufferedInputStream(fis); + if (Files.size(this.serviceConfigXmlPath) > 0) { + final Document document = builder.parse(bis); + final NodeList servicesNodes = document.getElementsByTagName("services"); + final Element servicesElement = (Element) servicesNodes.item(0); + + final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(servicesElement, "service"); + for (final Element serviceElement : serviceNodes) { - //add global properties common to all tasks - Map<String, String> properties = new HashMap<>(); - + //get properties for the specific controller task - id, name, class, + //and schedulingPeriod must be set + final String serviceId = DomUtils.getChild(serviceElement, "identifier").getTextContent().trim(); + final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim(); + ++ //set the class to be used for the configured controller task ++ final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, false); ++ + //optional task-specific properties + for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) { + final String name = optionalProperty.getAttribute("name").trim(); + final String value = optionalProperty.getTextContent().trim(); - properties.put(name, value); ++ serviceNode.setProperty(name, value); + } + - //set the class to be used for the configured controller task - final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, properties); + services.add(serviceNode); + serviceNode.setDisabled(false); + } + } + } catch (SAXException | ParserConfigurationException sxe) { + throw new IOException(sxe); + } 
finally { + FileUtils.closeQuietly(fis); + FileUtils.closeQuietly(bis); + } + + return services; + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 0000000,455eac1..4681293 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@@ -1,0 -1,125 +1,195 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.controller.service; + + import java.util.HashSet; + import java.util.Set; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicReference; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReadWriteLock; + import java.util.concurrent.locks.ReentrantReadWriteLock; + + import org.apache.nifi.controller.AbstractConfiguredComponent; + import org.apache.nifi.controller.Availability; ++import org.apache.nifi.controller.ConfigurationContext; + import org.apache.nifi.controller.ConfiguredComponent; + import org.apache.nifi.controller.ControllerService; + import org.apache.nifi.controller.ValidationContextFactory; ++import org.apache.nifi.controller.annotation.OnConfigured; ++import org.apache.nifi.controller.exception.ProcessorLifeCycleException; ++import org.apache.nifi.nar.NarCloseable; ++import org.apache.nifi.util.ReflectionUtils; + + public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode { + - private final ControllerService controllerService; ++ private final ControllerService proxedControllerService; ++ private final ControllerService implementation; ++ private final ControllerServiceProvider serviceProvider; + + private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY); + private final AtomicBoolean disabled = new AtomicBoolean(true); + + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + + private final Set<ConfiguredComponent> referencingComponents = new HashSet<>(); + - public StandardControllerServiceNode(final ControllerService controllerService, final String id, ++ public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id, + final ValidationContextFactory 
validationContextFactory, final ControllerServiceProvider serviceProvider) { - super(controllerService, id, validationContextFactory, serviceProvider); - this.controllerService = controllerService; ++ super(proxiedControllerService, id, validationContextFactory, serviceProvider); ++ this.proxedControllerService = proxiedControllerService; ++ this.implementation = implementation; ++ this.serviceProvider = serviceProvider; + } + + @Override + public boolean isDisabled() { + return disabled.get(); + } + + @Override + public void setDisabled(final boolean disabled) { + if (!disabled && !isValid()) { - throw new IllegalStateException("Cannot enable Controller Service " + controllerService + " because it is not valid"); ++ throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid"); + } + + if (disabled) { + // do not allow a Controller Service to be disabled if it's currently being used. + final Set<ConfiguredComponent> runningRefs = getReferences().getRunningReferences(); + if (!runningRefs.isEmpty()) { + throw new IllegalStateException("Cannot disable Controller Service because it is referenced (either directly or indirectly) by " + runningRefs.size() + " different components that are currently running"); + } + } + + this.disabled.set(disabled); + } + + @Override + public Availability getAvailability() { + return availability.get(); + } + + @Override + public void setAvailability(final Availability availability) { + this.availability.set(availability); + } + + @Override - public ControllerService getControllerService() { - return controllerService; ++ public ControllerService getProxiedControllerService() { ++ return proxedControllerService; ++ } ++ ++ @Override ++ public ControllerService getControllerServiceImplementation() { ++ return implementation; + } + + @Override + public ControllerServiceReference getReferences() { + readLock.lock(); + try { + return new StandardControllerServiceReference(this, 
referencingComponents); + } finally { + readLock.unlock(); + } + } + + @Override + public void addReference(final ConfiguredComponent referencingComponent) { + writeLock.lock(); + try { + referencingComponents.add(referencingComponent); + } finally { + writeLock.unlock(); + } + } + + @Override + public void removeReference(final ConfiguredComponent referencingComponent) { + writeLock.lock(); + try { + referencingComponents.remove(referencingComponent); + } finally { + writeLock.unlock(); + } + } + + @Override + public void verifyModifiable() throws IllegalStateException { + if (!isDisabled()) { + throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first."); + } + } ++ ++ @Override ++ public void setProperty(final String name, final String value) { ++ super.setProperty(name, value); ++ ++ onConfigured(); ++ } ++ ++ @Override ++ public boolean removeProperty(String name) { ++ final boolean removed = super.removeProperty(name); ++ if ( removed ) { ++ onConfigured(); ++ } ++ ++ return removed; ++ } ++ ++ private void onConfigured() { ++ try (final NarCloseable x = NarCloseable.withNarLoader()) { ++ final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceProvider); ++ ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, implementation, configContext); ++ } catch (final Exception e) { ++ throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e); ++ } ++ } ++ ++ @Override ++ public void verifyCanDelete() { ++ if ( !isDisabled() ) { ++ throw new IllegalStateException(implementation + " cannot be deleted because it is not disabled"); ++ } ++ } ++ ++ @Override ++ public void verifyCanDisable() { ++ final ControllerServiceReference references = getReferences(); ++ final int numRunning = references.getRunningReferences().size(); ++ if ( numRunning > 0 ) { ++ throw new 
IllegalStateException(implementation + " cannot be disabled because it is referenced by " + numRunning + " components that are currently running"); ++ } ++ } ++ ++ @Override ++ public void verifyCanEnable() { ++ if ( !isDisabled() ) { ++ throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled"); ++ } ++ } ++ ++ @Override ++ public void verifyCanUpdate() { ++ if ( !isDisabled() ) { ++ throw new IllegalStateException(implementation + " cannot be updated because it is not disabled"); ++ } ++ } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 0000000,fc07ce1..cc7a18a mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@@ -1,0 -1,196 +1,219 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.controller.service; + + import static java.util.Objects.requireNonNull; + + import java.lang.reflect.InvocationHandler; + import java.lang.reflect.InvocationTargetException; + import java.lang.reflect.Method; + import java.lang.reflect.Proxy; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.ConcurrentHashMap; + ++import org.apache.nifi.annotation.lifecycle.OnAdded; ++import org.apache.nifi.annotation.lifecycle.OnRemoved; ++import org.apache.nifi.controller.ConfigurationContext; + import org.apache.nifi.controller.ControllerService; + import org.apache.nifi.controller.ValidationContextFactory; -import org.apache.nifi.controller.annotation.OnConfigured; + import org.apache.nifi.controller.exception.ControllerServiceAlreadyExistsException; + import org.apache.nifi.controller.exception.ControllerServiceNotFoundException; ++import org.apache.nifi.controller.exception.ProcessorLifeCycleException; + import org.apache.nifi.nar.ExtensionManager; + import org.apache.nifi.nar.NarCloseable; + import org.apache.nifi.processor.StandardValidationContextFactory; + import org.apache.nifi.util.ObjectHolder; + import org.apache.nifi.util.ReflectionUtils; - + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * + */ + public class StandardControllerServiceProvider implements ControllerServiceProvider { + + private static final Logger logger = 
LoggerFactory.getLogger(StandardControllerServiceProvider.class); + + private final Map<String, ControllerServiceNode> controllerServices; + private static final Set<Method> validDisabledMethods; + + static { + // methods that are okay to be called when the service is disabled. + final Set<Method> validMethods = new HashSet<>(); + for (final Method method : ControllerService.class.getMethods()) { + validMethods.add(method); + } + for (final Method method : Object.class.getMethods()) { + validMethods.add(method); + } + validDisabledMethods = Collections.unmodifiableSet(validMethods); + } + + public StandardControllerServiceProvider() { + // the following 2 maps must be updated atomically, but we do not lock around them because they are modified + // only in the createControllerService method, and both are modified before the method returns + this.controllerServices = new ConcurrentHashMap<>(); + } + + private Class<?>[] getInterfaces(final Class<?> cls) { + final List<Class<?>> allIfcs = new ArrayList<>(); + populateInterfaces(cls, allIfcs); + return allIfcs.toArray(new Class<?>[allIfcs.size()]); + } + + private void populateInterfaces(final Class<?> cls, final List<Class<?>> interfacesDefinedThusFar) { + final Class<?>[] ifc = cls.getInterfaces(); + if (ifc != null && ifc.length > 0) { + for (final Class<?> i : ifc) { + interfacesDefinedThusFar.add(i); + } + } + + final Class<?> superClass = cls.getSuperclass(); + if (superClass != null) { + populateInterfaces(superClass, interfacesDefinedThusFar); + } + } + + @Override - public ControllerServiceNode createControllerService(final String type, final String id, final Map<String, String> properties) { ++ public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) { + if (type == null || id == null) { + throw new NullPointerException(); + } + if (controllerServices.containsKey(id)) { + throw new ControllerServiceAlreadyExistsException(id); + } + + final 
ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader(); + try { + final ClassLoader cl = ExtensionManager.getClassLoader(type); + Thread.currentThread().setContextClassLoader(cl); + final Class<?> rawClass = Class.forName(type, false, cl); + final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class); + + final ControllerService originalService = controllerServiceClass.newInstance(); + final ObjectHolder<ControllerServiceNode> serviceNodeHolder = new ObjectHolder<>(null); + final InvocationHandler invocationHandler = new InvocationHandler() { + @Override + public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { + final ControllerServiceNode node = serviceNodeHolder.get(); + if (node.isDisabled() && !validDisabledMethods.contains(method)) { ++ // Use nar class loader here because we are implicitly calling toString() on the original implementation. + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService + " because the Controller Service is disabled"); + } catch (final Throwable e) { + throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service with identifier " + id + " because the Controller Service is disabled"); + } + } + + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return method.invoke(originalService, args); + } catch (final InvocationTargetException e) { + // If the ControllerService throws an Exception, it'll be wrapped in an InvocationTargetException. We want + // to instead re-throw what the ControllerService threw, so we pull it out of the InvocationTargetException. 
+ throw e.getCause(); + } + } + }; + + final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler); + logger.info("Loaded service {} as configured.", type); + + originalService.initialize(new StandardControllerServiceInitializationContext(id, this)); + + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this); + - final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, id, validationContextFactory, this); ++ final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this); + serviceNodeHolder.set(serviceNode); + serviceNode.setAnnotationData(null); + serviceNode.setName(id); - for (final Map.Entry<String, String> entry : properties.entrySet()) { - serviceNode.setProperty(entry.getKey(), entry.getValue()); ++ ++ if ( firstTimeAdded ) { ++ try (final NarCloseable x = NarCloseable.withNarLoader()) { ++ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService); ++ } catch (final Exception e) { ++ throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + originalService, e); ++ } + } - final StandardConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this); - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigured.class, originalService, configurationContext); + + this.controllerServices.put(id, serviceNode); + return serviceNode; + } catch (final Throwable t) { + throw new ControllerServiceNotFoundException(t); + } finally { + if (currentContextClassLoader != null) { + Thread.currentThread().setContextClassLoader(currentContextClassLoader); + } + } + } + + @Override + public ControllerService getControllerService(final String serviceIdentifier) { + final ControllerServiceNode node = controllerServices.get(serviceIdentifier); - 
return (node == null) ? null : node.getControllerService(); ++ return (node == null) ? null : node.getProxiedControllerService(); + } + + @Override + public boolean isControllerServiceEnabled(final ControllerService service) { + return isControllerServiceEnabled(service.getIdentifier()); + } + + @Override + public boolean isControllerServiceEnabled(final String serviceIdentifier) { + final ControllerServiceNode node = controllerServices.get(serviceIdentifier); + return (node == null) ? false : !node.isDisabled(); + } + + @Override + public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) { + return controllerServices.get(serviceIdentifier); + } + + @Override + public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) { + final Set<String> identifiers = new HashSet<>(); + for (final Map.Entry<String, ControllerServiceNode> entry : controllerServices.entrySet()) { - if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getControllerService().getClass())) { ++ if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getProxiedControllerService().getClass())) { + identifiers.add(entry.getKey()); + } + } + + return identifiers; + } ++ ++ @Override ++ public void removeControllerService(final ControllerServiceNode serviceNode) { ++ final ControllerServiceNode existing = controllerServices.get(serviceNode.getIdentifier()); ++ if ( existing == null || existing != serviceNode ) { ++ throw new IllegalStateException("Controller Service " + serviceNode + " does not exist in this Flow"); ++ } ++ ++ serviceNode.verifyCanDelete(); ++ ++ try (final NarCloseable x = NarCloseable.withNarLoader()) { ++ final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this); ++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext); ++ } ++ ++ 
controllerServices.remove(serviceNode.getIdentifier()); ++ } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java index 0000000,c04a04f..aca870b mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java @@@ -1,0 -1,97 +1,97 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.controller.tasks; + + import java.util.concurrent.atomic.AtomicLong; + ++import org.apache.nifi.annotation.lifecycle.OnStopped; + import org.apache.nifi.connectable.Connectable; + import org.apache.nifi.controller.repository.StandardProcessSessionFactory; + import org.apache.nifi.controller.scheduling.ConnectableProcessContext; + import org.apache.nifi.controller.scheduling.ProcessContextFactory; + import org.apache.nifi.controller.scheduling.ScheduleState; + import org.apache.nifi.encrypt.StringEncryptor; + import org.apache.nifi.nar.NarCloseable; + import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.annotation.OnStopped; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.util.Connectables; + import org.apache.nifi.util.ReflectionUtils; - + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class ContinuallyRunConnectableTask implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunConnectableTask.class); + + private final Connectable connectable; + private final ScheduleState scheduleState; + private final ProcessSessionFactory sessionFactory; + private final ConnectableProcessContext processContext; + + public ContinuallyRunConnectableTask(final ProcessContextFactory contextFactory, final Connectable connectable, final ScheduleState scheduleState, final StringEncryptor encryptor) { + this.connectable = connectable; + this.scheduleState = scheduleState; + this.sessionFactory = new StandardProcessSessionFactory(contextFactory.newProcessContext(connectable, new AtomicLong(0L))); + this.processContext = new ConnectableProcessContext(connectable, encryptor); + } + ++ @SuppressWarnings("deprecation") + @Override + public void run() { + if (!scheduleState.isScheduled()) { + return; + } + // Connectable should run if the following conditions are met: + // 1. 
It's an Input Port or is a Remote Input Port or has incoming FlowFiles queued + // 2. Any relationship is available (since there's only 1 + // relationship for a Connectable, we can say "any" or "all" and + // it means the same thing) + // 3. It is not yielded. + final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty(); + final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis()) + && (triggerWhenEmpty || Connectables.flowFilesQueued(connectable)) && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable)); + + if (shouldRun) { + scheduleState.incrementActiveThreadCount(); + try { + try (final AutoCloseable ncl = NarCloseable.withNarLoader()) { + connectable.onTrigger(processContext, sessionFactory); + } catch (final ProcessException pe) { + logger.error("{} failed to process session due to {}", connectable, pe.toString()); + } catch (final Throwable t) { + logger.error("{} failed to process session due to {}", connectable, t.toString()); + logger.error("", t); + + logger.warn("{} Administratively Pausing for 10 seconds due to processing failure: {}", connectable, t.toString(), t); + try { + Thread.sleep(10000L); + } catch (final InterruptedException e) { + } + + } + } finally { + if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { + try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext); ++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, connectable, processContext); + } + } + + scheduleState.decrementActiveThreadCount(); + } + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index 0000000,65c375f..33bd327 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@@ -1,0 -1,185 +1,185 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.controller.tasks; + + import java.io.IOException; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicLong; + ++import org.apache.nifi.annotation.lifecycle.OnStopped; + import org.apache.nifi.controller.FlowController; + import org.apache.nifi.controller.ProcessorNode; + import org.apache.nifi.controller.repository.BatchingSessionFactory; + import org.apache.nifi.controller.repository.ProcessContext; + import org.apache.nifi.controller.repository.StandardFlowFileEvent; + import org.apache.nifi.controller.repository.StandardProcessSession; + import org.apache.nifi.controller.repository.StandardProcessSessionFactory; + import org.apache.nifi.controller.scheduling.ProcessContextFactory; + import org.apache.nifi.controller.scheduling.ScheduleState; + import org.apache.nifi.controller.scheduling.SchedulingAgent; + import org.apache.nifi.encrypt.StringEncryptor; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.nar.NarCloseable; + import org.apache.nifi.processor.ProcessSessionFactory; + import org.apache.nifi.processor.SimpleProcessLogger; + import org.apache.nifi.processor.StandardProcessContext; -import org.apache.nifi.processor.annotation.OnStopped; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.util.Connectables; + import org.apache.nifi.util.ReflectionUtils; - + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; +
/**
 * Task that triggers a single processor node once per invocation of {@link #run()}, or repeatedly
 * for up to the node's configured run duration when the node supports high-throughput batching.
 *
 * <p>
 * Each invocation is counted against the {@link ScheduleState}'s active thread count. When the last
 * active thread of an unscheduled processor finishes, the processor's {@code @OnStopped} methods are
 * invoked from here.
 * </p>
 */
public class ContinuallyRunProcessorTask implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunProcessorTask.class);

    private final SchedulingAgent schedulingAgent;
    private final ProcessorNode procNode;
    private final ProcessContext context;
    private final ScheduleState scheduleState;
    private final StandardProcessContext processContext;
    private final FlowController flowController;
    private final int numRelationships;

    /**
     * Creates a task bound to one processor node.
     *
     * @param schedulingAgent supplies the administrative yield duration applied after an uncaught failure
     * @param procNode the processor node to trigger
     * @param flowController used for cluster/primary-node checks and heartbeating
     * @param contextFactory creates the repository-level process context for {@code procNode}
     * @param scheduleState tracks scheduled state and the active thread count of the processor
     * @param encryptor passed through to the {@link StandardProcessContext} handed to the processor
     */
    public ContinuallyRunProcessorTask(final SchedulingAgent schedulingAgent, final ProcessorNode procNode,
            final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState, final StringEncryptor encryptor) {

        this.schedulingAgent = schedulingAgent;
        this.procNode = procNode;
        this.scheduleState = scheduleState;
        // the relationship count is captured once, at construction time
        this.numRelationships = procNode.getRelationships().size();
        this.flowController = flowController;

        context = contextFactory.newProcessContext(procNode, new AtomicLong(0L));
        this.processContext = new StandardProcessContext(procNode, flowController, encryptor);
    }

    /**
     * Triggers the processor if it is eligible to run, then performs the per-invocation bookkeeping
     * (batch commit, {@code @OnStopped} handling, active thread count, statistics).
     */
    @Override
    public void run() {
        // make sure processor is not yielded
        if (isYielded()) {
            return;
        }

        // make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node
        final boolean runsOnThisNode = !procNode.isIsolated() || !flowController.isClustered() || flowController.isPrimary();
        if (!runsOnThisNode) {
            return;
        }

        // make sure that either proc has incoming FlowFiles or has no incoming connections or is annotated with @TriggerWhenEmpty
        if (!isWorkAvailable()) {
            return;
        }

        // bail out before allocating any session objects if the outbound relationships are not available
        if (!areRelationshipsAvailable()) {
            return;
        }

        final long batchNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
        final ProcessSessionFactory sessionFactory;
        final StandardProcessSession rawSession;
        final boolean batch;
        if (procNode.isHighThroughputSupported() && batchNanos > 0L) {
            rawSession = new StandardProcessSession(context);
            sessionFactory = new BatchingSessionFactory(rawSession);
            batch = true;
        } else {
            rawSession = null;
            sessionFactory = new StandardProcessSessionFactory(context);
            batch = false;
        }

        scheduleState.incrementActiveThreadCount();

        final long startNanos = System.nanoTime();
        final long finishNanos = startNanos + batchNanos;
        int invocationCount = 0;
        try {
            try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
                boolean shouldRun = true;
                while (shouldRun) {
                    procNode.onTrigger(processContext, sessionFactory);
                    invocationCount++;

                    // without batching the processor is triggered exactly once; with batching, until the run duration elapses
                    if (!batch || System.nanoTime() > finishNanos) {
                        break;
                    }

                    shouldRun = isWorkAvailable() && !isYielded() && areRelationshipsAvailable();
                }
            } catch (final ProcessException pe) {
                final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
                procLog.error("Failed to process session due to {}", new Object[]{pe});
            } catch (final Throwable t) {
                // Use ProcessorLog to log the event so that a bulletin will be created for this processor
                final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
                procLog.error("{} failed to process session due to {}", new Object[]{procNode.getProcessor(), t});
                procLog.warn("Processor Administratively Yielded for {} due to processing failure", new Object[]{schedulingAgent.getAdministrativeYieldDuration()});
                logger.warn("Administratively Yielding {} due to uncaught Exception: {}", procNode.getProcessor(), t.toString());
                logger.warn("", t);

                procNode.yield(schedulingAgent.getAdministrativeYieldDuration(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
            }
        } finally {
            try {
                if (batch) {
                    rawSession.commit();
                }
            } finally {
                // Everything below must run even if the batch commit throws; otherwise the active
                // thread count is never decremented and the processor can never finish stopping.
                final long processingNanos = System.nanoTime() - startNanos;

                try {
                    invokeOnStoppedIfLastThread();
                } finally {
                    scheduleState.decrementActiveThreadCount();
                }

                recordProcessingEvent(processingNanos, invocationCount);
            }
        }
    }

    /** Returns true while the processor's yield expiration has not yet passed. */
    private boolean isYielded() {
        return procNode.getYieldExpiration() >= System.currentTimeMillis();
    }

    /** Returns true if the processor triggers when empty, has no incoming connection, or has FlowFiles queued. */
    private boolean isWorkAvailable() {
        return procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
    }

    /** Returns true if the processor has no relationships or enough of them are currently available. */
    private boolean areRelationshipsAvailable() {
        if (numRelationships <= 0) {
            return true;
        }

        final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
        return context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);
    }

    /** Invokes the processor's OnStopped methods when it is unscheduled and this is its last active thread. */
    @SuppressWarnings("deprecation")
    private void invokeOnStoppedIfLastThread() {
        // if the processor is no longer scheduled to run and this is the last thread,
        // invoke the OnStopped methods
        if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
            try (final NarCloseable x = NarCloseable.withNarLoader()) {
                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, procNode.getProcessor(), processContext);
                flowController.heartbeat();
            }
        }
    }

    /** Publishes processing time and invocation count for this run to the FlowFile event repository. */
    private void recordProcessingEvent(final long processingNanos, final int invocationCount) {
        try {
            final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier());
            procEvent.setProcessingNanos(processingNanos);
            procEvent.setInvocations(invocationCount);
            context.getFlowFileEventRepository().updateRepository(procEvent);
        } catch (final IOException e) {
            logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", procNode.getProcessor(), e.toString());
            logger.error("", e);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java index 0000000,36aa9dd..9b70581 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java @@@ -1,0 -1,63 +1,63 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.controller.tasks; + ++import org.apache.nifi.annotation.lifecycle.OnStopped; + import org.apache.nifi.controller.ReportingTaskNode; + import org.apache.nifi.controller.scheduling.ScheduleState; + import org.apache.nifi.nar.NarCloseable; -import org.apache.nifi.processor.annotation.OnStopped; + import org.apache.nifi.util.ReflectionUtils; - + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; +
/**
 * Runnable that triggers a reporting task once per invocation and keeps the associated
 * {@link ScheduleState}'s active thread count up to date.
 */
public class ReportingTaskWrapper implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(ReportingTaskWrapper.class);

    private final ReportingTaskNode taskNode;
    private final ScheduleState scheduleState;

    /**
     * @param taskNode node wrapping the reporting task to trigger
     * @param scheduleState tracks scheduled state and the active thread count for the task
     */
    public ReportingTaskWrapper(final ReportingTaskNode taskNode, final ScheduleState scheduleState) {
        this.taskNode = taskNode;
        this.scheduleState = scheduleState;
    }

    /**
     * Triggers the reporting task with its reporting context. Any Throwable raised by the task is
     * logged rather than propagated. Afterwards, if the task is no longer scheduled and this is
     * the last active thread, its OnStopped methods are invoked.
     */
    @SuppressWarnings("deprecation")
    @Override
    public synchronized void run() {
        scheduleState.incrementActiveThreadCount();
        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
            taskNode.getReportingTask().onTrigger(taskNode.getReportingContext());
        } catch (final Throwable t) {
            logger.error("Error running task {} due to {}", taskNode.getReportingTask(), t.toString());
            // the stack trace is only emitted when debug logging is enabled
            if (logger.isDebugEnabled()) {
                logger.error("", t);
            }
        } finally {
            try {
                // if the reporting task is no longer scheduled to run and this is the last thread,
                // invoke the OnStopped methods
                if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
                    try (final NarCloseable x = NarCloseable.withNarLoader()) {
                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, taskNode.getReportingTask(), taskNode.getReportingContext());
                    }
                }
            } finally {
                // always release this thread's slot, even if the OnStopped handling above fails
                scheduleState.decrementActiveThreadCount();
            }
        }
    }

}
