Repository: nifi
Updated Branches:
  refs/heads/master b12cf8a6d -> 232380dbf


NIFI-1452 on timer-driven yield, use the greater of yield duration or run 
schedule

This closes #1832.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/232380db
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/232380db
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/232380db

Branch: refs/heads/master
Commit: 232380dbfd59de45c4c6623f141d6e7052c367f9
Parents: b12cf8a
Author: Mike Moser <[email protected]>
Authored: Fri May 19 19:48:10 2017 +0000
Committer: Mark Payne <[email protected]>
Committed: Wed May 24 14:24:29 2017 -0400

----------------------------------------------------------------------
 .../controller/scheduling/TimerDrivenSchedulingAgent.java    | 8 +++++---
 .../nifi/controller/tasks/ContinuallyRunProcessorTask.java   | 5 ++++-
 2 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/232380db/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
index fcd901f..a82fde4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
@@ -139,8 +139,10 @@ public class TimerDrivenSchedulingAgent extends 
AbstractSchedulingAgent {
                     // If the component is yielded, cancel its future and 
re-submit it to run again
                     // after the yield has expired.
                     final long newYieldExpiration = 
connectable.getYieldExpiration();
-                    if (newYieldExpiration > System.currentTimeMillis()) {
-                        final long yieldMillis = newYieldExpiration - 
System.currentTimeMillis();
+                    final long now = System.currentTimeMillis();
+                    if (newYieldExpiration > now) {
+                        final long yieldMillis = newYieldExpiration - now;
+                        final long scheduleMillis = 
connectable.getSchedulingPeriod(TimeUnit.MILLISECONDS);
                         final ScheduledFuture<?> scheduledFuture = 
futureRef.get();
                         if (scheduledFuture == null) {
                             return;
@@ -150,7 +152,7 @@ public class TimerDrivenSchedulingAgent extends 
AbstractSchedulingAgent {
                         // an accurate accounting of which futures are 
outstanding; we must then also update the futureRef
                         // so that we can do this again the next time that the 
component is yielded.
                         if (scheduledFuture.cancel(false)) {
-                            final long yieldNanos = 
TimeUnit.MILLISECONDS.toNanos(yieldMillis);
+                            final long yieldNanos = 
Math.max(TimeUnit.MILLISECONDS.toNanos(scheduleMillis), 
TimeUnit.MILLISECONDS.toNanos(yieldMillis));
 
                             synchronized (scheduleState) {
                                 if (scheduleState.isScheduled()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/232380db/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index 01f3c8c..f2a7eee 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -75,7 +75,10 @@ public class ContinuallyRunProcessorTask implements 
Callable<Boolean> {
     }
 
     static boolean isYielded(final ProcessorNode procNode) {
-        return procNode.getYieldExpiration() >= System.currentTimeMillis();
+        // after one yield period, the scheduling agent could call this again 
when
+        // yieldExpiration == currentTime, and we don't want that to still be 
considered 'yielded'
+        // so this uses ">" instead of ">="
+        return procNode.getYieldExpiration() > System.currentTimeMillis();
     }
 
     static boolean isWorkToDo(final ProcessorNode procNode) {

Reply via email to