Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 623227ea3 -> 84ddf1803
APEXCORE-383 - Time to sleep while reservoirs are full should increase from 0 to a configurable max value

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9fd27745
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9fd27745
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9fd27745

Branch: refs/heads/master
Commit: 9fd27745a1ae74c975eae8af1d7a3ad037b22939
Parents: 7c84e05
Author: Vlad Rozov <[email protected]>
Authored: Mon Mar 14 17:43:55 2016 -0700
Committer: Vlad Rozov <[email protected]>
Committed: Mon Mar 14 17:43:55 2016 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/stram/engine/AbstractReservoir.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9fd27745/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
index 45521cd..fb46e79 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
@@ -148,7 +148,7 @@ public abstract class AbstractReservoir implements SweepableReservoir, BlockingQ
    */
   private static class SpscArrayQueueReservoir extends AbstractReservoir
   {
-    private final int spinMillis = 10;
+    private final int maxSpinMillis = 10;
     private final SpscArrayQueue<Object> queue;
 
     private SpscArrayQueueReservoir(final String id, final int capacity)
@@ -223,8 +223,10 @@ public abstract class AbstractReservoir implements SweepableReservoir, BlockingQ
     @Override
     public void put(Object e) throws InterruptedException
     {
+      long spinMillis = 0;
       while (!queue.offer(e)) {
         sleep(spinMillis);
+        spinMillis = Math.min(maxSpinMillis, spinMillis + 1);
       }
     }
