NIFI-4: Fixed documentation of OnScheduled and OnUnscheduled. Updated StandardProcessScheduler to invoke OnScheduled, OnUnscheduled, OnStopped methods appropriately.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/68707ce3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/68707ce3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/68707ce3 Branch: refs/heads/NIFI-250 Commit: 68707ce3c43f96e6a26789686c8f5bc397c6a532 Parents: 6b5d1a8 Author: Mark Payne <[email protected]> Authored: Fri Jan 16 13:35:17 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Fri Jan 16 13:35:17 2015 -0500 ---------------------------------------------------------------------- .../scheduling/StandardProcessScheduler.java | 50 ++++++++++---------- .../nifi/annotation/lifecycle/OnScheduled.java | 2 +- .../annotation/lifecycle/OnUnscheduled.java | 14 ++++++ 3 files changed, 40 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/68707ce3/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 5950b4e..e565ebc 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -161,18 +161,21 @@ public final class StandardProcessScheduler implements ProcessScheduler { scheduleState.setScheduled(true); final Runnable startReportingTaskRunnable = new Runnable() { + @SuppressWarnings("deprecation") @Override public void run() { + // Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time. while (true) { final ReportingTask reportingTask = taskNode.getReportingTask(); try { try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, taskNode.getConfigurationContext()); + ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, OnScheduled.class, reportingTask, taskNode.getConfigurationContext()); } + break; } catch (final InvocationTargetException ite) { - LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", + LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration}); LOG.error("", ite.getTargetException()); @@ -181,7 +184,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { } catch (final InterruptedException ie) { } } catch (final Exception e) { - LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", + LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e); try { Thread.sleep(administrativeYieldMillis); @@ -213,34 +216,31 @@ public final class StandardProcessScheduler implements ProcessScheduler { public void run() { final ConfigurationContext configurationContext = taskNode.getConfigurationContext(); - while (true) { - try { - try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext); - } - break; - } catch (final InvocationTargetException ite) { - LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", - new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration}); - LOG.error("", ite.getTargetException()); + try { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext); + } + } catch (final InvocationTargetException ite) { + LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", + new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration}); + LOG.error("", ite.getTargetException()); - try { - Thread.sleep(administrativeYieldMillis); - } catch (final InterruptedException ie) { - } - } catch (final Exception e) { - LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", - new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e); - try { - Thread.sleep(administrativeYieldMillis); - } catch (final InterruptedException ie) { - } + try { + Thread.sleep(administrativeYieldMillis); + } catch (final InterruptedException ie) { + } + } catch (final Exception e) { + LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", + new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e); + try { + Thread.sleep(administrativeYieldMillis); + } catch (final InterruptedException ie) { } } agent.unschedule(taskNode, scheduleState); - if (scheduleState.getActiveThreadCount() == 0) { + if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/68707ce3/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java index 9dfd150..a0703fa 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java @@ -41,7 +41,7 @@ import java.lang.annotation.Target; * * <p> * If using 1 argument and the component using the annotation is a Reporting Task, that argument must - * be of type {@link org.apache.nifi.reporting.ReportingContext ReportingContext}. + * be of type {@link org.apache.nifi.controller.ConfigurationContext ConfigurationContext}. * </p> * * If any method annotated with this annotation throws any Throwable, the framework will wait a while http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/68707ce3/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java index 68d0fe8..b1dbde1 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java @@ -33,6 +33,20 @@ import java.lang.annotation.Target; * threads are potentially running. To invoke a method after all threads have * finished processing, see the {@link OnStopped} annotation. * </p> + * + * <p> + * Methods using this annotation must take either 0 arguments or a single argument. + * </p> + * + * <p> + * If using 1 argument and the component using the annotation is a Processor, that argument must + * be of type {@link org.apache.nifi.processor.ProcessContext ProcessContext}. + * </p> + * + * <p> + * If using 1 argument and the component using the annotation is a Reporting Task, that argument must + * be of type {@link org.apache.nifi.controller.ConfigurationContext ConfigurationContext}. + * </p> * * @author none */
