Repository: incubator-apex-core Updated Branches: refs/heads/master 03267b3de -> 623227ea3
APEXCORE-380 - Idle time sleep time should increase from 0 to a configurable max value Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/08957596 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/08957596 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/08957596 Branch: refs/heads/master Commit: 08957596ad9e6fb377cdaf61ae1a923ed0453eca Parents: 7c84e05 Author: Vlad Rozov <[email protected]> Authored: Mon Mar 14 14:09:09 2016 -0700 Committer: Vlad Rozov <[email protected]> Committed: Mon Mar 14 14:09:09 2016 -0700 ---------------------------------------------------------------------- .../java/com/datatorrent/stram/engine/GenericNode.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/08957596/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java index 3e1f4c0..505fee0 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java @@ -230,7 +230,8 @@ public class GenericNode extends Node<Operator> { doCheckpoint = false; - long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS); + final long maxSpinMillis = context.getValue(OperatorContext.SPIN_MILLIS); + long spinMillis = 0; final boolean handleIdleTime = operator instanceof IdleTimeHandler; int totalQueues = inputs.size(); int regularQueues = totalQueues; @@ -261,6 +262,7 @@ public class GenericNode extends Node<Operator> SweepableReservoir activePort = activePortEntry.getValue(); Tuple t = activePort.sweep(); if (t != null) { + spinMillis = 0; boolean delay = (operator instanceof Operator.DelayOperator); long windowAhead = 0; if (delay) { @@ -595,12 +597,12 @@ public class GenericNode extends Node<Operator> if (activeQueues.isEmpty() && alive) { logger.error("Catastrophic Error: Invalid State - the operator blocked forever!"); System.exit(2); - } - else { + } else { boolean need2sleep = true; for (Map.Entry<String, SweepableReservoir> cb : activeQueues) { need2sleep = cb.getValue().isEmpty(); if (!need2sleep) { + spinMillis = 0; break; } } @@ -608,9 +610,9 @@ public class GenericNode extends Node<Operator> if (need2sleep) { if (handleIdleTime && insideWindow) { ((IdleTimeHandler) operator).handleIdleTime(); - } - else { + } else { Thread.sleep(spinMillis); + spinMillis = Math.min(maxSpinMillis, spinMillis + 1); } } }
