APEXCORE-254 Introduce Abstract and Forwarding Reservoir classes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/7cb6e93b Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/7cb6e93b Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/7cb6e93b Branch: refs/heads/master Commit: 7cb6e93b8e3f2393d0c4192de772e2f7b4d8c10e Parents: 2d175ff Author: Vlad Rozov <[email protected]> Authored: Tue Nov 10 17:43:06 2015 -0800 Committer: Vlad Rozov <[email protected]> Committed: Tue Feb 2 12:49:23 2016 -0800 ---------------------------------------------------------------------- .../stram/debug/TappedReservoir.java | 10 +- .../stram/engine/AbstractReservoir.java | 311 +++++++++++++++++-- .../stram/engine/ForwardingReservoir.java | 99 ++++++ .../datatorrent/stram/engine/GenericNode.java | 16 +- .../datatorrent/stram/engine/MuxReservoir.java | 35 ++- .../com/datatorrent/stram/engine/Reservoir.java | 22 +- .../stram/engine/SweepableReservoir.java | 8 +- .../stram/engine/WindowGenerator.java | 30 +- .../engine/WindowIdActivatedReservoir.java | 10 +- .../stram/stream/BufferServerSubscriber.java | 16 + .../datatorrent/stram/stream/InlineStream.java | 9 +- .../com/datatorrent/stram/stream/OiOStream.java | 8 +- .../stram/engine/AtMostOnceTest.java | 4 +- .../stram/engine/GenericNodeTest.java | 8 +- .../datatorrent/stram/engine/InputNodeTest.java | 8 +- .../stram/engine/ProcessingModeTests.java | 4 +- .../stram/stream/InlineStreamTest.java | 6 +- 17 files changed, 502 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/debug/TappedReservoir.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/debug/TappedReservoir.java b/engine/src/main/java/com/datatorrent/stram/debug/TappedReservoir.java index 429b354..85ffdb6 100644 --- a/engine/src/main/java/com/datatorrent/stram/debug/TappedReservoir.java +++ b/engine/src/main/java/com/datatorrent/stram/debug/TappedReservoir.java @@ -64,9 +64,15 @@ public class TappedReservoir extends MuxSink implements SweepableReservoir } @Override - public int size() + public int size(final boolean dataTupleAware) { - return reservoir.size(); + return reservoir.size(dataTupleAware); + } + + @Override + public boolean isEmpty() + { + return reservoir.isEmpty(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java index 2dd0dad..3c9dd48 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java @@ -18,64 +18,83 @@ */ package com.datatorrent.stram.engine; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; -import com.datatorrent.api.Sink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.datatorrent.api.Sink; import com.datatorrent.netlet.util.CircularBuffer; +import com.datatorrent.netlet.util.UnsafeBlockingQueue; import com.datatorrent.stram.tuple.Tuple; /** - * <p>DefaultReservoir class.</p> - * - * @since 0.3.2 + * Abstract Sweepable Reservoir implementation. Implements all methods of {@link SweepableReservoir} except + * {@link SweepableReservoir#sweep}. Classes that extend {@link AbstractReservoir} must implement + * {@link BlockingQueue<Object>} interface. */ -public class DefaultReservoir extends CircularBuffer<Object> implements SweepableReservoir +public abstract class AbstractReservoir implements SweepableReservoir, BlockingQueue<Object> { - private Sink<Object> sink; + private static final Logger logger = LoggerFactory.getLogger(AbstractReservoir.class); + + /** + * Reservoir factory. Constructs concrete implementation of {@link AbstractReservoir}. + * @param id reservoir identifier + * @param capacity reservoir capacity + * @return concrete implementation of {@link AbstractReservoir} + */ + public static AbstractReservoir newReservoir(final String id, final int capacity) + { + return new CircularBufferReservoir(id, capacity); + } + + protected Sink<Object> sink; private String id; - private int count; + protected int count; - public DefaultReservoir(String id, int capacity) + protected AbstractReservoir(final String id) { - super(capacity); this.id = id; } + /** + * {@inheritDoc} + */ @Override public Sink<Object> setSink(Sink<Object> sink) { try { return this.sink; - } - finally { + } finally { this.sink = sink; } } + /** + * {@inheritDoc} + */ @Override - public Tuple sweep() + public int getCount(boolean reset) { - final int size = size(); - for (int i = 0; i < size; i++) { - if (peekUnsafe() instanceof Tuple) { - count += i; - return (Tuple)peekUnsafe(); + try { + return count; + } finally { + if (reset) { + count = 0; } - sink.put(pollUnsafe()); } - - count += size; - return null; } - @Override - public String toString() - { - return "DefaultReservoir{" + "sink=" + sink + ", id=" + id + ", count=" + count + '}'; - } + /** + * @return allocated reservoir capacity + */ + public abstract int capacity(); /** - * @return the id + * @return reservoir id */ public String getId() { @@ -91,16 +110,242 @@ public class DefaultReservoir extends CircularBuffer<Object> implements Sweepabl } @Override - public int getCount(boolean reset) + public String toString() { - try { - return count; + return getClass().getName() + '@' + Integer.toHexString(hashCode()) + + "{sink=" + sink + ", id=" + id + ", count=" + count + '}'; + } + + /** + * CircularBufferReservoir {@link SweepableReservoir} implementation that extends AbstractReservoir and delegates + * {@link BlockingQueue} implementation to {@link CircularBuffer}. Replaces DefaultReservoir class since release 3.3}. + * + * @since 0.3.2 + */ + private static class CircularBufferReservoir extends AbstractReservoir implements UnsafeBlockingQueue<Object> + { + private final CircularBuffer<Object> circularBuffer; + + private CircularBufferReservoir(String id, int capacity) + { + super(id); + circularBuffer = new CircularBuffer<>(capacity); } - finally { - if (reset) { - count = 0; + + @Override + public Tuple sweep() + { + final int size = circularBuffer.size(); + for (int i = 0; i < size; i++) { + if (circularBuffer.peekUnsafe() instanceof Tuple) { + count += i; + return (Tuple)peekUnsafe(); + } + sink.put(pollUnsafe()); } + + count += size; + return null; + } + + @Override + public boolean add(Object e) + { + return circularBuffer.add(e); + } + + @Override + public Object remove() + { + return circularBuffer.remove(); + } + + @Override + public Object peek() + { + return circularBuffer.peek(); + } + + @Override + public int size(final boolean dataTupleAware) + { + int size = circularBuffer.size(); + if (dataTupleAware) { + Iterator<Object> iterator = circularBuffer.getFrozenIterator(); + while (iterator.hasNext()) { + if (iterator.next() instanceof Tuple) { + size--; + } + } + } + return size; + } + + @Override + public int capacity() + { + return circularBuffer.capacity(); + } + + @Override + public int drainTo(Collection<? super Object> container) + { + return circularBuffer.drainTo(container); + } + + @Override + public boolean offer(Object e) + { + return circularBuffer.offer(e); + } + + @Override + public void put(Object e) throws InterruptedException + { + circularBuffer.put(e); + } + + @Override + public boolean offer(Object e, long timeout, TimeUnit unit) throws InterruptedException + { + return circularBuffer.offer(e, timeout, unit); + } + + @Override + public Object take() throws InterruptedException + { + return circularBuffer.take(); + } + + @Override + public Object poll(long timeout, TimeUnit unit) throws InterruptedException + { + return circularBuffer.poll(timeout, unit); + } + + @Override + public int remainingCapacity() + { + return circularBuffer.remainingCapacity(); + } + + @Override + public boolean remove(Object o) + { + return circularBuffer.remove(o); + } + + @Override + public boolean contains(Object o) + { + return circularBuffer.contains(o); + } + + @Override + public int drainTo(Collection<? super Object> collection, int maxElements) + { + return circularBuffer.drainTo(collection, maxElements); + } + + @Override + public Object poll() + { + return circularBuffer.poll(); + } + + @Override + public Object pollUnsafe() + { + return circularBuffer.pollUnsafe(); + } + + @Override + public Object element() + { + return circularBuffer.element(); + } + + @Override + public boolean isEmpty() + { + return circularBuffer.isEmpty(); + } + + public Iterator<Object> getFrozenIterator() + { + return circularBuffer.getFrozenIterator(); + } + + public Iterable<Object> getFrozenIterable() + { + return circularBuffer.getFrozenIterable(); + } + + @Override + public Iterator<Object> iterator() + { + return circularBuffer.iterator(); + } + + @Override + public Object[] toArray() + { + return circularBuffer.toArray(); + } + + @Override + public <T> T[] toArray(T[] a) + { + return circularBuffer.toArray(a); + } + + @Override + public boolean containsAll(Collection<?> c) + { + return circularBuffer.containsAll(c); + } + + @Override + public boolean addAll(Collection<?> c) + { + return circularBuffer.addAll(c); + } + + @Override + public boolean removeAll(Collection<?> c) + { + return circularBuffer.removeAll(c); + } + + @Override + public boolean retainAll(Collection<?> c) + { + return circularBuffer.retainAll(c); + } + + @Override + public int size() { + return circularBuffer.size(); + } + + @Override + public void clear() + { + circularBuffer.clear(); + } + + @Override + public Object peekUnsafe() + { + return circularBuffer.peekUnsafe(); + } + + public CircularBuffer<Object> getWhitehole(String exceptionMessage) + { + return circularBuffer.getWhitehole(exceptionMessage); } } } + + http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/ForwardingReservoir.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/ForwardingReservoir.java b/engine/src/main/java/com/datatorrent/stram/engine/ForwardingReservoir.java new file mode 100644 index 0000000..49b7f31 --- /dev/null +++ b/engine/src/main/java/com/datatorrent/stram/engine/ForwardingReservoir.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.engine; + +import com.datatorrent.api.Sink; +import com.datatorrent.stram.tuple.Tuple; + +public class ForwardingReservoir implements SweepableReservoir +{ + public static ForwardingReservoir newReservoir(final String id, final int capacity) + { + return new ForwardingReservoir(AbstractReservoir.newReservoir(id, capacity)); + } + + private final AbstractReservoir reservoir; + + public ForwardingReservoir(AbstractReservoir reservoir) + { + this.reservoir = reservoir; + } + + @Override + public Sink<Object> setSink(Sink<Object> sink) + { + return reservoir.setSink(sink); + } + + @Override + public int getCount(boolean reset) + { + return reservoir.getCount(reset); + } + + public String getId() + { + return reservoir.getId(); + } + + public void setId(String id) + { + reservoir.setId(id); + } + + public boolean add(Object o) + { + return reservoir.add(o); + } + + public void put(Object o) throws InterruptedException + { + reservoir.put(o); + } + + @Override + public Tuple sweep() + { + return reservoir.sweep(); + } + + @Override + public int size(final boolean dataTupleAware) + { + return reservoir.size(dataTupleAware); + } + + @Override + public Object remove() + { + return reservoir.remove(); + } + + @Override + public boolean isEmpty() + { + return reservoir.isEmpty(); + } + + public AbstractReservoir getReservoir() + { + return reservoir; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java index 1ccec31..1394474 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java @@ -586,8 +586,8 @@ public class GenericNode extends Node<Operator> else { boolean need2sleep = true; for (Map.Entry<String, SweepableReservoir> cb : activeQueues) { - if (cb.getValue().size() > 0) { - need2sleep = false; + need2sleep = cb.getValue().isEmpty(); + if (!need2sleep) { break; } } @@ -694,17 +694,7 @@ public class GenericNode extends Node<Operator> for (Entry<String, SweepableReservoir> e : inputs.entrySet()) { SweepableReservoir ar = e.getValue(); ContainerStats.OperatorStats.PortStats portStats = new ContainerStats.OperatorStats.PortStats(e.getKey()); - portStats.queueSize = ar.size(); - if(DATA_TUPLE_AWARE) { - if (ar instanceof CircularBuffer) { - Iterator iterator = ((CircularBuffer) ar).getFrozenIterator(); - while (iterator.hasNext()) { - if (iterator.next() instanceof Tuple) { - portStats.queueSize--; - } - } - } - } + portStats.queueSize = ar.size(DATA_TUPLE_AWARE); portStats.tupleCount = ar.getCount(true); portStats.endWindowTimestamp = endWindowDequeueTimes.get(e.getValue()); ipstats.add(portStats); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/MuxReservoir.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/MuxReservoir.java b/engine/src/main/java/com/datatorrent/stram/engine/MuxReservoir.java index 3e60756..09ef344 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/MuxReservoir.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/MuxReservoir.java @@ -22,6 +22,9 @@ import com.datatorrent.stram.tuple.Tuple; import com.datatorrent.api.Sink; import com.datatorrent.netlet.util.CircularBuffer; import java.util.HashMap; +import java.util.Iterator; +import java.util.Queue; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +74,7 @@ public abstract class MuxReservoir return r; } - public abstract Reservoir getMasterReservoir(); + protected abstract Queue getQueue(); class SubReservoir extends CircularBuffer<Object> implements SweepableReservoir { @@ -84,6 +87,21 @@ public abstract class MuxReservoir } @Override + public int size(final boolean dataTupleAware) + { + int size = size(); + if (dataTupleAware) { + Iterator<Object> iterator = getFrozenIterator(); + while (iterator.hasNext()) { + if (iterator.next() instanceof Tuple) { + size--; + } + } + } + return size; + } + + @Override public Sink<Object> setSink(Sink<Object> sink) { try { @@ -110,14 +128,14 @@ public abstract class MuxReservoir count += size; } - final Reservoir masterReservoir = getMasterReservoir(); - synchronized (masterReservoir) { - /* find out the minimum remaining capacity in all the other buffers and consume those many tuples from bufferserver */ - int min = masterReservoir.size(); - if (min == 0) { + final Queue queue = getQueue(); + synchronized (queue) { + if (queue.isEmpty()) { return null; } + /* find out the minimum remaining capacity in all the other buffers and consume those many tuples from bufferserver */ + int min = Integer.MAX_VALUE; for (int i = reservoirs.length; i-- > 0;) { if (reservoirs[i].remainingCapacity() < min) { min = reservoirs[i].remainingCapacity(); @@ -125,7 +143,10 @@ public abstract class MuxReservoir } while (min-- > 0) { - Object o = masterReservoir.remove(); + Object o = queue.poll(); + if (o == null) { + break; + } for (int i = reservoirs.length; i-- > 0;) { reservoirs[i].add(o); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/Reservoir.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Reservoir.java b/engine/src/main/java/com/datatorrent/stram/engine/Reservoir.java index bb2f5f6..6616c8c 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/Reservoir.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/Reservoir.java @@ -23,20 +23,32 @@ package com.datatorrent.stram.engine; * * @since 0.3.2 */ -public interface Reservoir +public interface Reservoir<T> { /** - * the count of elements in this SweepableReservoir. + * @param dataTupleAware if true excludes control tuples from the returned count, otherwise returns the count of all + * elements in the reservoir including control tuples. + * @return number of elements including or excluding control tuples in the Reservoir. Depending on implementation + * {@code size()} may return exact number of elements in the reservoir or an over estimate. In case {@code size()} + * returns {@code 0} the reservoir is guaranteed to be empty, while {@code size() != 0} does not imply that + * {@code isEmpty()} is {@code false} * - * @return the count + * @since 3.3 */ - public int size(); + int size(final boolean dataTupleAware); /** * Remove an element from the reservoir. * * @return the removed element. */ - public Object remove(); + T remove(); + + /** + * @return true if reservoir is empty, false otherwise + * + * @since 3.3 + */ + boolean isEmpty(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java b/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java index 5f81c2c..3db564f 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java @@ -26,7 +26,7 @@ import com.datatorrent.api.Sink; * * @since 0.3.2 */ -public interface SweepableReservoir extends Reservoir +public interface SweepableReservoir extends Reservoir<Object> { /** * Set a new sink on this reservoir where data tuples would be put. @@ -34,14 +34,14 @@ public interface SweepableReservoir extends Reservoir * @param sink The new Sink for the data tuples * @return The old sink if present or null */ - public Sink<Object> setSink(Sink<Object> sink); + Sink<Object> setSink(Sink<Object> sink); /** * Consume all the data tuples until control tuple is encountered. * * @return The control tuple encountered or null */ - public Tuple sweep(); + Tuple sweep(); /** * Get the count of tuples consumed. @@ -49,6 +49,6 @@ public interface SweepableReservoir extends Reservoir * @param reset flag to indicate if the count should be reset to zero after this operation * @return the count of tuples */ - public int getCount(boolean reset); + int getCount(boolean reset); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java index ea429af..08e1249 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java @@ -18,6 +18,8 @@ */ package com.datatorrent.stram.engine; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -51,7 +53,7 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable public static final int MAX_WINDOW_ID = WINDOW_MASK - (WINDOW_MASK % 1000) - 1; public static final int MAX_WINDOW_WIDTH = (int)(Long.MAX_VALUE / MAX_WINDOW_ID); private final ScheduledExecutorService ses; - private final MasterReservoir masterReservoir; + private final BlockingQueue<Tuple> queue; private long firstWindowMillis; // Window start time private int windowWidthMillis; // Window size private long currentWindowMillis; @@ -65,7 +67,7 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable public WindowGenerator(ScheduledExecutorService service, int capacity) { ses = service; - masterReservoir = new MasterReservoir(capacity); + queue = new CircularBuffer<>(capacity); } /** @@ -86,8 +88,8 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable baseSeconds = (resetWindowMillis / 1000) << 32; //logger.info("generating reset -> begin {}", Codec.getStringWindowId(baseSeconds)); - masterReservoir.put(new ResetWindowTuple(baseSeconds | windowWidthMillis)); - masterReservoir.put(new Tuple(MessageType.BEGIN_WINDOW, baseSeconds | windowId)); + queue.put(new ResetWindowTuple(baseSeconds | windowWidthMillis)); + queue.put(new Tuple(MessageType.BEGIN_WINDOW, baseSeconds | windowId)); } /** @@ -96,9 +98,9 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable */ private void endCurrentBeginNewWindow() throws InterruptedException { - masterReservoir.put(new EndWindowTuple(baseSeconds | windowId)); + queue.put(new EndWindowTuple(baseSeconds | windowId)); if (++checkPointWindowCount == checkpointCount) { - masterReservoir.put(new Tuple(MessageType.CHECKPOINT, baseSeconds | windowId)); + queue.put(new Tuple(MessageType.CHECKPOINT, baseSeconds | windowId)); checkPointWindowCount = 0; } @@ -108,7 +110,7 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable } else { advanceWindow(); - masterReservoir.put(new Tuple(MessageType.BEGIN_WINDOW, baseSeconds | windowId)); + queue.put(new Tuple(MessageType.BEGIN_WINDOW, baseSeconds | windowId)); } } @@ -234,10 +236,9 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable } @Override - @SuppressWarnings("ReturnOfCollectionOrArrayField") - public Reservoir getMasterReservoir() + protected Queue getQueue() { - return masterReservoir; + return queue; } @Override @@ -335,14 +336,5 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable return windowId >>> 32; } - private class MasterReservoir extends CircularBuffer<Tuple> implements Reservoir - { - MasterReservoir(int n) - { - super(n); - } - - } - private static final Logger logger = LoggerFactory.getLogger(WindowGenerator.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/WindowIdActivatedReservoir.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowIdActivatedReservoir.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowIdActivatedReservoir.java index b17c963..0b7521e 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/WindowIdActivatedReservoir.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowIdActivatedReservoir.java @@ -50,9 +50,15 @@ public class WindowIdActivatedReservoir implements SweepableReservoir } @Override - public int size() + public int size(final boolean dataTupleAware) { - return reservoir.size(); + return reservoir.size(dataTupleAware); + } + + @Override + public boolean isEmpty() + { + return reservoir.isEmpty(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java index 7738ef3..9441e76 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java @@ -21,6 +21,7 @@ package com.datatorrent.stram.stream; import java.net.InetSocketAddress; import java.util.ArrayDeque; import java.util.HashMap; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; @@ -242,6 +243,21 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr } @Override + public int size(final boolean dataTupleAware) + { + int size = size(); + if (dataTupleAware) { + Iterator<Object> iterator = getFrozenIterator(); + while (iterator.hasNext()) { + if (iterator.next() instanceof Tuple) { + size--; + } + } + } + return size; + } + + @Override public Sink<Object> setSink(Sink<Object> sink) { try { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java index 491a906..b49a5b0 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java @@ -21,7 +21,8 @@ package com.datatorrent.stram.stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.stram.engine.DefaultReservoir; +import com.datatorrent.stram.engine.AbstractReservoir; +import com.datatorrent.stram.engine.ForwardingReservoir; import com.datatorrent.stram.engine.Stream; import com.datatorrent.stram.engine.StreamContext; import com.datatorrent.stram.engine.SweepableReservoir; @@ -33,11 +34,11 @@ import com.datatorrent.stram.engine.SweepableReservoir; * * @since 0.3.2 */ -public class InlineStream extends DefaultReservoir implements Stream, SweepableReservoir +public class InlineStream extends ForwardingReservoir implements Stream, SweepableReservoir { public InlineStream(int capacity) { - super("InlineStream", capacity); + super(AbstractReservoir.newReservoir("InlineStream", capacity)); } /** @@ -90,7 +91,7 @@ public class InlineStream extends DefaultReservoir implements Stream, SweepableR @Override public String toString() { - return "InlineStream{" + super.toString() + '}'; + return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{reservoir=" + getReservoir().toString() + '}'; } private static final Logger logger = LoggerFactory.getLogger(InlineStream.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java index eb8b1ee..5cb461f 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java @@ -107,12 +107,18 @@ public class OiOStream implements Stream, SweepableReservoir * It's an error to have more than one tuple active on OiO. */ @Override - public int size() + public int size(final boolean dataTupleAware) { return 1; } @Override + public boolean isEmpty() + { + return false; + } + + @Override public Object remove() { throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java index 891cf1f..b98eb7e 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java @@ -84,8 +84,8 @@ public class AtMostOnceTest extends ProcessingModeTests map.put(OperatorContext.PROCESSING_MODE, processingMode); final GenericNode node = new GenericNode(new MultiInputOperator(), new com.datatorrent.stram.engine.OperatorContext(1, map, null)); - DefaultReservoir reservoir1 = new DefaultReservoir("input1", 1024); - DefaultReservoir reservoir2 = new DefaultReservoir("input1", 1024); + AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input1", 1024); + AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("input1", 1024); node.connectInputPort("input1", reservoir1); node.connectInputPort("input2", reservoir2); node.connectOutputPort("output", new Sink<Object>() http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java index 2577504..20cfb7b 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java @@ -256,8 +256,8 @@ public class GenericNodeTest GenericOperator go = new GenericOperator(); final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, new DefaultAttributeMap(), null)); gn.setId(1); - DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024); - DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024); + AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024); + AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024); Sink<Object> output = new Sink<Object>() { @Override @@ -378,8 +378,8 @@ public class GenericNodeTest GenericOperator go = new GenericOperator(); final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, new DefaultAttributeMap(), null)); gn.setId(1); - DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024); - DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024); + AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024); + AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024); gn.connectInputPort("ip1", reservoir1); gn.connectInputPort("ip2", reservoir2); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java index 3dedd28..f723c2b 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java @@ -182,7 +182,7 @@ public class InputNodeTest } @Override - public int size() + public int size(final boolean dataTupleAware) { if (currentTuple != null) { return 1; @@ -192,6 +192,12 @@ public class InputNodeTest } @Override + public boolean isEmpty() + { + return currentTuple == null; + } + + @Override public Object remove() { Tuple tempTuple = currentTuple; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java index fa36fec..9c2d7f7 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java @@ -261,8 +261,8 @@ public class ProcessingModeTests final GenericNode node = new GenericNode(new MultiInputOperator(), new com.datatorrent.stram.engine.OperatorContext(1, map, null)); - DefaultReservoir reservoir1 = new DefaultReservoir("input1", 1024); - DefaultReservoir reservoir2 = new DefaultReservoir("input1", 1024); + AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input1", 1024); + AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("input1", 1024); node.connectInputPort("input1", reservoir1); node.connectInputPort("input2", reservoir2); node.connectOutputPort("output", new Sink<Object>() http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java index d840955..69c6df9 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java @@ -111,7 +111,7 @@ public class InlineStreamTest }; node2.connectOutputPort("output", sink); - DefaultReservoir reservoir1 = new DefaultReservoir("input", 1024 * 5); + AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input", 1024 * 5); node1.connectInputPort("input", reservoir1); Map<Integer, Node<?>> activeNodes = new ConcurrentHashMap<Integer, Node<?>>(); @@ -139,8 +139,8 @@ public class InlineStreamTest @Override public boolean isComplete() { - logger.debug("stream size={}", stream.size()); - return stream.size() == 0; + logger.debug("stream {} empty {}, size {}", stream, stream.isEmpty(), stream.size(false)); + return stream.isEmpty(); } };
