Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 84ddf1803 -> 3ade9baee


APEXCORE-384 - For smaller InlineStream port queue size use 
ArrayBlockingQueueReservoir as default


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/a6fd80ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/a6fd80ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/a6fd80ef

Branch: refs/heads/master
Commit: a6fd80efbe4730aea233eccddc9061af953f24ac
Parents: 7c84e05
Author: Vlad Rozov <[email protected]>
Authored: Mon Mar 14 16:59:27 2016 -0700
Committer: Vlad Rozov <[email protected]>
Committed: Mon Mar 14 17:53:19 2016 -0700

----------------------------------------------------------------------
 .../com/datatorrent/stram/engine/AbstractReservoir.java | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/a6fd80ef/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java 
b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
index 45521cd..58578a1 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
@@ -47,7 +47,7 @@ public abstract class AbstractReservoir implements 
SweepableReservoir, BlockingQ
 {
   private static final Logger logger = 
LoggerFactory.getLogger(AbstractReservoir.class);
   static final String reservoirClassNameProperty = 
"com.datatorrent.stram.engine.Reservoir";
-  private static final String reservoirDefaultClassName = 
SpscArrayQueueReservoir.class.getName();
+  private static final int USE_SPSC_CAPACITY = 8 * 1024;
 
   /**
    * Reservoir factory. Constructs concrete implementation of {@link 
AbstractReservoir} based on
@@ -58,8 +58,14 @@ public abstract class AbstractReservoir implements 
SweepableReservoir, BlockingQ
    */
   public static AbstractReservoir newReservoir(final String id, final int 
capacity)
   {
-    String reservoirClassName = System.getProperty(reservoirClassNameProperty, 
reservoirDefaultClassName);
-    if (reservoirClassName.equals(SpscArrayQueueReservoir.class.getName())) {
+    String reservoirClassName = System.getProperty(reservoirClassNameProperty);
+    if (reservoirClassName == null) {
+      if (capacity >=  USE_SPSC_CAPACITY) {
+        return new SpscArrayQueueReservoir(id, capacity);
+      } else {
+        return new ArrayBlockingQueueReservoir(id, capacity);
+      }
+    } else if 
(reservoirClassName.equals(SpscArrayQueueReservoir.class.getName())) {
       return new SpscArrayQueueReservoir(id, capacity);
     } else if 
(reservoirClassName.equals(CircularBufferReservoir.class.getName())) {
       return new CircularBufferReservoir(id, capacity);

Reply via email to