NIFI-4: Fixed documentation of OnScheduled and OnUnscheduled. Updated 
StandardProcessScheduler to invoke the OnScheduled, OnUnscheduled, and 
OnStopped methods appropriately.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/68707ce3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/68707ce3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/68707ce3

Branch: refs/heads/NIFI-250
Commit: 68707ce3c43f96e6a26789686c8f5bc397c6a532
Parents: 6b5d1a8
Author: Mark Payne <[email protected]>
Authored: Fri Jan 16 13:35:17 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Fri Jan 16 13:35:17 2015 -0500

----------------------------------------------------------------------
 .../scheduling/StandardProcessScheduler.java    | 50 ++++++++++----------
 .../nifi/annotation/lifecycle/OnScheduled.java  |  2 +-
 .../annotation/lifecycle/OnUnscheduled.java     | 14 ++++++
 3 files changed, 40 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/68707ce3/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 5950b4e..e565ebc 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -161,18 +161,21 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         scheduleState.setScheduled(true);
 
         final Runnable startReportingTaskRunnable = new Runnable() {
+            @SuppressWarnings("deprecation")
             @Override
             public void run() {
+                // Continually attempt to start the Reporting Task, and if we 
fail sleep for a bit each time.
                 while (true) {
                     final ReportingTask reportingTask = 
taskNode.getReportingTask();
 
                     try {
                         try (final NarCloseable x = 
NarCloseable.withNarLoader()) {
-                            
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, 
taskNode.getConfigurationContext());
+                            
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, 
OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
                         }
+                        
                         break;
                     } catch (final InvocationTargetException ite) {
-                        LOG.error("Failed to invoke the @OnConfigured methods 
of {} due to {}; administratively yielding this ReportingTask and will attempt 
to schedule it again after {}",
+                        LOG.error("Failed to invoke the On-Scheduled Lifecycle 
methods of {} due to {}; administratively yielding this ReportingTask and will 
attempt to schedule it again after {}",
                                 new Object[]{reportingTask, 
ite.getTargetException(), administrativeYieldDuration});
                         LOG.error("", ite.getTargetException());
 
@@ -181,7 +184,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
                         } catch (final InterruptedException ie) {
                         }
                     } catch (final Exception e) {
-                        LOG.error("Failed to invoke the @OnConfigured methods 
of {} due to {}; administratively yielding this ReportingTask and will attempt 
to schedule it again after {}",
+                        LOG.error("Failed to invoke the On-Scheduled Lifecycle 
methods of {} due to {}; administratively yielding this ReportingTask and will 
attempt to schedule it again after {}",
                                 new Object[]{reportingTask, e.toString(), 
administrativeYieldDuration}, e);
                         try {
                             Thread.sleep(administrativeYieldMillis);
@@ -213,34 +216,31 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
             public void run() {
                 final ConfigurationContext configurationContext = 
taskNode.getConfigurationContext();
 
-                while (true) {
-                    try {
-                        try (final NarCloseable x = 
NarCloseable.withNarLoader()) {
-                            
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, 
org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, 
configurationContext);
-                        }
-                        break;
-                    } catch (final InvocationTargetException ite) {
-                        LOG.error("Failed to invoke the @OnConfigured methods 
of {} due to {}; administratively yielding this ReportingTask and will attempt 
to schedule it again after {}",
-                                new Object[]{reportingTask, 
ite.getTargetException(), administrativeYieldDuration});
-                        LOG.error("", ite.getTargetException());
+                try {
+                    try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                        
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, 
org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, 
configurationContext);
+                    }
+                } catch (final InvocationTargetException ite) {
+                    LOG.error("Failed to invoke the @OnConfigured methods of 
{} due to {}; administratively yielding this ReportingTask and will attempt to 
schedule it again after {}",
+                            new Object[]{reportingTask, 
ite.getTargetException(), administrativeYieldDuration});
+                    LOG.error("", ite.getTargetException());
 
-                        try {
-                            Thread.sleep(administrativeYieldMillis);
-                        } catch (final InterruptedException ie) {
-                        }
-                    } catch (final Exception e) {
-                        LOG.error("Failed to invoke the @OnConfigured methods 
of {} due to {}; administratively yielding this ReportingTask and will attempt 
to schedule it again after {}",
-                                new Object[]{reportingTask, e.toString(), 
administrativeYieldDuration}, e);
-                        try {
-                            Thread.sleep(administrativeYieldMillis);
-                        } catch (final InterruptedException ie) {
-                        }
+                    try {
+                        Thread.sleep(administrativeYieldMillis);
+                    } catch (final InterruptedException ie) {
+                    }
+                } catch (final Exception e) {
+                    LOG.error("Failed to invoke the @OnConfigured methods of 
{} due to {}; administratively yielding this ReportingTask and will attempt to 
schedule it again after {}",
+                            new Object[]{reportingTask, e.toString(), 
administrativeYieldDuration}, e);
+                    try {
+                        Thread.sleep(administrativeYieldMillis);
+                    } catch (final InterruptedException ie) {
                     }
                 }
 
                 agent.unschedule(taskNode, scheduleState);
 
-                if (scheduleState.getActiveThreadCount() == 0) {
+                if (scheduleState.getActiveThreadCount() == 0 && 
scheduleState.mustCallOnStoppedMethods()) {
                     
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, 
configurationContext);
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/68707ce3/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java
 
b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java
index 9dfd150..a0703fa 100644
--- 
a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java
+++ 
b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java
@@ -41,7 +41,7 @@ import java.lang.annotation.Target;
  * 
  * <p>
  * If using 1 argument and the component using the annotation is a Reporting 
Task, that argument must
- * be of type {@link org.apache.nifi.reporting.ReportingContext 
ReportingContext}.
+ * be of type {@link org.apache.nifi.controller.ConfigurationContext 
ConfigurationContext}.
  * </p>
  *
  * If any method annotated with this annotation throws any Throwable, the 
framework will wait a while

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/68707ce3/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
 
b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
index 68d0fe8..b1dbde1 100644
--- 
a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
+++ 
b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
@@ -33,6 +33,20 @@ import java.lang.annotation.Target;
  * threads are potentially running. To invoke a method after all threads have
  * finished processing, see the {@link OnStopped} annotation.
  * </p>
+ * 
+ * <p>
+ * Methods using this annotation must take either 0 arguments or a single 
argument.
+ * </p>
+ * 
+ * <p>
+ * If using 1 argument and the component using the annotation is a Processor, 
that argument must
+ * be of type {@link org.apache.nifi.processor.ProcessContext ProcessContext}.
+ * </p>
+ * 
+ * <p>
+ * If using 1 argument and the component using the annotation is a Reporting 
Task, that argument must
+ * be of type {@link org.apache.nifi.controller.ConfigurationContext 
ConfigurationContext}.
+ * </p>
  *
  * @author none
  */

Reply via email to