APEXCORE-254 Introduce Abstract and Forwarding Reservoir classes.

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/7cb6e93b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/7cb6e93b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/7cb6e93b

Branch: refs/heads/master
Commit: 7cb6e93b8e3f2393d0c4192de772e2f7b4d8c10e
Parents: 2d175ff
Author: Vlad Rozov <[email protected]>
Authored: Tue Nov 10 17:43:06 2015 -0800
Committer: Vlad Rozov <[email protected]>
Committed: Tue Feb 2 12:49:23 2016 -0800

----------------------------------------------------------------------
 .../stram/debug/TappedReservoir.java            |  10 +-
 .../stram/engine/AbstractReservoir.java         | 311 +++++++++++++++++--
 .../stram/engine/ForwardingReservoir.java       |  99 ++++++
 .../datatorrent/stram/engine/GenericNode.java   |  16 +-
 .../datatorrent/stram/engine/MuxReservoir.java  |  35 ++-
 .../com/datatorrent/stram/engine/Reservoir.java |  22 +-
 .../stram/engine/SweepableReservoir.java        |   8 +-
 .../stram/engine/WindowGenerator.java           |  30 +-
 .../engine/WindowIdActivatedReservoir.java      |  10 +-
 .../stram/stream/BufferServerSubscriber.java    |  16 +
 .../datatorrent/stram/stream/InlineStream.java  |   9 +-
 .../com/datatorrent/stram/stream/OiOStream.java |   8 +-
 .../stram/engine/AtMostOnceTest.java            |   4 +-
 .../stram/engine/GenericNodeTest.java           |   8 +-
 .../datatorrent/stram/engine/InputNodeTest.java |   8 +-
 .../stram/engine/ProcessingModeTests.java       |   4 +-
 .../stram/stream/InlineStreamTest.java          |   6 +-
 17 files changed, 502 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/debug/TappedReservoir.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/debug/TappedReservoir.java 
b/engine/src/main/java/com/datatorrent/stram/debug/TappedReservoir.java
index 429b354..85ffdb6 100644
--- a/engine/src/main/java/com/datatorrent/stram/debug/TappedReservoir.java
+++ b/engine/src/main/java/com/datatorrent/stram/debug/TappedReservoir.java
@@ -64,9 +64,15 @@ public class TappedReservoir extends MuxSink implements 
SweepableReservoir
   }
 
   @Override
-  public int size()
+  public int size(final boolean dataTupleAware)
   {
-    return reservoir.size();
+    return reservoir.size(dataTupleAware);
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return reservoir.isEmpty();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java 
b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
index 2dd0dad..3c9dd48 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java
@@ -18,64 +18,83 @@
  */
 package com.datatorrent.stram.engine;
 
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
-import com.datatorrent.api.Sink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Sink;
 import com.datatorrent.netlet.util.CircularBuffer;
+import com.datatorrent.netlet.util.UnsafeBlockingQueue;
 import com.datatorrent.stram.tuple.Tuple;
 
 /**
- * <p>DefaultReservoir class.</p>
- *
- * @since 0.3.2
+ * Abstract Sweepable Reservoir implementation. Implements all methods of 
{@link SweepableReservoir} except
+ * {@link SweepableReservoir#sweep}. Classes that extend {@link 
AbstractReservoir} must implement
+ * {@code BlockingQueue<Object>} interface.
  */
-public class DefaultReservoir extends CircularBuffer<Object> implements 
SweepableReservoir
+public abstract class AbstractReservoir implements SweepableReservoir, 
BlockingQueue<Object>
 {
-  private Sink<Object> sink;
+  private static final Logger logger = 
LoggerFactory.getLogger(AbstractReservoir.class);
+
+  /**
+   * Reservoir factory. Constructs concrete implementation of {@link 
AbstractReservoir}.
+   * @param id reservoir identifier
+   * @param capacity reservoir capacity
+   * @return concrete implementation of {@link AbstractReservoir}
+   */
+  public static AbstractReservoir newReservoir(final String id, final int 
capacity)
+  {
+    return new CircularBufferReservoir(id, capacity);
+  }
+
+  protected Sink<Object> sink;
   private String id;
-  private int count;
+  protected int count;
 
-  public DefaultReservoir(String id, int capacity)
+  protected AbstractReservoir(final String id)
   {
-    super(capacity);
     this.id = id;
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public Sink<Object> setSink(Sink<Object> sink)
   {
     try {
       return this.sink;
-    }
-    finally {
+    } finally {
       this.sink = sink;
     }
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
-  public Tuple sweep()
+  public int getCount(boolean reset)
   {
-    final int size = size();
-    for (int i = 0; i < size; i++) {
-      if (peekUnsafe() instanceof Tuple) {
-        count += i;
-        return (Tuple)peekUnsafe();
+    try {
+      return count;
+    } finally {
+      if (reset) {
+        count = 0;
       }
-      sink.put(pollUnsafe());
     }
-
-    count += size;
-    return null;
   }
 
-  @Override
-  public String toString()
-  {
-    return "DefaultReservoir{" + "sink=" + sink + ", id=" + id + ", count=" + 
count + '}';
-  }
+  /**
+   * @return allocated reservoir capacity
+   */
+  public abstract int capacity();
 
   /**
-   * @return the id
+   * @return reservoir id
    */
   public String getId()
   {
@@ -91,16 +110,242 @@ public class DefaultReservoir extends 
CircularBuffer<Object> implements Sweepabl
   }
 
   @Override
-  public int getCount(boolean reset)
+  public String toString()
   {
-    try {
-      return count;
+    return getClass().getName() + '@' + Integer.toHexString(hashCode()) +
+      "{sink=" + sink + ", id=" + id + ", count=" + count + '}';
+  }
+
+  /**
+   * CircularBufferReservoir {@link SweepableReservoir} implementation that 
extends AbstractReservoir and delegates
+   * {@link BlockingQueue} implementation to {@link CircularBuffer}. Replaces 
DefaultReservoir class since release 3.3.
+   *
+   * @since 0.3.2
+   */
+  private static class CircularBufferReservoir extends AbstractReservoir 
implements UnsafeBlockingQueue<Object>
+  {
+    private final CircularBuffer<Object> circularBuffer;
+
+    private CircularBufferReservoir(String id, int capacity)
+    {
+      super(id);
+      circularBuffer = new CircularBuffer<>(capacity);
     }
-    finally {
-      if (reset) {
-        count = 0;
+
+    @Override
+    public Tuple sweep()
+    {
+      final int size = circularBuffer.size();
+      for (int i = 0; i < size; i++) {
+        if (circularBuffer.peekUnsafe() instanceof Tuple) {
+          count += i;
+          return (Tuple)peekUnsafe();
+        }
+        sink.put(pollUnsafe());
       }
+
+      count += size;
+      return null;
+    }
+
+    @Override
+    public boolean add(Object e)
+    {
+      return circularBuffer.add(e);
+    }
+
+    @Override
+    public Object remove()
+    {
+      return circularBuffer.remove();
+    }
+
+    @Override
+    public Object peek()
+    {
+      return circularBuffer.peek();
+    }
+
+    @Override
+    public int size(final boolean dataTupleAware)
+    {
+      int size = circularBuffer.size();
+      if (dataTupleAware) {
+        Iterator<Object> iterator = circularBuffer.getFrozenIterator();
+        while (iterator.hasNext()) {
+          if (iterator.next() instanceof Tuple) {
+            size--;
+          }
+        }
+      }
+      return size;
+    }
+
+    @Override
+    public int capacity()
+    {
+      return circularBuffer.capacity();
+    }
+
+    @Override
+    public int drainTo(Collection<? super Object> container)
+    {
+      return circularBuffer.drainTo(container);
+    }
+
+    @Override
+    public boolean offer(Object e)
+    {
+      return circularBuffer.offer(e);
+    }
+
+    @Override
+    public void put(Object e) throws InterruptedException
+    {
+      circularBuffer.put(e);
+    }
+
+    @Override
+    public boolean offer(Object e, long timeout, TimeUnit unit) throws 
InterruptedException
+    {
+      return circularBuffer.offer(e, timeout, unit);
+    }
+
+    @Override
+    public Object take() throws InterruptedException
+    {
+      return circularBuffer.take();
+    }
+
+    @Override
+    public Object poll(long timeout, TimeUnit unit) throws InterruptedException
+    {
+      return circularBuffer.poll(timeout, unit);
+    }
+
+    @Override
+    public int remainingCapacity()
+    {
+      return circularBuffer.remainingCapacity();
+    }
+
+    @Override
+    public boolean remove(Object o)
+    {
+      return circularBuffer.remove(o);
+    }
+
+    @Override
+    public boolean contains(Object o)
+    {
+      return circularBuffer.contains(o);
+    }
+
+    @Override
+    public int drainTo(Collection<? super Object> collection, int maxElements)
+    {
+      return circularBuffer.drainTo(collection, maxElements);
+    }
+
+    @Override
+    public Object poll()
+    {
+      return circularBuffer.poll();
+    }
+
+    @Override
+    public Object pollUnsafe()
+    {
+      return circularBuffer.pollUnsafe();
+    }
+
+    @Override
+    public Object element()
+    {
+      return circularBuffer.element();
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+      return circularBuffer.isEmpty();
+    }
+
+    public Iterator<Object> getFrozenIterator()
+    {
+      return circularBuffer.getFrozenIterator();
+    }
+
+    public Iterable<Object> getFrozenIterable()
+    {
+      return circularBuffer.getFrozenIterable();
+    }
+
+    @Override
+    public Iterator<Object> iterator()
+    {
+      return circularBuffer.iterator();
+    }
+
+    @Override
+    public Object[] toArray()
+    {
+      return circularBuffer.toArray();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a)
+    {
+      return circularBuffer.toArray(a);
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c)
+    {
+      return circularBuffer.containsAll(c);
+    }
+
+    @Override
+    public boolean addAll(Collection<?> c)
+    {
+      return circularBuffer.addAll(c);
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c)
+    {
+      return circularBuffer.removeAll(c);
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c)
+    {
+      return circularBuffer.retainAll(c);
+    }
+
+    @Override
+    public int size() {
+      return circularBuffer.size();
+    }
+
+    @Override
+    public void clear()
+    {
+      circularBuffer.clear();
+    }
+
+    @Override
+    public Object peekUnsafe()
+    {
+      return circularBuffer.peekUnsafe();
+    }
+
+    public CircularBuffer<Object> getWhitehole(String exceptionMessage)
+    {
+      return circularBuffer.getWhitehole(exceptionMessage);
     }
   }
 
 }
+
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/ForwardingReservoir.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/ForwardingReservoir.java 
b/engine/src/main/java/com/datatorrent/stram/engine/ForwardingReservoir.java
new file mode 100644
index 0000000..49b7f31
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/engine/ForwardingReservoir.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.engine;
+
+import com.datatorrent.api.Sink;
+import com.datatorrent.stram.tuple.Tuple;
+
+public class ForwardingReservoir implements SweepableReservoir
+{
+  public static ForwardingReservoir newReservoir(final String id, final int 
capacity)
+  {
+    return new ForwardingReservoir(AbstractReservoir.newReservoir(id, 
capacity));
+  }
+
+  private final AbstractReservoir reservoir;
+
+  public ForwardingReservoir(AbstractReservoir reservoir)
+  {
+    this.reservoir = reservoir;
+  }
+
+  @Override
+  public Sink<Object> setSink(Sink<Object> sink)
+  {
+    return reservoir.setSink(sink);
+  }
+
+  @Override
+  public int getCount(boolean reset)
+  {
+    return reservoir.getCount(reset);
+  }
+
+  public String getId()
+  {
+    return reservoir.getId();
+  }
+
+  public void setId(String id)
+  {
+    reservoir.setId(id);
+  }
+
+  public boolean add(Object o)
+  {
+    return reservoir.add(o);
+  }
+
+  public void put(Object o) throws InterruptedException
+  {
+    reservoir.put(o);
+  }
+
+  @Override
+  public Tuple sweep()
+  {
+    return reservoir.sweep();
+  }
+
+  @Override
+  public int size(final boolean dataTupleAware)
+  {
+    return reservoir.size(dataTupleAware);
+  }
+
+  @Override
+  public Object remove()
+  {
+    return reservoir.remove();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return reservoir.isEmpty();
+  }
+
+  public AbstractReservoir getReservoir()
+  {
+    return reservoir;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java 
b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 1ccec31..1394474 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -586,8 +586,8 @@ public class GenericNode extends Node<Operator>
         else {
           boolean need2sleep = true;
           for (Map.Entry<String, SweepableReservoir> cb : activeQueues) {
-            if (cb.getValue().size() > 0) {
-              need2sleep = false;
+            need2sleep = cb.getValue().isEmpty();
+            if (!need2sleep) {
               break;
             }
           }
@@ -694,17 +694,7 @@ public class GenericNode extends Node<Operator>
     for (Entry<String, SweepableReservoir> e : inputs.entrySet()) {
       SweepableReservoir ar = e.getValue();
       ContainerStats.OperatorStats.PortStats portStats = new 
ContainerStats.OperatorStats.PortStats(e.getKey());
-      portStats.queueSize = ar.size();
-      if(DATA_TUPLE_AWARE) {
-        if (ar instanceof CircularBuffer) {
-          Iterator iterator = ((CircularBuffer) ar).getFrozenIterator();
-          while (iterator.hasNext()) {
-            if (iterator.next() instanceof Tuple) {
-              portStats.queueSize--;
-            }
-          }
-        }
-      }
+      portStats.queueSize = ar.size(DATA_TUPLE_AWARE);
       portStats.tupleCount = ar.getCount(true);
       portStats.endWindowTimestamp = endWindowDequeueTimes.get(e.getValue());
       ipstats.add(portStats);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/MuxReservoir.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/MuxReservoir.java 
b/engine/src/main/java/com/datatorrent/stram/engine/MuxReservoir.java
index 3e60756..09ef344 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/MuxReservoir.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/MuxReservoir.java
@@ -22,6 +22,9 @@ import com.datatorrent.stram.tuple.Tuple;
 import com.datatorrent.api.Sink;
 import com.datatorrent.netlet.util.CircularBuffer;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Queue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,7 +74,7 @@ public abstract class MuxReservoir
     return r;
   }
 
-  public abstract Reservoir getMasterReservoir();
+  protected abstract Queue getQueue();
 
   class SubReservoir extends CircularBuffer<Object> implements 
SweepableReservoir
   {
@@ -84,6 +87,21 @@ public abstract class MuxReservoir
     }
 
     @Override
+    public int size(final boolean dataTupleAware)
+    {
+      int size = size();
+      if (dataTupleAware) {
+        Iterator<Object> iterator = getFrozenIterator();
+        while (iterator.hasNext()) {
+          if (iterator.next() instanceof Tuple) {
+            size--;
+          }
+        }
+      }
+      return size;
+    }
+
+    @Override
     public Sink<Object> setSink(Sink<Object> sink)
     {
       try {
@@ -110,14 +128,14 @@ public abstract class MuxReservoir
         count += size;
       }
 
-      final Reservoir masterReservoir = getMasterReservoir();
-      synchronized (masterReservoir) {
-        /* find out the minimum remaining capacity in all the other buffers 
and consume those many tuples from bufferserver */
-        int min = masterReservoir.size();
-        if (min == 0) {
+      final Queue queue = getQueue();
+      synchronized (queue) {
+        if (queue.isEmpty()) {
           return null;
         }
 
+        /* find out the minimum remaining capacity in all the other buffers 
and consume that many tuples from bufferserver */
+        int min = Integer.MAX_VALUE;
         for (int i = reservoirs.length; i-- > 0;) {
           if (reservoirs[i].remainingCapacity() < min) {
             min = reservoirs[i].remainingCapacity();
@@ -125,7 +143,10 @@ public abstract class MuxReservoir
         }
 
         while (min-- > 0) {
-          Object o = masterReservoir.remove();
+          Object o = queue.poll();
+          if (o == null) {
+            break;
+          }
           for (int i = reservoirs.length; i-- > 0;) {
             reservoirs[i].add(o);
           }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/Reservoir.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Reservoir.java 
b/engine/src/main/java/com/datatorrent/stram/engine/Reservoir.java
index bb2f5f6..6616c8c 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Reservoir.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Reservoir.java
@@ -23,20 +23,32 @@ package com.datatorrent.stram.engine;
  *
  * @since 0.3.2
  */
-public interface Reservoir
+public interface Reservoir<T>
 {
   /**
-   * the count of elements in this SweepableReservoir.
+   * @param dataTupleAware if true excludes control tuples from the returned 
count, otherwise returns the count of all
+   *     elements in the reservoir including control tuples.
+   * @return number of elements including or excluding control tuples in the 
Reservoir. Depending on implementation
+   * {@code size()} may return exact number of elements in the reservoir or an 
over estimate. In case {@code size()}
+   * returns {@code 0} the reservoir is guaranteed to be empty, while {@code 
size() != 0} does not imply that
+   * {@code isEmpty()} is {@code false}.
    *
-   * @return the count
+   * @since 3.3
    */
-  public int size();
+  int size(final boolean dataTupleAware);
 
   /**
    * Remove an element from the reservoir.
    *
    * @return the removed element.
    */
-  public Object remove();
+  T remove();
+
+  /**
+   * @return true if reservoir is empty, false otherwise
+   *
+   * @since 3.3
+   */
+  boolean isEmpty();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java 
b/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java
index 5f81c2c..3db564f 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/SweepableReservoir.java
@@ -26,7 +26,7 @@ import com.datatorrent.api.Sink;
  *
  * @since 0.3.2
  */
-public interface SweepableReservoir extends Reservoir
+public interface SweepableReservoir extends Reservoir<Object>
 {
   /**
    * Set a new sink on this reservoir where data tuples would be put.
@@ -34,14 +34,14 @@ public interface SweepableReservoir extends Reservoir
    * @param sink The new Sink for the data tuples
    * @return The old sink if present or null
    */
-  public Sink<Object> setSink(Sink<Object> sink);
+  Sink<Object> setSink(Sink<Object> sink);
 
   /**
    * Consume all the data tuples until control tuple is encountered.
    *
    * @return The control tuple encountered or null
    */
-  public Tuple sweep();
+  Tuple sweep();
 
   /**
    * Get the count of tuples consumed.
@@ -49,6 +49,6 @@ public interface SweepableReservoir extends Reservoir
    * @param reset flag to indicate if the count should be reset to zero after 
this operation
    * @return the count of tuples
    */
-  public int getCount(boolean reset);
+  int getCount(boolean reset);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java 
b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
index ea429af..08e1249 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
@@ -18,6 +18,8 @@
  */
 package com.datatorrent.stram.engine;
 
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -51,7 +53,7 @@ public class WindowGenerator extends MuxReservoir implements 
Stream, Runnable
   public static final int MAX_WINDOW_ID = WINDOW_MASK - (WINDOW_MASK % 1000) - 
1;
   public static final int MAX_WINDOW_WIDTH = (int)(Long.MAX_VALUE / 
MAX_WINDOW_ID);
   private final ScheduledExecutorService ses;
-  private final MasterReservoir masterReservoir;
+  private final BlockingQueue<Tuple> queue;
   private long firstWindowMillis; // Window start time
   private int windowWidthMillis; // Window size
   private long currentWindowMillis;
@@ -65,7 +67,7 @@ public class WindowGenerator extends MuxReservoir implements 
Stream, Runnable
   public WindowGenerator(ScheduledExecutorService service, int capacity)
   {
     ses = service;
-    masterReservoir = new MasterReservoir(capacity);
+    queue = new CircularBuffer<>(capacity);
   }
 
   /**
@@ -86,8 +88,8 @@ public class WindowGenerator extends MuxReservoir implements 
Stream, Runnable
     baseSeconds = (resetWindowMillis / 1000) << 32;
     //logger.info("generating reset -> begin {}", 
Codec.getStringWindowId(baseSeconds));
 
-    masterReservoir.put(new ResetWindowTuple(baseSeconds | windowWidthMillis));
-    masterReservoir.put(new Tuple(MessageType.BEGIN_WINDOW, baseSeconds | 
windowId));
+    queue.put(new ResetWindowTuple(baseSeconds | windowWidthMillis));
+    queue.put(new Tuple(MessageType.BEGIN_WINDOW, baseSeconds | windowId));
   }
 
   /**
@@ -96,9 +98,9 @@ public class WindowGenerator extends MuxReservoir implements 
Stream, Runnable
    */
   private void endCurrentBeginNewWindow() throws InterruptedException
   {
-    masterReservoir.put(new EndWindowTuple(baseSeconds | windowId));
+    queue.put(new EndWindowTuple(baseSeconds | windowId));
     if (++checkPointWindowCount == checkpointCount) {
-      masterReservoir.put(new Tuple(MessageType.CHECKPOINT, baseSeconds | 
windowId));
+      queue.put(new Tuple(MessageType.CHECKPOINT, baseSeconds | windowId));
       checkPointWindowCount = 0;
     }
 
@@ -108,7 +110,7 @@ public class WindowGenerator extends MuxReservoir 
implements Stream, Runnable
     }
     else {
       advanceWindow();
-      masterReservoir.put(new Tuple(MessageType.BEGIN_WINDOW, baseSeconds | 
windowId));
+      queue.put(new Tuple(MessageType.BEGIN_WINDOW, baseSeconds | windowId));
     }
   }
 
@@ -234,10 +236,9 @@ public class WindowGenerator extends MuxReservoir 
implements Stream, Runnable
   }
 
   @Override
-  @SuppressWarnings("ReturnOfCollectionOrArrayField")
-  public Reservoir getMasterReservoir()
+  protected Queue getQueue()
   {
-    return masterReservoir;
+    return queue;
   }
 
   @Override
@@ -335,14 +336,5 @@ public class WindowGenerator extends MuxReservoir 
implements Stream, Runnable
     return windowId >>> 32;
   }
 
-  private class MasterReservoir extends CircularBuffer<Tuple> implements 
Reservoir
-  {
-    MasterReservoir(int n)
-    {
-      super(n);
-    }
-
-  }
-
   private static final Logger logger = 
LoggerFactory.getLogger(WindowGenerator.class);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/engine/WindowIdActivatedReservoir.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/WindowIdActivatedReservoir.java
 
b/engine/src/main/java/com/datatorrent/stram/engine/WindowIdActivatedReservoir.java
index b17c963..0b7521e 100644
--- 
a/engine/src/main/java/com/datatorrent/stram/engine/WindowIdActivatedReservoir.java
+++ 
b/engine/src/main/java/com/datatorrent/stram/engine/WindowIdActivatedReservoir.java
@@ -50,9 +50,15 @@ public class WindowIdActivatedReservoir implements 
SweepableReservoir
   }
 
   @Override
-  public int size()
+  public int size(final boolean dataTupleAware)
   {
-    return reservoir.size();
+    return reservoir.size(dataTupleAware);
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return reservoir.isEmpty();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java 
b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
index 7738ef3..9441e76 100644
--- 
a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
+++ 
b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
@@ -21,6 +21,7 @@ package com.datatorrent.stram.stream;
 import java.net.InetSocketAddress;
 import java.util.ArrayDeque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
@@ -242,6 +243,21 @@ public class BufferServerSubscriber extends Subscriber 
implements ByteCounterStr
     }
 
     @Override
+    public int size(final boolean dataTupleAware)
+    {
+      int size = size();
+      if (dataTupleAware) {
+        Iterator<Object> iterator = getFrozenIterator();
+        while (iterator.hasNext()) {
+          if (iterator.next() instanceof Tuple) {
+            size--;
+          }
+        }
+      }
+      return size;
+    }
+
+    @Override
     public Sink<Object> setSink(Sink<Object> sink)
     {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java 
b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
index 491a906..b49a5b0 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
@@ -21,7 +21,8 @@ package com.datatorrent.stram.stream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.stram.engine.DefaultReservoir;
+import com.datatorrent.stram.engine.AbstractReservoir;
+import com.datatorrent.stram.engine.ForwardingReservoir;
 import com.datatorrent.stram.engine.Stream;
 import com.datatorrent.stram.engine.StreamContext;
 import com.datatorrent.stram.engine.SweepableReservoir;
@@ -33,11 +34,11 @@ import com.datatorrent.stram.engine.SweepableReservoir;
  *
  * @since 0.3.2
  */
-public class InlineStream extends DefaultReservoir implements Stream, 
SweepableReservoir
+public class InlineStream extends ForwardingReservoir implements Stream, 
SweepableReservoir
 {
   public InlineStream(int capacity)
   {
-    super("InlineStream", capacity);
+    super(AbstractReservoir.newReservoir("InlineStream", capacity));
   }
 
   /**
@@ -90,7 +91,7 @@ public class InlineStream extends DefaultReservoir implements 
Stream, SweepableR
   @Override
   public String toString()
   {
-    return "InlineStream{" + super.toString() + '}';
+    return getClass().getName() + '@' + Integer.toHexString(hashCode()) + 
"{reservoir=" + getReservoir().toString() + '}';
   }
 
   private static final Logger logger = 
LoggerFactory.getLogger(InlineStream.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java 
b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
index eb8b1ee..5cb461f 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
@@ -107,12 +107,18 @@ public class OiOStream implements Stream, 
SweepableReservoir
    * It's an error to have more than one tuple active on OiO.
    */
   @Override
-  public int size()
+  public int size(final boolean dataTupleAware)
   {
     return 1;
   }
 
   @Override
+  public boolean isEmpty()
+  {
+    return false;
+  }
+
+  @Override
   public Object remove()
   {
     throw new UnsupportedOperationException("Not supported yet."); //To change 
body of generated methods, choose Tools | Templates.

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
index 891cf1f..b98eb7e 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
@@ -84,8 +84,8 @@ public class AtMostOnceTest extends ProcessingModeTests
     map.put(OperatorContext.PROCESSING_MODE, processingMode);
 
     final GenericNode node = new GenericNode(new MultiInputOperator(), new com.datatorrent.stram.engine.OperatorContext(1, map, null));
-    DefaultReservoir reservoir1 = new DefaultReservoir("input1", 1024);
-    DefaultReservoir reservoir2 = new DefaultReservoir("input1", 1024);
+    AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input1", 1024);
+    AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("input1", 1024);
     node.connectInputPort("input1", reservoir1);
     node.connectInputPort("input2", reservoir2);
     node.connectOutputPort("output", new Sink<Object>()

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index 2577504..20cfb7b 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -256,8 +256,8 @@ public class GenericNodeTest
     GenericOperator go = new GenericOperator();
     final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, new DefaultAttributeMap(), null));
     gn.setId(1);
-    DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024);
-    DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024);
+    AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
+    AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
     Sink<Object> output = new Sink<Object>()
     {
       @Override
@@ -378,8 +378,8 @@ public class GenericNodeTest
     GenericOperator go = new GenericOperator();
     final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, new DefaultAttributeMap(), null));
     gn.setId(1);
-    DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024);
-    DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024);
+    AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
+    AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
 
     gn.connectInputPort("ip1", reservoir1);
     gn.connectInputPort("ip2", reservoir2);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
index 3dedd28..f723c2b 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
@@ -182,7 +182,7 @@ public class InputNodeTest
     }
 
     @Override
-    public int size()
+    public int size(final boolean dataTupleAware)
     {
       if (currentTuple != null) {
         return 1;
@@ -192,6 +192,12 @@ public class InputNodeTest
     }
 
     @Override
+    public boolean isEmpty()
+    {
+      return currentTuple == null;
+    }
+
+    @Override
     public Object remove()
     {
       Tuple tempTuple = currentTuple;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index fa36fec..9c2d7f7 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -261,8 +261,8 @@ public class ProcessingModeTests
 
     final GenericNode node = new GenericNode(new MultiInputOperator(),
                                              new com.datatorrent.stram.engine.OperatorContext(1, map, null));
-    DefaultReservoir reservoir1 = new DefaultReservoir("input1", 1024);
-    DefaultReservoir reservoir2 = new DefaultReservoir("input1", 1024);
+    AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input1", 1024);
+    AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("input1", 1024);
     node.connectInputPort("input1", reservoir1);
     node.connectInputPort("input2", reservoir2);
     node.connectOutputPort("output", new Sink<Object>()

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7cb6e93b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
index d840955..69c6df9 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
@@ -111,7 +111,7 @@ public class InlineStreamTest
     };
     node2.connectOutputPort("output", sink);
 
-    DefaultReservoir reservoir1 = new DefaultReservoir("input", 1024 * 5);
+    AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input", 1024 * 5);
     node1.connectInputPort("input", reservoir1);
 
     Map<Integer, Node<?>> activeNodes = new ConcurrentHashMap<Integer, Node<?>>();
@@ -139,8 +139,8 @@ public class InlineStreamTest
       @Override
       public boolean isComplete()
       {
-        logger.debug("stream size={}", stream.size());
-        return stream.size() == 0;
+        logger.debug("stream {} empty {}, size {}", stream, stream.isEmpty(), stream.size(false));
+        return stream.isEmpty();
       }
     };
 


Reply via email to