APEXCORE-269 Provide concrete implementation of AbstractReservoir based on 
SpscArrayQueue


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d5c29fea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d5c29fea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d5c29fea

Branch: refs/heads/master
Commit: d5c29fea83028480eb6c70a5208c3219f9925f56
Parents: 7cb6e93
Author: Vlad Rozov <[email protected]>
Authored: Tue Nov 17 15:53:43 2015 -0800
Committer: Vlad Rozov <[email protected]>
Committed: Tue Feb 2 12:52:14 2016 -0800

----------------------------------------------------------------------
 engine/pom.xml                                  |   5 +
 .../java/com/datatorrent/stram/StramClient.java |   3 +-
 .../stram/engine/AbstractReservoir.java         | 447 ++++++++++++++++++-
 .../stram/engine/SweepableReservoir.java        |   2 +-
 .../stram/engine/AbstractReservoirTest.java     | 362 +++++++++++++++
 pom.xml                                         |   6 +
 6 files changed, 817 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5c29fea/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index ab9ed37..53376c8 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -318,6 +318,11 @@
       <artifactId>xbean-asm5-shaded</artifactId>
       <version>4.3</version>
     </dependency>
+    <dependency>
+      <groupId>org.jctools</groupId>
+      <artifactId>jctools-core</artifactId>
+      <version>1.1</version>
+    </dependency>
   </dependencies>
   <profiles>
     <profile>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5c29fea/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java 
b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 046a56c..11487ba 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -145,7 +145,8 @@ public class StramClient
       // The jersey client inclusion is only for Hadoop-2.2 and should be 
removed when we upgrade our Hadoop
       // dependency version since Hadoop-2.3 onwards has jersey client bundled
       com.sun.jersey.api.client.ClientHandler.class,
-      com.sun.jersey.client.apache4.ApacheHttpClient4Handler.class
+      com.sun.jersey.client.apache4.ApacheHttpClient4Handler.class,
+      org.jctools.queues.SpscArrayQueue.class
   };
 
   private static final Class<?>[] DATATORRENT_SECURITY_SPECIFIC_CLASSES = new 
Class<?>[]{

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5c29fea/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java 
b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
index 3c9dd48..45521cd 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
@@ -18,11 +18,16 @@
  */
 package com.datatorrent.stram.engine;
 
+import java.lang.reflect.Constructor;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,24 +36,44 @@ import com.datatorrent.netlet.util.CircularBuffer;
 import com.datatorrent.netlet.util.UnsafeBlockingQueue;
 import com.datatorrent.stram.tuple.Tuple;
 
+import static java.lang.Thread.sleep;
+
 /**
  * Abstract Sweepable Reservoir implementation. Implements all methods of 
{@link SweepableReservoir} except
  * {@link SweepableReservoir#sweep}. Classes that extend {@link 
AbstractReservoir} must implement
- * {@link BlockingQueue<Object>} interface.
+ * {@link BlockingQueue} interface.
  */
 public abstract class AbstractReservoir implements SweepableReservoir, 
BlockingQueue<Object>
 {
   private static final Logger logger = 
LoggerFactory.getLogger(AbstractReservoir.class);
+  static final String reservoirClassNameProperty = 
"com.datatorrent.stram.engine.Reservoir";
+  private static final String reservoirDefaultClassName = 
SpscArrayQueueReservoir.class.getName();
 
   /**
-   * Reservoir factory. Constructs concrete implementation of {@link 
AbstractReservoir}.
+   * Reservoir factory. Constructs concrete implementation of {@link 
AbstractReservoir} based on
+   * {@link AbstractReservoir#reservoirClassNameProperty} property.
    * @param id reservoir identifier
    * @param capacity reservoir capacity
    * @return concrete implementation of {@link AbstractReservoir}
    */
   public static AbstractReservoir newReservoir(final String id, final int 
capacity)
   {
-    return new CircularBufferReservoir(id, capacity);
+    String reservoirClassName = System.getProperty(reservoirClassNameProperty, 
reservoirDefaultClassName);
+    if (reservoirClassName.equals(SpscArrayQueueReservoir.class.getName())) {
+      return new SpscArrayQueueReservoir(id, capacity);
+    } else if 
(reservoirClassName.equals(CircularBufferReservoir.class.getName())) {
+      return new CircularBufferReservoir(id, capacity);
+    } else if 
(reservoirClassName.equals(ArrayBlockingQueueReservoir.class.getName())) {
+      return new ArrayBlockingQueueReservoir(id, capacity);
+    } else {
+      try {
+        final Constructor<?> constructor = 
Class.forName(reservoirClassName).getConstructor(String.class, int.class);
+        return (AbstractReservoir)constructor.newInstance(id, capacity);
+      } catch (ReflectiveOperationException e) {
+        logger.debug("Fail to construct reservoir {}", reservoirClassName, e);
+        throw new RuntimeException("Fail to construct reservoir " + 
reservoirClassName, e);
+      }
+    }
   }
 
   protected Sink<Object> sink;
@@ -117,8 +142,417 @@ public abstract class AbstractReservoir implements 
SweepableReservoir, BlockingQ
   }
 
   /**
-   * CircularBufferReservoir {@link SweepableReservoir} implementation that 
extends AbstractReservoir and delegates
-   * {@link BlockingQueue} implementation to {@link CircularBuffer}. Replaces 
DefaultReservoir class since release 3.3}.
+   * <p>SpscArrayQueueReservoir</p>
+   * {@link SweepableReservoir} implementation that extends AbstractReservoir 
and delegates {@link BlockingQueue}
+   * implementation to {@see <a 
href=http://jctools.github.io/JCTools/>JCTools</a>} SpscArrayQueue.
+   */
+  private static class SpscArrayQueueReservoir extends AbstractReservoir
+  {
+    private final int spinMillis = 10;
+    private final SpscArrayQueue<Object> queue;
+
+    private SpscArrayQueueReservoir(final String id, final int capacity)
+    {
+      super(id);
+      queue = new SpscArrayQueue<>(capacity);
+    }
+
+    @Override
+    public Tuple sweep()
+    {
+      Object o;
+      while ((o = queue.peek()) != null) {
+        if (o instanceof Tuple) {
+          return (Tuple)o;
+        }
+        count++;
+        sink.put(queue.poll());
+      }
+      return null;
+    }
+
+    @Override
+    public boolean add(Object e)
+    {
+      return queue.add(e);
+    }
+
+    @Override
+    public Object remove()
+    {
+      return queue.remove();
+    }
+
+    @Override
+    public Object peek()
+    {
+      return queue.peek();
+    }
+
+    @Override
+    public int size(final boolean dataTupleAware)
+    {
+      return queue.size();
+    }
+
+    @Override
+    public int capacity()
+    {
+      return queue.capacity();
+    }
+
+    @Override
+    public int drainTo(final Collection<? super Object> container)
+    {
+      return queue.drain(new MessagePassingQueue.Consumer<Object>()
+      {
+        @Override
+        public void accept(Object e)
+        {
+          container.add(e);
+        }
+      });
+    }
+
+    @Override
+    public boolean offer(Object e)
+    {
+      return queue.offer(e);
+    }
+
+    @Override
+    public void put(Object e) throws InterruptedException
+    {
+      while (!queue.offer(e)) {
+        sleep(spinMillis);
+      }
+    }
+
+    @Override
+    public boolean offer(Object e, long timeout, TimeUnit unit) throws 
InterruptedException
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object take() throws InterruptedException
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object poll(long timeout, TimeUnit unit) throws InterruptedException
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int remainingCapacity()
+    {
+      return queue.capacity() - queue.size();
+    }
+
+    @Override
+    public boolean remove(Object o)
+    {
+      return queue.remove(o);
+    }
+
+    @Override
+    public boolean contains(Object o)
+    {
+      return queue.contains(o);
+    }
+
+    @Override
+    public int drainTo(final Collection<? super Object> collection, int 
maxElements)
+    {
+      return queue.drain(new MessagePassingQueue.Consumer<Object>()
+      {
+        @Override
+        public void accept(Object e)
+        {
+          collection.add(e);
+        }
+      }, maxElements);
+    }
+
+    @Override
+    public Object poll()
+    {
+      return queue.poll();
+    }
+
+    @Override
+    public Object element()
+    {
+      return queue.element();
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+      return queue.peek() == null;
+    }
+
+    @Override
+    public Iterator<Object> iterator()
+    {
+      return queue.iterator();
+    }
+
+    @Override
+    public Object[] toArray()
+    {
+      return queue.toArray();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a)
+    {
+      return queue.toArray(a);
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c)
+    {
+      return queue.containsAll(c);
+    }
+
+    @Override
+    public boolean addAll(Collection<?> c)
+    {
+      return queue.addAll(c);
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c)
+    {
+      return queue.removeAll(c);
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c)
+    {
+      return queue.retainAll(c);
+    }
+
+    @Override
+    public int size()
+    {
+      return queue.size();
+    }
+
+    @Override
+    public void clear()
+    {
+      queue.clear();
+    }
+
+  }
+
+  /**
+   * <p>ArrayBlockingQueueReservoir</p>
+   * {@link SweepableReservoir} implementation that extends AbstractReservoir 
and delegates {@link BlockingQueue}
+   * implementation to {@link ArrayBlockingQueue}.
+   */
+  private static class ArrayBlockingQueueReservoir extends AbstractReservoir
+  {
+    private final ArrayBlockingQueue<Object> queue;
+
+    private ArrayBlockingQueueReservoir(final String id, final int capacity)
+    {
+      super(id);
+      queue = new ArrayBlockingQueue<>(capacity);
+    }
+
+    @Override
+    public Tuple sweep()
+    {
+      Object o;
+      while ((o = queue.peek()) != null) {
+        if (o instanceof Tuple) {
+          return (Tuple)o;
+        }
+        count++;
+        sink.put(queue.poll());
+      }
+      return null;
+    }
+
+    @Override
+    public boolean add(Object o)
+    {
+      return queue.add(o);
+    }
+
+    @Override
+    public boolean offer(Object o)
+    {
+      return queue.offer(o);
+    }
+
+    @Override
+    public void put(Object o) throws InterruptedException
+    {
+      queue.put(o);
+    }
+
+    @Override
+    public boolean offer(Object o, long timeout, TimeUnit unit) throws 
InterruptedException
+    {
+      return queue.offer(o, timeout, unit);
+    }
+
+    @Override
+    public Object poll()
+    {
+      return queue.poll();
+    }
+
+    @Override
+    public Object take() throws InterruptedException
+    {
+      return queue.take();
+    }
+
+    @Override
+    public Object poll(long timeout, TimeUnit unit) throws InterruptedException
+    {
+      return queue.poll(timeout, unit);
+    }
+
+    @Override
+    public Object peek()
+    {
+      return queue.peek();
+    }
+
+    @Override
+    public int size()
+    {
+      return queue.size();
+    }
+
+    @Override
+    public int size(final boolean dataTupleAware)
+    {
+      return queue.size();
+    }
+
+    @Override
+    public int capacity()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int remainingCapacity()
+    {
+      return queue.remainingCapacity();
+    }
+
+    @Override
+    public boolean remove(Object o)
+    {
+      return queue.remove(o);
+    }
+
+    @Override
+    public boolean contains(Object o)
+    {
+      return queue.contains(o);
+    }
+
+    @Override
+    public Object[] toArray()
+    {
+      return queue.toArray();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a)
+    {
+      return queue.toArray(a);
+    }
+
+    @Override
+    public String toString()
+    {
+      return queue.toString();
+    }
+
+    @Override
+    public void clear()
+    {
+      queue.clear();
+    }
+
+    @Override
+    public int drainTo(Collection<? super Object> c)
+    {
+      return queue.drainTo(c);
+    }
+
+    @Override
+    public int drainTo(Collection<? super Object> c, int maxElements)
+    {
+      return queue.drainTo(c, maxElements);
+    }
+
+    @Override
+    public Iterator<Object> iterator()
+    {
+      return queue.iterator();
+    }
+
+    @Override
+    public Object remove()
+    {
+      return queue.remove();
+    }
+
+    @Override
+    public Object element()
+    {
+      return queue.element();
+    }
+
+    @Override
+    public boolean addAll(Collection<?> c)
+    {
+      return queue.addAll(c);
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+      return queue.isEmpty();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c)
+    {
+      return queue.containsAll(c);
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c)
+    {
+      return queue.removeAll(c);
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c)
+    {
+      return queue.retainAll(c);
+    }
+  }
+
+  /**
+   * <p>CircularBufferReservoir</p>
+   * {@link SweepableReservoir} implementation that extends AbstractReservoir 
and delegates {@link BlockingQueue}
+   * implementation to {@code CircularBuffer}. Replaces DefaultReservoir class 
since release 3.3}.
    *
    * @since 0.3.2
    */
@@ -324,7 +758,8 @@ public abstract class AbstractReservoir implements 
SweepableReservoir, BlockingQ
     }
 
     @Override
-    public int size() {
+    public int size()
+    {
       return circularBuffer.size();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5c29fea/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java 
b/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java
index 3db564f..8b0e90d 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java
@@ -18,8 +18,8 @@
  */
 package com.datatorrent.stram.engine;
 
-import com.datatorrent.stram.tuple.Tuple;
 import com.datatorrent.api.Sink;
+import com.datatorrent.stram.tuple.Tuple;
 
 /**
  * <p>SweepableReservoir interface.</p>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5c29fea/engine/src/test/java/com/datatorrent/stram/engine/AbstractReservoirTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/engine/AbstractReservoirTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/AbstractReservoirTest.java
new file mode 100644
index 0000000..7f83036
--- /dev/null
+++ 
b/engine/src/test/java/com/datatorrent/stram/engine/AbstractReservoirTest.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.engine;
+
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.jctools.queues.SpscArrayQueue;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Sink;
+import com.datatorrent.netlet.util.CircularBuffer;
+import com.datatorrent.stram.tuple.Tuple;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+
+import static com.datatorrent.bufferserver.packet.MessageType.BEGIN_WINDOW;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(JUnitParamsRunner.class)
+public class AbstractReservoirTest
+{
+  private static final Logger logger = 
LoggerFactory.getLogger(AbstractReservoirTest.class);
+
+  private static final String countPropertyName = 
"com.datatorrent.stram.engine.AbstractReservoirTest.count";
+  private static final String capacityPropertyName = 
"com.datatorrent.stram.engine.AbstractReservoirTest.capacity";
+  private static final int COUNT = Integer.getInteger(countPropertyName, 
10000000);
+  private static final int CAPACITY = Integer.getInteger(capacityPropertyName, 
1 << 19);
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  private static AbstractReservoir newReservoir(final String 
reservoirClassName, final int capacity)
+  {
+    if (reservoirClassName == null) {
+      System.clearProperty(AbstractReservoir.reservoirClassNameProperty);
+    } else {
+      System.setProperty(AbstractReservoir.reservoirClassNameProperty, 
reservoirClassName);
+    }
+    final String id = reservoirClassName == null ? "DefaultReservoir" : 
reservoirClassName;
+    final AbstractReservoir reservoir = AbstractReservoir.newReservoir(id, 
capacity);
+    return reservoir;
+  }
+
+  private static void setSink(final AbstractReservoir reservoir, final 
Sink<Object> sink)
+  {
+    assertNull(reservoir.setSink(sink));
+    assertEquals(sink, reservoir.sink);
+  }
+
+  @SuppressWarnings("unused")
+  private Object defaultTestParameters()
+  {
+    Object[][] defaultTestParameters = new Object[][] {
+        {null, NoSuchElementException.class},
+        
{"com.datatorrent.stram.engine.AbstractReservoir$SpscArrayQueueReservoir", 
NoSuchElementException.class},
+        
{"com.datatorrent.stram.engine.AbstractReservoir$ArrayBlockingQueueReservoir", 
NoSuchElementException.class},
+        
{"com.datatorrent.stram.engine.AbstractReservoir$CircularBufferReservoir", 
IllegalStateException.class}
+    };
+
+    for (Object[] o: defaultTestParameters) {
+      o[0] = newReservoir((String)o[0], 2);
+      final Sink<Object> sink = new Sink<Object>()
+      {
+        final ArrayList<Object> sink = new ArrayList<>();
+        int count;
+        @Override
+        public void put(Object tuple)
+        {
+          sink.add(tuple);
+          count++;
+        }
+
+        @Override
+        public int getCount(boolean reset)
+        {
+          try {
+            return count;
+          } finally {
+            if (reset) {
+              count = 0;
+            }
+          }
+        }
+      };
+      setSink((AbstractReservoir)o[0], sink);
+    }
+    return defaultTestParameters;
+  }
+
+  @SuppressWarnings("unused")
+  private Object performanceTestParameters()
+  {
+    Object[][] performanceTestParameters = new Object[][] {
+        {null, 500},
+        
{"com.datatorrent.stram.engine.AbstractReservoir$SpscArrayQueueReservoir", 500},
+        
{"com.datatorrent.stram.engine.AbstractReservoir$ArrayBlockingQueueReservoir", 
6000},
+        
{"com.datatorrent.stram.engine.AbstractReservoir$CircularBufferReservoir", 2000}
+    };
+    for (Object[] o : performanceTestParameters) {
+      o[0] = newReservoir((String)o[0], CAPACITY);
+      final Sink<Object> sink = new Sink<Object>()
+      {
+        private int count = 0;
+        @Override
+        public void put(Object tuple)
+        {
+          count++;
+        }
+
+        @Override
+        public int getCount(boolean reset)
+        {
+          return count;
+        }
+      };
+      setSink((AbstractReservoir)o[0], sink);
+    }
+    return performanceTestParameters;
+  }
+
+  @Test
+  @Parameters(method = "defaultTestParameters")
+  public void testEmpty(final AbstractReservoir reservoir, final Class<? 
extends Throwable> type)
+  {
+    assertTrue(reservoir.isEmpty());
+    assertEquals(0, reservoir.size());
+    assertEquals(0, reservoir.size(true));
+    assertEquals(0, reservoir.size(false));
+    assertNull(reservoir.sweep());
+    exception.expect(type);
+    reservoir.remove();
+  }
+
+  @Test
+  @Parameters(method = "defaultTestParameters")
+  public void testAddAndSweepObject(final AbstractReservoir reservoir, final 
Class<? extends Throwable> type)
+  {
+    final Object o = new Integer(0);
+    assertTrue(reservoir.add(o));
+    assertFalse(reservoir.isEmpty());
+    assertEquals(1, reservoir.size());
+    assertEquals(1, reservoir.size(false));
+    assertEquals(0, reservoir.getCount(false));
+    assertEquals(o, reservoir.peek());
+    assertNull(reservoir.sweep());
+    assertEquals(1, reservoir.getCount(false));
+    assertEquals(1, reservoir.sink.getCount(false));
+    assertTrue(reservoir.isEmpty());
+    assertEquals(0, reservoir.size());
+    assertEquals(0, reservoir.size(false));
+    exception.expect(type);
+    reservoir.remove();
+  }
+
+  @Test
+  @Parameters(method = "defaultTestParameters")
+  public void testAddAndSweepTuple(final AbstractReservoir reservoir, final 
Class<? extends Throwable> type)
+  {
+    final Tuple t = new Tuple(BEGIN_WINDOW, 0L);
+    assertTrue(reservoir.add(t));
+    assertFalse(reservoir.isEmpty());
+    assertEquals(1, reservoir.size());
+    assertEquals(1, reservoir.size(false));
+    assertEquals(t, reservoir.peek());
+    assertEquals(t, reservoir.sweep());
+    assertEquals(t, reservoir.sweep());
+    assertEquals(0, reservoir.getCount(false));
+    assertEquals(0, reservoir.sink.getCount(false));
+    assertFalse(reservoir.isEmpty());
+    assertEquals(t, reservoir.remove());
+    assertNull(reservoir.peek());
+    assertNull(reservoir.poll());
+    assertNull(reservoir.sweep());
+    exception.expect(type);
+    reservoir.remove();
+  }
+
+  @Test
+  @Parameters(method = "defaultTestParameters")
+  public void testFullReservoir(final AbstractReservoir reservoir, final 
Class<? extends Throwable> type)
+  {
+    int capacity = reservoir.remainingCapacity();
+    assertTrue(capacity > 0);
+    final Object o = new Integer(0);
+    for (int i = 0; i < capacity; i++) {
+      assertTrue(reservoir.offer(o));
+    }
+    assertFalse(reservoir.offer(o));
+    exception.expect(IllegalStateException.class);
+    reservoir.add(o);
+  }
+
+  @Test
+  @Parameters(method = "performanceTestParameters")
+  public void performanceTest(final AbstractReservoir reservoir, final long 
expectedTime)
+  {
+    int maxQueueSize = 0;
+
+    final long start = System.currentTimeMillis();
+
+    new Thread(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        final Object o = new Byte[128];
+        try {
+          for (int i = 0; i < COUNT; i++) {
+            reservoir.put(o);
+          }
+        } catch (InterruptedException e) {
+          logger.debug("Interrupted", e);
+        }
+      }
+    }).start();
+
+    while (reservoir.sink.getCount(false) < COUNT) {
+      maxQueueSize = Math.max(maxQueueSize, reservoir.size(false));
+      reservoir.sweep();
+    }
+
+    long time = System.currentTimeMillis() - start;
+    logger.debug("{}: time {}, max queue size {}", reservoir.getId(), time, 
maxQueueSize);
+    assertTrue("Expected to complete within " + expectedTime + "millis. Actual 
time " + time + " milis",
+        expectedTime > time);
+
+  }
+
+  @Test
+  public void testBlockingQueuePerformance()
+  {
+    final ArrayBlockingQueue<Object> blockingQueue = new 
ArrayBlockingQueue<>(CAPACITY);
+    long start = System.currentTimeMillis();
+    new Thread(
+        new Runnable()
+        {
+          @Override
+          public void run()
+          {
+            final Object o = new Byte[128];
+            try {
+              for (int i = 0; i < COUNT; i++) {
+                blockingQueue.put(o);
+              }
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+    ).start();
+
+    try {
+      for (int i = 0; i < COUNT; i++) {
+        blockingQueue.take();
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    logger.debug("Time {}", System.currentTimeMillis() - start);
+  }
+
+  @Test
+  public void testSpscQueuePerformance()
+  {
+    final SpscArrayQueue<Object> spscArrayQueue = new 
SpscArrayQueue<>(CAPACITY);
+    long start = System.currentTimeMillis();
+    new Thread(
+        new Runnable()
+        {
+          @Override
+          public void run()
+          {
+            final Object o = new Byte[128];
+            try {
+              for (int i = 0; i < COUNT; i++) {
+                while (!spscArrayQueue.offer(o)) {
+                  Thread.sleep(10);
+                }
+              }
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+    ).start();
+
+    try {
+      for (int i = 0; i < COUNT; i++) {
+        while (spscArrayQueue.poll() == null) {
+          Thread.sleep(10);
+        }
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    logger.debug("Time {}", System.currentTimeMillis() - start);
+  }
+
+  @Test
+  public void testCircularBufferPerformance()
+  {
+    final CircularBuffer<Object> circularBuffer = new 
CircularBuffer<>(CAPACITY);
+    long start = System.currentTimeMillis();
+    new Thread(
+        new Runnable()
+        {
+          @Override
+          public void run()
+          {
+            final Object o = new Byte[128];
+            try {
+              for (int i = 0; i < COUNT; i++) {
+                circularBuffer.put(o);
+              }
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+    ).start();
+
+    try {
+      for (int i = 0; i < COUNT; i++) {
+        circularBuffer.take();
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    logger.debug("Time {}", System.currentTimeMillis() - start);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5c29fea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c5432ae..37a0d75 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,12 @@
       <version>4.11</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>pl.pragmatists</groupId>
+      <artifactId>JUnitParams</artifactId>
+      <version>1.0.4</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <modules>

Reply via email to