Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 03267b3de -> 623227ea3


APEXCORE-380 - Idle time sleep time should increase from 0 to a configurable max value


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/08957596
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/08957596
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/08957596

Branch: refs/heads/master
Commit: 08957596ad9e6fb377cdaf61ae1a923ed0453eca
Parents: 7c84e05
Author: Vlad Rozov <[email protected]>
Authored: Mon Mar 14 14:09:09 2016 -0700
Committer: Vlad Rozov <[email protected]>
Committed: Mon Mar 14 14:09:09 2016 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/stram/engine/GenericNode.java  | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/08957596/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 3e1f4c0..505fee0 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -230,7 +230,8 @@ public class GenericNode extends Node<Operator>
   {
     doCheckpoint = false;
 
-    long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
+    final long maxSpinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
+    long spinMillis = 0;
     final boolean handleIdleTime = operator instanceof IdleTimeHandler;
     int totalQueues = inputs.size();
     int regularQueues = totalQueues;
@@ -261,6 +262,7 @@ public class GenericNode extends Node<Operator>
           SweepableReservoir activePort = activePortEntry.getValue();
           Tuple t = activePort.sweep();
           if (t != null) {
+            spinMillis = 0;
             boolean delay = (operator instanceof Operator.DelayOperator);
             long windowAhead = 0;
             if (delay) {
@@ -595,12 +597,12 @@ public class GenericNode extends Node<Operator>
         if (activeQueues.isEmpty() && alive) {
           logger.error("Catastrophic Error: Invalid State - the operator blocked forever!");
           System.exit(2);
-        }
-        else {
+        } else {
           boolean need2sleep = true;
           for (Map.Entry<String, SweepableReservoir> cb : activeQueues) {
             need2sleep = cb.getValue().isEmpty();
             if (!need2sleep) {
+              spinMillis = 0;
               break;
             }
           }
@@ -608,9 +610,9 @@ public class GenericNode extends Node<Operator>
           if (need2sleep) {
             if (handleIdleTime && insideWindow) {
               ((IdleTimeHandler) operator).handleIdleTime();
-            }
-            else {
+            } else {
               Thread.sleep(spinMillis);
+              spinMillis = Math.min(maxSpinMillis, spinMillis + 1);
             }
           }
         }

Reply via email to