Repository: incubator-apex-core Updated Branches: refs/heads/master 84ddf1803 -> 3ade9baee
APEXCORE-384 - For smaller InlineStream port queue size use ArrayBlockingQueueReservoir as default Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/a6fd80ef Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/a6fd80ef Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/a6fd80ef Branch: refs/heads/master Commit: a6fd80efbe4730aea233eccddc9061af953f24ac Parents: 7c84e05 Author: Vlad Rozov <[email protected]> Authored: Mon Mar 14 16:59:27 2016 -0700 Committer: Vlad Rozov <[email protected]> Committed: Mon Mar 14 17:53:19 2016 -0700 ---------------------------------------------------------------------- .../com/datatorrent/stram/engine/AbstractReservoir.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/a6fd80ef/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java index 45521cd..58578a1 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java @@ -47,7 +47,7 @@ public abstract class AbstractReservoir implements SweepableReservoir, BlockingQ { private static final Logger logger = LoggerFactory.getLogger(AbstractReservoir.class); static final String reservoirClassNameProperty = "com.datatorrent.stram.engine.Reservoir"; - private static final String reservoirDefaultClassName = SpscArrayQueueReservoir.class.getName(); + private static final int USE_SPSC_CAPACITY = 8 * 1024; /** * Reservoir factory. Constructs concrete implementation of {@link AbstractReservoir} based on @@ -58,8 +58,14 @@ public abstract class AbstractReservoir implements SweepableReservoir, BlockingQ */ public static AbstractReservoir newReservoir(final String id, final int capacity) { - String reservoirClassName = System.getProperty(reservoirClassNameProperty, reservoirDefaultClassName); - if (reservoirClassName.equals(SpscArrayQueueReservoir.class.getName())) { + String reservoirClassName = System.getProperty(reservoirClassNameProperty); + if (reservoirClassName == null) { + if (capacity >= USE_SPSC_CAPACITY) { + return new SpscArrayQueueReservoir(id, capacity); + } else { + return new ArrayBlockingQueueReservoir(id, capacity); + } + } else if (reservoirClassName.equals(SpscArrayQueueReservoir.class.getName())) { return new SpscArrayQueueReservoir(id, capacity); } else if (reservoirClassName.equals(CircularBufferReservoir.class.getName())) { return new CircularBufferReservoir(id, capacity);
