Repository: nifi Updated Branches: refs/heads/master b12cf8a6d -> 232380dbf
NIFI-1452 on timer-driven yield, use the greater of yield duration or run schedule This closes #1832. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/232380db Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/232380db Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/232380db Branch: refs/heads/master Commit: 232380dbfd59de45c4c6623f141d6e7052c367f9 Parents: b12cf8a Author: Mike Moser <[email protected]> Authored: Fri May 19 19:48:10 2017 +0000 Committer: Mark Payne <[email protected]> Committed: Wed May 24 14:24:29 2017 -0400 ---------------------------------------------------------------------- .../controller/scheduling/TimerDrivenSchedulingAgent.java | 8 +++++--- .../nifi/controller/tasks/ContinuallyRunProcessorTask.java | 5 ++++- 2 files changed, 9 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/232380db/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java index fcd901f..a82fde4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java @@ -139,8 +139,10 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { // If the component is yielded, cancel its future and re-submit it to run again // after the yield has expired. final long newYieldExpiration = connectable.getYieldExpiration(); - if (newYieldExpiration > System.currentTimeMillis()) { - final long yieldMillis = newYieldExpiration - System.currentTimeMillis(); + final long now = System.currentTimeMillis(); + if (newYieldExpiration > now) { + final long yieldMillis = newYieldExpiration - now; + final long scheduleMillis = connectable.getSchedulingPeriod(TimeUnit.MILLISECONDS); final ScheduledFuture<?> scheduledFuture = futureRef.get(); if (scheduledFuture == null) { return; @@ -150,7 +152,7 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent { // an accurate accounting of which futures are outstanding; we must then also update the futureRef // so that we can do this again the next time that the component is yielded. if (scheduledFuture.cancel(false)) { - final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis); + final long yieldNanos = Math.max(TimeUnit.MILLISECONDS.toNanos(scheduleMillis), TimeUnit.MILLISECONDS.toNanos(yieldMillis)); synchronized (scheduleState) { if (scheduleState.isScheduled()) { http://git-wip-us.apache.org/repos/asf/nifi/blob/232380db/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index 01f3c8c..f2a7eee 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -75,7 +75,10 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> { } static boolean isYielded(final ProcessorNode procNode) { - return procNode.getYieldExpiration() >= System.currentTimeMillis(); + // after one yield period, the scheduling agent could call this again when + // yieldExpiration == currentTime, and we don't want that to still be considered 'yielded' + // so this uses ">" instead of ">=" + return procNode.getYieldExpiration() > System.currentTimeMillis(); } static boolean isWorkToDo(final ProcessorNode procNode) {
