Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 623227ea3 -> 84ddf1803


APEXCORE-383 - Time to sleep while reservoirs are full should increase from 0 to a configurable max value


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9fd27745
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9fd27745
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9fd27745

Branch: refs/heads/master
Commit: 9fd27745a1ae74c975eae8af1d7a3ad037b22939
Parents: 7c84e05
Author: Vlad Rozov <[email protected]>
Authored: Mon Mar 14 17:43:55 2016 -0700
Committer: Vlad Rozov <[email protected]>
Committed: Mon Mar 14 17:43:55 2016 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/stram/engine/AbstractReservoir.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9fd27745/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
index 45521cd..fb46e79 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
@@ -148,7 +148,7 @@ public abstract class AbstractReservoir implements SweepableReservoir, BlockingQ
    */
   private static class SpscArrayQueueReservoir extends AbstractReservoir
   {
-    private final int spinMillis = 10;
+    private final int maxSpinMillis = 10;
     private final SpscArrayQueue<Object> queue;
 
     private SpscArrayQueueReservoir(final String id, final int capacity)
@@ -223,8 +223,10 @@ public abstract class AbstractReservoir implements SweepableReservoir, BlockingQ
     @Override
     public void put(Object e) throws InterruptedException
     {
+      long spinMillis = 0;
       while (!queue.offer(e)) {
         sleep(spinMillis);
+        spinMillis = Math.min(maxSpinMillis, spinMillis + 1);
       }
     }
 

Reply via email to