APEXCORE-269 Provide concrete implementation of AbstractReservoir based on SpscArrayQueue
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d5c29fea Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d5c29fea Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d5c29fea Branch: refs/heads/master Commit: d5c29fea83028480eb6c70a5208c3219f9925f56 Parents: 7cb6e93 Author: Vlad Rozov <[email protected]> Authored: Tue Nov 17 15:53:43 2015 -0800 Committer: Vlad Rozov <[email protected]> Committed: Tue Feb 2 12:52:14 2016 -0800 ---------------------------------------------------------------------- engine/pom.xml | 5 + .../java/com/datatorrent/stram/StramClient.java | 3 +- .../stram/engine/AbstractReservoir.java | 447 ++++++++++++++++++- .../stram/engine/SweepableReservoir.java | 2 +- .../stram/engine/AbstractReservoirTest.java | 362 +++++++++++++++ pom.xml | 6 + 6 files changed, 817 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5c29fea/engine/pom.xml ---------------------------------------------------------------------- diff --git a/engine/pom.xml b/engine/pom.xml index ab9ed37..53376c8 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -318,6 +318,11 @@ <artifactId>xbean-asm5-shaded</artifactId> <version>4.3</version> </dependency> + <dependency> + <groupId>org.jctools</groupId> + <artifactId>jctools-core</artifactId> + <version>1.1</version> + </dependency> </dependencies> <profiles> <profile> http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5c29fea/engine/src/main/java/com/datatorrent/stram/StramClient.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index 046a56c..11487ba 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -145,7 +145,8 @@ public class StramClient // The jersey client inclusion is only for Hadoop-2.2 and should be removed when we upgrade our Hadoop // dependency version since Hadoop-2.3 onwards has jersey client bundled com.sun.jersey.api.client.ClientHandler.class, - com.sun.jersey.client.apache4.ApacheHttpClient4Handler.class + com.sun.jersey.client.apache4.ApacheHttpClient4Handler.class, + org.jctools.queues.SpscArrayQueue.class }; private static final Class<?>[] DATATORRENT_SECURITY_SPECIFIC_CLASSES = new Class<?>[]{ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5c29fea/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java index 3c9dd48..45521cd 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java @@ -18,11 +18,16 @@ */ package com.datatorrent.stram.engine; +import java.lang.reflect.Constructor; import java.util.Collection; import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.SpscArrayQueue; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,24 +36,44 @@ import com.datatorrent.netlet.util.CircularBuffer; import com.datatorrent.netlet.util.UnsafeBlockingQueue; import com.datatorrent.stram.tuple.Tuple; +import static java.lang.Thread.sleep; + /** * Abstract Sweepable Reservoir implementation. Implements all methods of {@link SweepableReservoir} except * {@link SweepableReservoir#sweep}. Classes that extend {@link AbstractReservoir} must implement - * {@link BlockingQueue<Object>} interface. + * {@link BlockingQueue} interface. */ public abstract class AbstractReservoir implements SweepableReservoir, BlockingQueue<Object> { private static final Logger logger = LoggerFactory.getLogger(AbstractReservoir.class); + static final String reservoirClassNameProperty = "com.datatorrent.stram.engine.Reservoir"; + private static final String reservoirDefaultClassName = SpscArrayQueueReservoir.class.getName(); /** - * Reservoir factory. Constructs concrete implementation of {@link AbstractReservoir}. + * Reservoir factory. Constructs concrete implementation of {@link AbstractReservoir} based on + * {@link AbstractReservoir#reservoirClassNameProperty} property. * @param id reservoir identifier * @param capacity reservoir capacity * @return concrete implementation of {@link AbstractReservoir} */ public static AbstractReservoir newReservoir(final String id, final int capacity) { - return new CircularBufferReservoir(id, capacity); + String reservoirClassName = System.getProperty(reservoirClassNameProperty, reservoirDefaultClassName); + if (reservoirClassName.equals(SpscArrayQueueReservoir.class.getName())) { + return new SpscArrayQueueReservoir(id, capacity); + } else if (reservoirClassName.equals(CircularBufferReservoir.class.getName())) { + return new CircularBufferReservoir(id, capacity); + } else if (reservoirClassName.equals(ArrayBlockingQueueReservoir.class.getName())) { + return new ArrayBlockingQueueReservoir(id, capacity); + } else { + try { + final Constructor<?> constructor = Class.forName(reservoirClassName).getConstructor(String.class, int.class); + return (AbstractReservoir)constructor.newInstance(id, capacity); + } catch (ReflectiveOperationException e) { + logger.debug("Fail to construct reservoir {}", reservoirClassName, e); + throw new RuntimeException("Fail to construct reservoir " + reservoirClassName, e); + } + } } protected Sink<Object> sink; @@ -117,8 +142,417 @@ public abstract class AbstractReservoir implements SweepableReservoir, BlockingQ } /** - * CircularBufferReservoir {@link SweepableReservoir} implementation that extends AbstractReservoir and delegates - * {@link BlockingQueue} implementation to {@link CircularBuffer}. Replaces DefaultReservoir class since release 3.3}. + * <p>SpscArrayQueueReservoir</p> + * {@link SweepableReservoir} implementation that extends AbstractReservoir and delegates {@link BlockingQueue} + * implementation to {@see <a href=http://jctools.github.io/JCTools/>JCTools</a>} SpscArrayQueue. + */ + private static class SpscArrayQueueReservoir extends AbstractReservoir + { + private final int spinMillis = 10; + private final SpscArrayQueue<Object> queue; + + private SpscArrayQueueReservoir(final String id, final int capacity) + { + super(id); + queue = new SpscArrayQueue<>(capacity); + } + + @Override + public Tuple sweep() + { + Object o; + while ((o = queue.peek()) != null) { + if (o instanceof Tuple) { + return (Tuple)o; + } + count++; + sink.put(queue.poll()); + } + return null; + } + + @Override + public boolean add(Object e) + { + return queue.add(e); + } + + @Override + public Object remove() + { + return queue.remove(); + } + + @Override + public Object peek() + { + return queue.peek(); + } + + @Override + public int size(final boolean dataTupleAware) + { + return queue.size(); + } + + @Override + public int capacity() + { + return queue.capacity(); + } + + @Override + public int drainTo(final Collection<? super Object> container) + { + return queue.drain(new MessagePassingQueue.Consumer<Object>() + { + @Override + public void accept(Object e) + { + container.add(e); + } + }); + } + + @Override + public boolean offer(Object e) + { + return queue.offer(e); + } + + @Override + public void put(Object e) throws InterruptedException + { + while (!queue.offer(e)) { + sleep(spinMillis); + } + } + + @Override + public boolean offer(Object e, long timeout, TimeUnit unit) throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + @Override + public Object take() throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + @Override + public Object poll(long timeout, TimeUnit unit) throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + @Override + public int remainingCapacity() + { + return queue.capacity() - queue.size(); + } + + @Override + public boolean remove(Object o) + { + return queue.remove(o); + } + + @Override + public boolean contains(Object o) + { + return queue.contains(o); + } + + @Override + public int drainTo(final Collection<? super Object> collection, int maxElements) + { + return queue.drain(new MessagePassingQueue.Consumer<Object>() + { + @Override + public void accept(Object e) + { + collection.add(e); + } + }, maxElements); + } + + @Override + public Object poll() + { + return queue.poll(); + } + + @Override + public Object element() + { + return queue.element(); + } + + @Override + public boolean isEmpty() + { + return queue.peek() == null; + } + + @Override + public Iterator<Object> iterator() + { + return queue.iterator(); + } + + @Override + public Object[] toArray() + { + return queue.toArray(); + } + + @Override + public <T> T[] toArray(T[] a) + { + return queue.toArray(a); + } + + @Override + public boolean containsAll(Collection<?> c) + { + return queue.containsAll(c); + } + + @Override + public boolean addAll(Collection<?> c) + { + return queue.addAll(c); + } + + @Override + public boolean removeAll(Collection<?> c) + { + return queue.removeAll(c); + } + + @Override + public boolean retainAll(Collection<?> c) + { + return queue.retainAll(c); + } + + @Override + public int size() + { + return queue.size(); + } + + @Override + public void clear() + { + queue.clear(); + } + + } + + /** + * <p>ArrayBlockingQueueReservoir</p> + * {@link SweepableReservoir} implementation that extends AbstractReservoir and delegates {@link BlockingQueue} + * implementation to {@link ArrayBlockingQueue}. + */ + private static class ArrayBlockingQueueReservoir extends AbstractReservoir + { + private final ArrayBlockingQueue<Object> queue; + + private ArrayBlockingQueueReservoir(final String id, final int capacity) + { + super(id); + queue = new ArrayBlockingQueue<>(capacity); + } + + @Override + public Tuple sweep() + { + Object o; + while ((o = queue.peek()) != null) { + if (o instanceof Tuple) { + return (Tuple)o; + } + count++; + sink.put(queue.poll()); + } + return null; + } + + @Override + public boolean add(Object o) + { + return queue.add(o); + } + + @Override + public boolean offer(Object o) + { + return queue.offer(o); + } + + @Override + public void put(Object o) throws InterruptedException + { + queue.put(o); + } + + @Override + public boolean offer(Object o, long timeout, TimeUnit unit) throws InterruptedException + { + return queue.offer(o, timeout, unit); + } + + @Override + public Object poll() + { + return queue.poll(); + } + + @Override + public Object take() throws InterruptedException + { + return queue.take(); + } + + @Override + public Object poll(long timeout, TimeUnit unit) throws InterruptedException + { + return queue.poll(timeout, unit); + } + + @Override + public Object peek() + { + return queue.peek(); + } + + @Override + public int size() + { + return queue.size(); + } + + @Override + public int size(final boolean dataTupleAware) + { + return queue.size(); + } + + @Override + public int capacity() + { + throw new UnsupportedOperationException(); + } + + @Override + public int remainingCapacity() + { + return queue.remainingCapacity(); + } + + @Override + public boolean remove(Object o) + { + return queue.remove(o); + } + + @Override + public boolean contains(Object o) + { + return queue.contains(o); + } + + @Override + public Object[] toArray() + { + return queue.toArray(); + } + + @Override + public <T> T[] toArray(T[] a) + { + return queue.toArray(a); + } + + @Override + public String toString() + { + return queue.toString(); + } + + @Override + public void clear() + { + queue.clear(); + } + + @Override + public int drainTo(Collection<? super Object> c) + { + return queue.drainTo(c); + } + + @Override + public int drainTo(Collection<? super Object> c, int maxElements) + { + return queue.drainTo(c, maxElements); + } + + @Override + public Iterator<Object> iterator() + { + return queue.iterator(); + } + + @Override + public Object remove() + { + return queue.remove(); + } + + @Override + public Object element() + { + return queue.element(); + } + + @Override + public boolean addAll(Collection<?> c) + { + return queue.addAll(c); + } + + @Override + public boolean isEmpty() + { + return queue.isEmpty(); + } + + @Override + public boolean containsAll(Collection<?> c) + { + return queue.containsAll(c); + } + + @Override + public boolean removeAll(Collection<?> c) + { + return queue.removeAll(c); + } + + @Override + public boolean retainAll(Collection<?> c) + { + return queue.retainAll(c); + } + } + + /** + * <p>CircularBufferReservoir</p> + * {@link SweepableReservoir} implementation that extends AbstractReservoir and delegates {@link BlockingQueue} + * implementation to {@code CircularBuffer}. Replaces DefaultReservoir class since release 3.3}. * * @since 0.3.2 */ @@ -324,7 +758,8 @@ public abstract class AbstractReservoir implements SweepableReservoir, BlockingQ } @Override - public int size() { + public int size() + { return circularBuffer.size(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5c29fea/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java b/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java index 3db564f..8b0e90d 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java @@ -18,8 +18,8 @@ */ package com.datatorrent.stram.engine; -import com.datatorrent.stram.tuple.Tuple; import com.datatorrent.api.Sink; +import com.datatorrent.stram.tuple.Tuple; /** * <p>SweepableReservoir interface.</p> http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5c29fea/engine/src/test/java/com/datatorrent/stram/engine/AbstractReservoirTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AbstractReservoirTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AbstractReservoirTest.java new file mode 100644 index 0000000..7f83036 --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/engine/AbstractReservoirTest.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.engine; + +import java.util.ArrayList; +import java.util.NoSuchElementException; +import java.util.concurrent.ArrayBlockingQueue; + +import org.jctools.queues.SpscArrayQueue; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Sink; +import com.datatorrent.netlet.util.CircularBuffer; +import com.datatorrent.stram.tuple.Tuple; + +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + +import static com.datatorrent.bufferserver.packet.MessageType.BEGIN_WINDOW; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@RunWith(JUnitParamsRunner.class) +public class AbstractReservoirTest +{ + private static final Logger logger = LoggerFactory.getLogger(AbstractReservoirTest.class); + + private static final String countPropertyName = "com.datatorrent.stram.engine.AbstractReservoirTest.count"; + private static final String capacityPropertyName = "com.datatorrent.stram.engine.AbstractReservoirTest.capacity"; + private static final int COUNT = Integer.getInteger(countPropertyName, 10000000); + private static final int CAPACITY = Integer.getInteger(capacityPropertyName, 1 << 19); + + @Rule + public ExpectedException exception = ExpectedException.none(); + + private static AbstractReservoir newReservoir(final String reservoirClassName, final int capacity) + { + if (reservoirClassName == null) { + System.clearProperty(AbstractReservoir.reservoirClassNameProperty); + } else { + System.setProperty(AbstractReservoir.reservoirClassNameProperty, reservoirClassName); + } + final String id = reservoirClassName == null ? "DefaultReservoir" : reservoirClassName; + final AbstractReservoir reservoir = AbstractReservoir.newReservoir(id, capacity); + return reservoir; + } + + private static void setSink(final AbstractReservoir reservoir, final Sink<Object> sink) + { + assertNull(reservoir.setSink(sink)); + assertEquals(sink, reservoir.sink); + } + + @SuppressWarnings("unused") + private Object defaultTestParameters() + { + Object[][] defaultTestParameters = new Object[][] { + {null, NoSuchElementException.class}, + {"com.datatorrent.stram.engine.AbstractReservoir$SpscArrayQueueReservoir", NoSuchElementException.class}, + {"com.datatorrent.stram.engine.AbstractReservoir$ArrayBlockingQueueReservoir", NoSuchElementException.class}, + {"com.datatorrent.stram.engine.AbstractReservoir$CircularBufferReservoir", IllegalStateException.class} + }; + + for (Object[] o: defaultTestParameters) { + o[0] = newReservoir((String)o[0], 2); + final Sink<Object> sink = new Sink<Object>() + { + final ArrayList<Object> sink = new ArrayList<>(); + int count; + @Override + public void put(Object tuple) + { + sink.add(tuple); + count++; + } + + @Override + public int getCount(boolean reset) + { + try { + return count; + } finally { + if (reset) { + count = 0; + } + } + } + }; + setSink((AbstractReservoir)o[0], sink); + } + return defaultTestParameters; + } + + @SuppressWarnings("unused") + private Object performanceTestParameters() + { + Object[][] performanceTestParameters = new Object[][] { + {null, 500}, + {"com.datatorrent.stram.engine.AbstractReservoir$SpscArrayQueueReservoir", 500}, + {"com.datatorrent.stram.engine.AbstractReservoir$ArrayBlockingQueueReservoir", 6000}, + {"com.datatorrent.stram.engine.AbstractReservoir$CircularBufferReservoir", 2000} + }; + for (Object[] o : performanceTestParameters) { + o[0] = newReservoir((String)o[0], CAPACITY); + final Sink<Object> sink = new Sink<Object>() + { + private int count = 0; + @Override + public void put(Object tuple) + { + count++; + } + + @Override + public int getCount(boolean reset) + { + return count; + } + }; + setSink((AbstractReservoir)o[0], sink); + } + return performanceTestParameters; + } + + @Test + @Parameters(method = "defaultTestParameters") + public void testEmpty(final AbstractReservoir reservoir, final Class<? extends Throwable> type) + { + assertTrue(reservoir.isEmpty()); + assertEquals(0, reservoir.size()); + assertEquals(0, reservoir.size(true)); + assertEquals(0, reservoir.size(false)); + assertNull(reservoir.sweep()); + exception.expect(type); + reservoir.remove(); + } + + @Test + @Parameters(method = "defaultTestParameters") + public void testAddAndSweepObject(final AbstractReservoir reservoir, final Class<? extends Throwable> type) + { + final Object o = new Integer(0); + assertTrue(reservoir.add(o)); + assertFalse(reservoir.isEmpty()); + assertEquals(1, reservoir.size()); + assertEquals(1, reservoir.size(false)); + assertEquals(0, reservoir.getCount(false)); + assertEquals(o, reservoir.peek()); + assertNull(reservoir.sweep()); + assertEquals(1, reservoir.getCount(false)); + assertEquals(1, reservoir.sink.getCount(false)); + assertTrue(reservoir.isEmpty()); + assertEquals(0, reservoir.size()); + assertEquals(0, reservoir.size(false)); + exception.expect(type); + reservoir.remove(); + } + + @Test + @Parameters(method = "defaultTestParameters") + public void testAddAndSweepTuple(final AbstractReservoir reservoir, final Class<? extends Throwable> type) + { + final Tuple t = new Tuple(BEGIN_WINDOW, 0L); + assertTrue(reservoir.add(t)); + assertFalse(reservoir.isEmpty()); + assertEquals(1, reservoir.size()); + assertEquals(1, reservoir.size(false)); + assertEquals(t, reservoir.peek()); + assertEquals(t, reservoir.sweep()); + assertEquals(t, reservoir.sweep()); + assertEquals(0, reservoir.getCount(false)); + assertEquals(0, reservoir.sink.getCount(false)); + assertFalse(reservoir.isEmpty()); + assertEquals(t, reservoir.remove()); + assertNull(reservoir.peek()); + assertNull(reservoir.poll()); + assertNull(reservoir.sweep()); + exception.expect(type); + reservoir.remove(); + } + + @Test + @Parameters(method = "defaultTestParameters") + public void testFullReservoir(final AbstractReservoir reservoir, final Class<? extends Throwable> type) + { + int capacity = reservoir.remainingCapacity(); + assertTrue(capacity > 0); + final Object o = new Integer(0); + for (int i = 0; i < capacity; i++) { + assertTrue(reservoir.offer(o)); + } + assertFalse(reservoir.offer(o)); + exception.expect(IllegalStateException.class); + reservoir.add(o); + } + + @Test + @Parameters(method = "performanceTestParameters") + public void performanceTest(final AbstractReservoir reservoir, final long expectedTime) + { + int maxQueueSize = 0; + + final long start = System.currentTimeMillis(); + + new Thread(new Runnable() + { + @Override + public void run() + { + final Object o = new Byte[128]; + try { + for (int i = 0; i < COUNT; i++) { + reservoir.put(o); + } + } catch (InterruptedException e) { + logger.debug("Interrupted", e); + } + } + }).start(); + + while (reservoir.sink.getCount(false) < COUNT) { + maxQueueSize = Math.max(maxQueueSize, reservoir.size(false)); + reservoir.sweep(); + } + + long time = System.currentTimeMillis() - start; + logger.debug("{}: time {}, max queue size {}", reservoir.getId(), time, maxQueueSize); + assertTrue("Expected to complete within " + expectedTime + "millis. Actual time " + time + " milis", + expectedTime > time); + + } + + @Test + public void testBlockingQueuePerformance() + { + final ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(CAPACITY); + long start = System.currentTimeMillis(); + new Thread( + new Runnable() + { + @Override + public void run() + { + final Object o = new Byte[128]; + try { + for (int i = 0; i < COUNT; i++) { + blockingQueue.put(o); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + ).start(); + + try { + for (int i = 0; i < COUNT; i++) { + blockingQueue.take(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + logger.debug("Time {}", System.currentTimeMillis() - start); + } + + @Test + public void testSpscQueuePerformance() + { + final SpscArrayQueue<Object> spscArrayQueue = new SpscArrayQueue<>(CAPACITY); + long start = System.currentTimeMillis(); + new Thread( + new Runnable() + { + @Override + public void run() + { + final Object o = new Byte[128]; + try { + for (int i = 0; i < COUNT; i++) { + while (!spscArrayQueue.offer(o)) { + Thread.sleep(10); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + ).start(); + + try { + for (int i = 0; i < COUNT; i++) { + while (spscArrayQueue.poll() == null) { + Thread.sleep(10); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + logger.debug("Time {}", System.currentTimeMillis() - start); + } + + @Test + public void testCircularBufferPerformance() + { + final CircularBuffer<Object> circularBuffer = new CircularBuffer<>(CAPACITY); + long start = System.currentTimeMillis(); + new Thread( + new Runnable() + { + @Override + public void run() + { + final Object o = new Byte[128]; + try { + for (int i = 0; i < COUNT; i++) { + circularBuffer.put(o); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + ).start(); + + try { + for (int i = 0; i < COUNT; i++) { + circularBuffer.take(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + logger.debug("Time {}", System.currentTimeMillis() - start); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5c29fea/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c5432ae..37a0d75 100644 --- a/pom.xml +++ b/pom.xml @@ -77,6 +77,12 @@ <version>4.11</version> <scope>test</scope> </dependency> + <dependency> + <groupId>pl.pragmatists</groupId> + <artifactId>JUnitParams</artifactId> + <version>1.0.4</version> + <scope>test</scope> + </dependency> </dependencies> <modules>
