[FLINK-1638] [streaming] Barrier sync added to CoRecordReader, barrier tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5327d56d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5327d56d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5327d56d

Branch: refs/heads/master
Commit: 5327d56dc6f6f49a07054d89efcf30c894c85eca
Parents: c9a3992
Author: Gyula Fora <gyf...@apache.org>
Authored: Thu Mar 5 22:04:49 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../io/network/api/reader/BarrierBuffer.java    | 143 -------------
 .../reader/StreamingAbstractRecordReader.java   | 122 -----------
 .../connectors/kafka/api/KafkaSource.java       |   5 +-
 .../api/invokable/operator/co/CoInvokable.java  |  11 +-
 .../flink/streaming/io/BarrierBuffer.java       | 155 ++++++++++++++
 .../flink/streaming/io/CoRecordReader.java      | 108 ++++++++--
 .../io/StreamingAbstractRecordReader.java       | 123 ++++++++++++
 .../io/StreamingMutableRecordReader.java        |   1 -
 .../streaming/state/PartitionableState.java     |   8 +-
 .../streaming/api/WindowCrossJoinTest.java      |   4 +-
 .../flink/streaming/io/BarrierBufferTest.java   | 200 +++++++++++++++++++
 11 files changed, 589 insertions(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
deleted file mode 100644
index ee317cd..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
-
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BarrierBuffer {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(BarrierBuffer.class);
-
-       private Queue<BufferOrEvent> bufferOrEvents = new 
LinkedList<BufferOrEvent>();
-       private Queue<BufferOrEvent> unprocessed = new 
LinkedList<BufferOrEvent>();
-
-       private Set<Integer> blockedChannels = new HashSet<Integer>();
-       private int totalNumberOfInputChannels;
-
-       private StreamingSuperstep currentSuperstep;
-       private boolean receivedSuperstep;
-
-       private boolean blockAll = false;
-
-       private AbstractReader reader;
-
-       private InputGate inputGate;
-
-       public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
-               this.inputGate = inputGate;
-               totalNumberOfInputChannels = 
inputGate.getNumberOfInputChannels();
-               this.reader = reader;
-       }
-
-       private void startSuperstep(StreamingSuperstep superstep) {
-               this.currentSuperstep = superstep;
-               this.receivedSuperstep = true;
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Superstep started with id: " + 
superstep.getId());
-               }
-       }
-
-       private void store(BufferOrEvent bufferOrEvent) {
-               bufferOrEvents.add(bufferOrEvent);
-       }
-
-       private BufferOrEvent getNonProcessed() {
-               return unprocessed.poll();
-       }
-
-       private boolean isBlocked(int channelIndex) {
-               return blockAll || blockedChannels.contains(channelIndex);
-       }
-       
-       private boolean containsNonprocessed() {
-               return !unprocessed.isEmpty();
-       }
-
-       private boolean receivedSuperstep() {
-               return receivedSuperstep;
-       }
-
-       public BufferOrEvent getNextNonBlocked() throws IOException,
-                       InterruptedException {
-               BufferOrEvent bufferOrEvent = null;
-
-               if (containsNonprocessed()) {
-                       bufferOrEvent = getNonProcessed();
-               } else {
-                       while (bufferOrEvent == null) {
-                               BufferOrEvent nextBufferOrEvent = 
inputGate.getNextBufferOrEvent();
-                               if 
(isBlocked(nextBufferOrEvent.getChannelIndex())) {
-                                       store(nextBufferOrEvent);
-                               } else {
-                                       bufferOrEvent = nextBufferOrEvent;
-                               }
-                       }
-               }
-               return bufferOrEvent;
-       }
-
-       private void blockChannel(int channelIndex) {
-               if (!blockedChannels.contains(channelIndex)) {
-                       blockedChannels.add(channelIndex);
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Channel blocked with index: " + 
channelIndex);
-                       }
-                       if (blockedChannels.size() == 
totalNumberOfInputChannels) {
-                               reader.publish(currentSuperstep);
-                               unprocessed.addAll(bufferOrEvents);
-                               bufferOrEvents.clear();
-                               blockedChannels.clear();
-                               receivedSuperstep = false;
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("All barriers received, 
blocks released");
-                               }
-                       }
-
-               } else {
-                       throw new RuntimeException("Tried to block an already 
blocked channel");
-               }
-       }
-
-       public String toString() {
-               return blockedChannels.toString();
-       }
-
-       public void processSuperstep(BufferOrEvent bufferOrEvent) {
-               int channelIndex = bufferOrEvent.getChannelIndex();
-               if (isBlocked(channelIndex)) {
-                       store(bufferOrEvent);
-               } else {
-                       StreamingSuperstep superstep = (StreamingSuperstep) 
bufferOrEvent.getEvent();
-                       if (!receivedSuperstep()) {
-                               startSuperstep(superstep);
-                       }
-                       blockChannel(channelIndex);
-               }
-       }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
deleted file mode 100644
index ea2d7a6..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import java.io.IOException;
-
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
-import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import 
org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A record-oriented reader.
- * <p>
- * This abstract base class is used by both the mutable and immutable record
- * readers.
- * 
- * @param <T>
- *            The type of the record that can be read with this record reader.
- */
-public abstract class StreamingAbstractRecordReader<T extends 
IOReadableWritable> extends AbstractReader implements
-               ReaderBase {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
-
-       private final RecordDeserializer<T>[] recordDeserializers;
-
-       private RecordDeserializer<T> currentRecordDeserializer;
-
-       private boolean isFinished;
-
-       private final BarrierBuffer barrierBuffer;
-
-       protected StreamingAbstractRecordReader(InputGate inputGate) {
-               super(inputGate);
-               barrierBuffer = new BarrierBuffer(inputGate, this);
-
-               // Initialize one deserializer per input channel
-               this.recordDeserializers = new 
SpillingAdaptiveSpanningRecordDeserializer[inputGate
-                               .getNumberOfInputChannels()];
-               for (int i = 0; i < recordDeserializers.length; i++) {
-                       recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<T>();
-               }
-       }
-
-       protected boolean getNextRecord(T target) throws IOException, 
InterruptedException {
-               if (isFinished) {
-                       return false;
-               }
-
-               while (true) {
-                       if (currentRecordDeserializer != null) {
-                               DeserializationResult result = 
currentRecordDeserializer.getNextRecord(target);
-
-                               if (result.isBufferConsumed()) {
-                                       
currentRecordDeserializer.getCurrentBuffer().recycle();
-                                       currentRecordDeserializer = null;
-                               }
-
-                               if (result.isFullRecord()) {
-                                       return true;
-                               }
-                       }
-
-                       final BufferOrEvent bufferOrEvent = 
barrierBuffer.getNextNonBlocked();
-
-                       if (bufferOrEvent.isBuffer()) {
-                               currentRecordDeserializer = 
recordDeserializers[bufferOrEvent.getChannelIndex()];
-                               
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-                       } else {
-                               // Event received
-                               final AbstractEvent event = 
bufferOrEvent.getEvent();
-
-                               if (event instanceof StreamingSuperstep) {
-                                       
barrierBuffer.processSuperstep(bufferOrEvent);
-                               } else {
-                                       if (handleEvent(event)) {
-                                               if (inputGate.isFinished()) {
-                                                       isFinished = true;
-                                                       return false;
-                                               } else if 
(hasReachedEndOfSuperstep()) {
-                                                       return false;
-                                               } // else: More data is 
coming...
-                                       }
-                               }
-                       }
-               }
-       }
-
-       public void clearBuffers() {
-               for (RecordDeserializer<?> deserializer : recordDeserializers) {
-                       Buffer buffer = deserializer.getCurrentBuffer();
-                       if (buffer != null && !buffer.isRecycled()) {
-                               buffer.recycle();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 4349081..0c6cd4a 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -78,8 +78,9 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
        }
 
        public KafkaSource(String zookeeperHost, String topicId,
-                                               DeserializationSchema<OUT> 
deserializationSchema, long zookeeperSyncTimeMillis){
-               this(zookeeperHost, topicId, DEFAULT_GROUP_ID, 
deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
+                       DeserializationSchema<OUT> deserializationSchema, long 
zookeeperSyncTimeMillis) {
+               this(zookeeperHost, topicId, DEFAULT_GROUP_ID, 
deserializationSchema,
+                               ZOOKEEPER_DEFAULT_SYNC_TIME);
        }
 
        public KafkaSource(String zookeeperHost, String topicId,

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index b41dbbb..2b407c6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -84,7 +84,16 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends 
StreamInvokable<IN1, OU
                                next = recordIterator.next(reuse1, reuse2);
                        } catch (IOException e) {
                                if (isRunning) {
-                                       throw e;
+                                       throw new RuntimeException("Could not 
read next record due to: "
+                                                       + 
StringUtils.stringifyException(e));
+                               } else {
+                                       // Task already cancelled do nothing
+                                       next = 0;
+                               }
+                       } catch (IllegalStateException e) {
+                               if (isRunning) {
+                                       throw new RuntimeException("Could not 
read next record due to: "
+                                                       + 
StringUtils.stringifyException(e));
                                } else {
                                        // Task already cancelled do nothing
                                        next = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
new file mode 100644
index 0000000..3ff718a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BarrierBuffer {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(BarrierBuffer.class);
+
+       private Queue<BufferOrEvent> nonprocessed = new 
LinkedList<BufferOrEvent>();
+       private Queue<BufferOrEvent> blockedNonprocessed = new 
LinkedList<BufferOrEvent>();
+
+       private Set<Integer> blockedChannels = new HashSet<Integer>();
+       private int totalNumberOfInputChannels;
+
+       private StreamingSuperstep currentSuperstep;
+       private boolean superstepStarted;
+
+       private AbstractReader reader;
+
+       private InputGate inputGate;
+
+       public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
+               this.inputGate = inputGate;
+               totalNumberOfInputChannels = 
inputGate.getNumberOfInputChannels();
+               this.reader = reader;
+       }
+
+       protected void startSuperstep(StreamingSuperstep superstep) {
+               this.currentSuperstep = superstep;
+               this.superstepStarted = true;
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Superstep started with id: " + 
superstep.getId());
+               }
+       }
+
+       protected void store(BufferOrEvent bufferOrEvent) {
+               nonprocessed.add(bufferOrEvent);
+       }
+
+       protected BufferOrEvent getNonProcessed() {
+               BufferOrEvent nextNonprocessed = null;
+               while (nextNonprocessed == null && !nonprocessed.isEmpty()) {
+                       nextNonprocessed = nonprocessed.poll();
+                       if (isBlocked(nextNonprocessed.getChannelIndex())) {
+                               blockedNonprocessed.add(nextNonprocessed);
+                               nextNonprocessed = null;
+                       }
+               }
+               return nextNonprocessed;
+       }
+
+       protected boolean isBlocked(int channelIndex) {
+               return blockedChannels.contains(channelIndex);
+       }
+
+       protected boolean isAllBlocked() {
+               return blockedChannels.size() == totalNumberOfInputChannels;
+       }
+
+       public BufferOrEvent getNextNonBlocked() throws IOException, 
InterruptedException {
+               // If there are non-processed buffers from the previously 
blocked ones,
+               // we get the next
+               BufferOrEvent bufferOrEvent = getNonProcessed();
+
+               if (bufferOrEvent != null) {
+                       return bufferOrEvent;
+               } else {
+                       // If no non-processed, get new from input
+                       while (true) {
+                               // We read the next buffer from the inputgate
+                               bufferOrEvent = 
inputGate.getNextBufferOrEvent();
+                               if (isBlocked(bufferOrEvent.getChannelIndex())) 
{
+                                       // If channel blocked we just store it
+                                       store(bufferOrEvent);
+                               } else {
+                                       return bufferOrEvent;
+                               }
+                       }
+               }
+       }
+
+       protected void blockChannel(int channelIndex) {
+               if (!blockedChannels.contains(channelIndex)) {
+                       blockedChannels.add(channelIndex);
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Channel blocked with index: " + 
channelIndex);
+                       }
+                       if (isAllBlocked()) {
+                               actOnAllBlocked();
+                       }
+
+               } else {
+                       throw new RuntimeException("Tried to block an already 
blocked channel");
+               }
+       }
+
+       protected void releaseBlocks() {
+               nonprocessed.addAll(blockedNonprocessed);
+               blockedChannels.clear();
+               blockedNonprocessed.clear();
+               superstepStarted = false;
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("All barriers received, blocks released");
+               }
+       }
+
+       protected void actOnAllBlocked() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Publishing barrier to the vertex");
+               }
+               reader.publish(currentSuperstep);
+               releaseBlocks();
+       }
+
+       public String toString() {
+               return blockedChannels.toString();
+       }
+
+       public void processSuperstep(BufferOrEvent bufferOrEvent) {
+               StreamingSuperstep superstep = (StreamingSuperstep) 
bufferOrEvent.getEvent();
+               if (!superstepStarted) {
+                       startSuperstep(superstep);
+               }
+               blockChannel(bufferOrEvent.getChannelIndex());
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index 79f09c4..6a1f624 100755
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -18,10 +18,12 @@
 package org.apache.flink.streaming.io;
 
 import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
@@ -44,7 +46,9 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 
extends IOReadable
 
        private final InputGate bufferReader2;
 
-       private final BlockingQueue<Integer> availableRecordReaders = new 
LinkedBlockingQueue<Integer>();
+       private final LinkedBlockingDeque<Integer> availableRecordReaders = new 
LinkedBlockingDeque<Integer>();
+
+       private LinkedList<Integer> processed = new LinkedList<Integer>();
 
        private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers;
 
@@ -59,15 +63,20 @@ public class CoRecordReader<T1 extends IOReadableWritable, 
T2 extends IOReadable
 
        private boolean hasRequestedPartitions;
 
-       public CoRecordReader(InputGate bufferReader1, InputGate bufferReader2) 
{
-               super(new UnionInputGate(bufferReader1, bufferReader2));
+       private CoBarrierBuffer barrierBuffer1;
+       private CoBarrierBuffer barrierBuffer2;
+
+       private Queue<Integer> unprocessedIndices = new LinkedList<Integer>();
+
+       public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
+               super(new UnionInputGate(inputgate1, inputgate2));
 
-               this.bufferReader1 = bufferReader1;
-               this.bufferReader2 = bufferReader2;
+               this.bufferReader1 = inputgate1;
+               this.bufferReader2 = inputgate2;
 
-               this.reader1RecordDeserializers = new 
AdaptiveSpanningRecordDeserializer[bufferReader1
+               this.reader1RecordDeserializers = new 
AdaptiveSpanningRecordDeserializer[inputgate1
                                .getNumberOfInputChannels()];
-               this.reader2RecordDeserializers = new 
AdaptiveSpanningRecordDeserializer[bufferReader2
+               this.reader2RecordDeserializers = new 
AdaptiveSpanningRecordDeserializer[inputgate2
                                .getNumberOfInputChannels()];
 
                for (int i = 0; i < reader1RecordDeserializers.length; i++) {
@@ -78,8 +87,14 @@ public class CoRecordReader<T1 extends IOReadableWritable, 
T2 extends IOReadable
                        reader2RecordDeserializers[i] = new 
AdaptiveSpanningRecordDeserializer<T2>();
                }
 
-               bufferReader1.registerListener(this);
-               bufferReader2.registerListener(this);
+               inputgate1.registerListener(this);
+               inputgate2.registerListener(this);
+
+               barrierBuffer1 = new CoBarrierBuffer(inputgate1, this);
+               barrierBuffer2 = new CoBarrierBuffer(inputgate2, this);
+
+               barrierBuffer1.setOtherBarrierBuffer(barrierBuffer2);
+               barrierBuffer2.setOtherBarrierBuffer(barrierBuffer1);
        }
 
        public void requestPartitionsOnce() throws IOException, 
InterruptedException {
@@ -94,15 +109,16 @@ public class CoRecordReader<T1 extends IOReadableWritable, 
T2 extends IOReadable
        @SuppressWarnings("unchecked")
        protected int getNextRecord(T1 target1, T2 target2) throws IOException, 
InterruptedException {
 
-               requestPartitionsOnce();
+               requestPartitionsOnce();        
 
                while (true) {
                        if (currentReaderIndex == 0) {
                                if ((bufferReader1.isFinished() && 
bufferReader2.isFinished())) {
                                        return 0;
                                }
-
+                               
                                currentReaderIndex = 
getNextReaderIndexBlocking();
+
                        }
 
                        if (currentReaderIndex == 1) {
@@ -123,12 +139,17 @@ public class CoRecordReader<T1 extends 
IOReadableWritable, T2 extends IOReadable
                                                }
                                        } else {
 
-                                               final BufferOrEvent boe = 
bufferReader1.getNextBufferOrEvent();
+                                               final BufferOrEvent boe = 
barrierBuffer1.getNextNonBlocked();
 
                                                if (boe.isBuffer()) {
                                                        
reader1currentRecordDeserializer = reader1RecordDeserializers[boe
                                                                        
.getChannelIndex()];
                                                        
reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer());
+                                               } else if (boe.getEvent() 
instanceof StreamingSuperstep) {
+                                                       
barrierBuffer1.processSuperstep(boe);
+                                                       currentReaderIndex = 0;
+
+                                                       break;
                                                } else if 
(handleEvent(boe.getEvent())) {
                                                        currentReaderIndex = 0;
 
@@ -153,12 +174,17 @@ public class CoRecordReader<T1 extends 
IOReadableWritable, T2 extends IOReadable
                                                        return 2;
                                                }
                                        } else {
-                                               final BufferOrEvent boe = 
bufferReader2.getNextBufferOrEvent();
+                                               final BufferOrEvent boe = 
barrierBuffer2.getNextNonBlocked();
 
                                                if (boe.isBuffer()) {
                                                        
reader2currentRecordDeserializer = reader2RecordDeserializers[boe
                                                                        
.getChannelIndex()];
                                                        
reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer());
+                                               } else if (boe.getEvent() 
instanceof StreamingSuperstep) {
+                                                       
barrierBuffer2.processSuperstep(boe);
+                                                       currentReaderIndex = 0;
+
+                                                       break;
                                                } else if 
(handleEvent(boe.getEvent())) {
                                                        currentReaderIndex = 0;
 
@@ -173,7 +199,32 @@ public class CoRecordReader<T1 extends IOReadableWritable, 
T2 extends IOReadable
        }
 
        private int getNextReaderIndexBlocking() throws InterruptedException {
-               return availableRecordReaders.take();
+
+               Integer nextIndex = 0;
+
+               while (processed.contains(nextIndex = 
availableRecordReaders.take())) {
+                       processed.remove(nextIndex);
+               }
+
+               if (nextIndex == 1) {
+                       if (barrierBuffer1.isAllBlocked()) {
+                               availableRecordReaders.addFirst(1);
+                               processed.add(2);
+                               return 2;
+                       } else {
+                               return 1;
+                       }
+               } else {
+                       if (barrierBuffer2.isAllBlocked()) {
+                               availableRecordReaders.addFirst(2);
+                               processed.add(1);
+                               return 1;
+                       } else {
+                               return 2;
+                       }
+
+               }
+
        }
 
        // 
------------------------------------------------------------------------
@@ -183,8 +234,10 @@ public class CoRecordReader<T1 extends IOReadableWritable, 
T2 extends IOReadable
        @Override
        public void onEvent(InputGate bufferReader) {
                if (bufferReader == bufferReader1) {
+                       System.out.println("Added 1");
                        availableRecordReaders.add(1);
                } else if (bufferReader == bufferReader2) {
+                       System.out.println("Added 2");
                        availableRecordReaders.add(2);
                }
        }
@@ -203,4 +256,27 @@ public class CoRecordReader<T1 extends IOReadableWritable, 
T2 extends IOReadable
                        }
                }
        }
+
+       private class CoBarrierBuffer extends BarrierBuffer {
+
+               private CoBarrierBuffer otherBuffer;
+
+               public CoBarrierBuffer(InputGate inputGate, AbstractReader 
reader) {
+                       super(inputGate, reader);
+               }
+
+               public void setOtherBarrierBuffer(CoBarrierBuffer other) {
+                       this.otherBuffer = other;
+               }
+
+               @Override
+               protected void actOnAllBlocked() {
+                       if (otherBuffer.isAllBlocked()) {
+                               super.actOnAllBlocked();
+                               otherBuffer.releaseBlocks();
+                       }
+               }
+
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
new file mode 100644
index 0000000..811c48a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import 
org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A record-oriented reader.
+ * <p>
+ * This abstract base class is used by both the mutable and immutable record
+ * readers.
+ * 
+ * @param <T>
+ *            The type of the record that can be read with this record reader.
+ */
+public abstract class StreamingAbstractRecordReader<T extends 
IOReadableWritable> extends AbstractReader implements
+               ReaderBase {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
+
+       private final RecordDeserializer<T>[] recordDeserializers;
+
+       private RecordDeserializer<T> currentRecordDeserializer;
+
+       private boolean isFinished;
+
+       private final BarrierBuffer barrierBuffer;
+
+       protected StreamingAbstractRecordReader(InputGate inputGate) {
+               super(inputGate);
+               barrierBuffer = new BarrierBuffer(inputGate, this);
+
+               // Initialize one deserializer per input channel
+               this.recordDeserializers = new 
SpillingAdaptiveSpanningRecordDeserializer[inputGate
+                               .getNumberOfInputChannels()];
+               for (int i = 0; i < recordDeserializers.length; i++) {
+                       recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<T>();
+               }
+       }
+
+       protected boolean getNextRecord(T target) throws IOException, 
InterruptedException {
+               if (isFinished) {
+                       return false;
+               }
+
+               while (true) {
+                       if (currentRecordDeserializer != null) {
+                               DeserializationResult result = 
currentRecordDeserializer.getNextRecord(target);
+
+                               if (result.isBufferConsumed()) {
+                                       
currentRecordDeserializer.getCurrentBuffer().recycle();
+                                       currentRecordDeserializer = null;
+                               }
+
+                               if (result.isFullRecord()) {
+                                       return true;
+                               }
+                       }
+
+                       final BufferOrEvent bufferOrEvent = 
barrierBuffer.getNextNonBlocked();
+
+                       if (bufferOrEvent.isBuffer()) {
+                               currentRecordDeserializer = 
recordDeserializers[bufferOrEvent.getChannelIndex()];
+                               
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+                       } else {
+                               // Event received
+                               final AbstractEvent event = 
bufferOrEvent.getEvent();
+
+                               if (event instanceof StreamingSuperstep) {
+                                       
barrierBuffer.processSuperstep(bufferOrEvent);
+                               } else {
+                                       if (handleEvent(event)) {
+                                               if (inputGate.isFinished()) {
+                                                       isFinished = true;
+                                                       return false;
+                                               } else if 
(hasReachedEndOfSuperstep()) {
+                                                       return false;
+                                               } // else: More data is 
coming...
+                                       }
+                               }
+                       }
+               }
+       }
+
+       public void clearBuffers() {
+               for (RecordDeserializer<?> deserializer : recordDeserializers) {
+                       Buffer buffer = deserializer.getCurrentBuffer();
+                       if (buffer != null && !buffer.isRecycled()) {
+                               buffer.recycle();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
index ffa436b..e868879 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.io;
 import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import 
org.apache.flink.runtime.io.network.api.reader.StreamingAbstractRecordReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
index a5e67ab..1c67c9e 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
@@ -22,18 +22,18 @@ import org.apache.flink.runtime.state.OperatorState;
 /**
  * Base class for representing operator states that can be repartitioned for
  * state state and load balancing.
- *
+ * 
  * @param <T>
  *            The type of the operator state.
  */
 public abstract class PartitionableState<T> extends OperatorState<T> {
 
-       public PartitionableState(T initialState) {
+       private static final long serialVersionUID = 1L;
+
+       PartitionableState(T initialState) {
                super(initialState);
        }
 
-       private static final long serialVersionUID = 1L;
-
        /**
         * Repartitions(divides) the current state into the given number of new
         * partitions. The created partitions will be used to redistribute then

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index e14e281..bd97917 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -143,7 +143,7 @@ public class WindowCrossJoinTest implements Serializable {
                public void invoke(Tuple2<Tuple2<Integer, String>, 
Tuple1<Integer>> value) {
                        joinResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(value.f0, value.f1.f0));
                }
-               
+
                @Override
                public void cancel() {
                }
@@ -157,7 +157,7 @@ public class WindowCrossJoinTest implements Serializable {
                public void invoke(Tuple2<Tuple2<Integer, String>, 
Tuple1<Integer>> value) {
                        crossResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(value.f0, value.f1.f0));
                }
-               
+
                @Override
                public void cancel() {
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/5327d56d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
new file mode 100644
index 0000000..e7a03d9
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.junit.Test;
+
+public class BarrierBufferTest {
+
+       @Test
+       public void testWithoutBarriers() throws IOException, 
InterruptedException {
+
+               List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+               input.add(createBuffer(0));
+               input.add(createBuffer(0));
+               input.add(createBuffer(0));
+               input.add(createBuffer(2));
+               input.add(createBuffer(2));
+
+               InputGate mockIG = new MockInputGate(1, input);
+               AbstractReader mockAR = new MockReader(mockIG);
+
+               BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
+
+               assertEquals(input.get(0), bb.getNextNonBlocked());
+               assertEquals(input.get(1), bb.getNextNonBlocked());
+               assertEquals(input.get(2), bb.getNextNonBlocked());
+               assertEquals(input.get(3), bb.getNextNonBlocked());
+               assertEquals(input.get(4), bb.getNextNonBlocked());
+
+       }
+
+       @Test
+       public void testOneChannelBarrier() throws IOException, 
InterruptedException {
+
+               List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+               input.add(createBuffer(0));
+               input.add(createBuffer(0));
+               input.add(createSuperstep(1, 0));
+               input.add(createBuffer(0));
+               input.add(createBuffer(0));
+               input.add(createSuperstep(2, 0));
+               input.add(createBuffer(0));
+
+               InputGate mockIG = new MockInputGate(1, input);
+               AbstractReader mockAR = new MockReader(mockIG);
+
+               BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
+               BufferOrEvent nextBoe;
+
+               assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
+               assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
+               assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
+               bb.processSuperstep(nextBoe);
+               assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
+               assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
+               assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
+               bb.processSuperstep(nextBoe);
+               assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
+
+       }
+
+       @Test
+       public void testMultiChannelBarrier() throws IOException, 
InterruptedException {
+
+               List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+               input.add(createBuffer(0));
+               input.add(createBuffer(1));
+               input.add(createSuperstep(1, 0));
+               input.add(createSuperstep(2, 0));
+               input.add(createBuffer(0));
+               input.add(createSuperstep(3, 0));
+               input.add(createBuffer(0));
+               input.add(createBuffer(1));
+               input.add(createSuperstep(1, 1));
+               input.add(createBuffer(0));
+               input.add(createBuffer(1));
+               input.add(createSuperstep(2, 1));
+               input.add(createSuperstep(3, 1));
+
+               InputGate mockIG1 = new MockInputGate(2, input);
+               AbstractReader mockAR1 = new MockReader(mockIG1);
+
+               BarrierBuffer bb = new BarrierBuffer(mockIG1, mockAR1);
+               BufferOrEvent nextBoe;
+
+               assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
+               assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
+               assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
+               bb.processSuperstep(nextBoe);
+               assertEquals(input.get(7), nextBoe = bb.getNextNonBlocked());
+               assertEquals(input.get(8), nextBoe = bb.getNextNonBlocked());
+               bb.processSuperstep(nextBoe);
+               assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
+               bb.processSuperstep(nextBoe);
+               assertEquals(input.get(10), nextBoe = bb.getNextNonBlocked());
+               assertEquals(input.get(11), nextBoe = bb.getNextNonBlocked());
+               bb.processSuperstep(nextBoe);
+               assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
+               assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
+               bb.processSuperstep(nextBoe);
+               assertEquals(input.get(12), nextBoe = bb.getNextNonBlocked());
+               bb.processSuperstep(nextBoe);
+               assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
+               assertEquals(input.get(9), nextBoe = bb.getNextNonBlocked());
+
+       }
+
+       private static class MockInputGate implements InputGate {
+
+               private int numChannels;
+               private Queue<BufferOrEvent> boes;
+
+               public MockInputGate(int numChannels, List<BufferOrEvent> boes) 
{
+                       this.numChannels = numChannels;
+                       this.boes = new LinkedList<BufferOrEvent>(boes);
+               }
+
+               @Override
+               public int getNumberOfInputChannels() {
+                       return numChannels;
+               }
+
+               @Override
+               public boolean isFinished() {
+                       return boes.isEmpty();
+               }
+
+               @Override
+               public void requestPartitions() throws IOException, 
InterruptedException {
+               }
+
+               @Override
+               public BufferOrEvent getNextBufferOrEvent() throws IOException, 
InterruptedException {
+                       return boes.remove();
+               }
+
+               @Override
+               public void sendTaskEvent(TaskEvent event) throws IOException {
+               }
+
+               @Override
+               public void registerListener(EventListener<InputGate> listener) 
{
+               }
+
+       }
+
+       private static class MockReader extends AbstractReader {
+
+               protected MockReader(InputGate inputGate) {
+                       super(inputGate);
+               }
+
+       }
+
+       private static BufferOrEvent createSuperstep(long id, int channel) {
+               return new BufferOrEvent(new StreamingSuperstep(id), channel);
+       }
+
+       private static BufferOrEvent createBuffer(int channel) {
+               return new BufferOrEvent(new Buffer(new MemorySegment(new 
byte[] { 1 }),
+                               new BufferRecycler() {
+
+                                       @Override
+                                       public void recycle(MemorySegment 
memorySegment) {
+                                       }
+                               }), channel);
+       }
+
+}

Reply via email to