http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 607da94..0749467 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -20,24 +20,17 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
+import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkElementIndex;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -60,34 +53,42 @@ public class TestSingleInputGate {
                checkArgument(numberOfInputChannels >= 1);
 
                SingleInputGate realGate = new SingleInputGate(
-                               "Test Task Name", new JobID(), new 
ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, 
mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+                       "Test Task Name",
+                       new JobID(),
+                       new ExecutionAttemptID(),
+                       new IntermediateDataSetID(),
+                       0,
+                       numberOfInputChannels,
+                       mock(PartitionStateChecker.class),
+                       new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                this.inputGate = spy(realGate);
 
                // Notify about late registrations (added for 
DataSinkTaskTest#testUnionDataSinkTask).
                // After merging registerInputOutput and invoke, we have to 
make sure that the test
-               // notifcations happen at the expected time. In real programs, 
this is guaranteed by
+               // notifications happen at the expected time. In real programs, 
this is guaranteed by
                // the instantiation and request partition life cycle.
                try {
                        Field f = 
realGate.getClass().getDeclaredField("inputChannelsWithData");
                        f.setAccessible(true);
-                       final BlockingQueue<InputChannel> notifications = 
(BlockingQueue<InputChannel>) f.get(realGate);
+                       final ArrayDeque<InputChannel> notifications = 
(ArrayDeque<InputChannel>) f.get(realGate);
 
                        doAnswer(new Answer<Void>() {
                                @Override
                                public Void answer(InvocationOnMock invocation) 
throws Throwable {
                                        invocation.callRealMethod();
 
-                                       if (!notifications.isEmpty()) {
-                                               EventListener<InputGate> 
listener = (EventListener<InputGate>) invocation.getArguments()[0];
-                                               listener.onEvent(inputGate);
+                                       synchronized (notifications) {
+                                               if (!notifications.isEmpty()) {
+                                                       InputGateListener 
listener = (InputGateListener) invocation.getArguments()[0];
+                                                       
listener.notifyInputGateNonEmpty(inputGate);
+                                               }
                                        }
 
                                        return null;
                                }
-                       
}).when(inputGate).registerListener(any(EventListener.class));
-               }
-               catch (Exception e) {
+                       
}).when(inputGate).registerListener(any(InputGateListener.class));
+               } catch (Exception e) {
                        throw new RuntimeException(e);
                }
 
@@ -101,81 +102,8 @@ public class TestSingleInputGate {
                }
        }
 
-       public TestSingleInputGate read(Buffer buffer, int channelIndex) throws 
IOException, InterruptedException {
-               checkElementIndex(channelIndex, 
inputGate.getNumberOfInputChannels());
-
-               inputChannels[channelIndex].read(buffer);
-
-               return this;
-       }
-
-       public TestSingleInputGate readBuffer() throws IOException, 
InterruptedException {
-               return readBuffer(0);
-       }
-
-       public TestSingleInputGate readBuffer(int channelIndex) throws 
IOException, InterruptedException {
-               inputChannels[channelIndex].readBuffer();
-
-               return this;
-       }
-
-       public TestSingleInputGate readEvent() throws IOException, 
InterruptedException {
-               return readEvent(0);
-       }
-
-       public TestSingleInputGate readEvent(int channelIndex) throws 
IOException, InterruptedException {
-               inputChannels[channelIndex].readEvent();
-
-               return this;
-       }
-
-       public TestSingleInputGate readEndOfSuperstepEvent() throws 
IOException, InterruptedException {
-               for (TestInputChannel inputChannel : inputChannels) {
-                       inputChannel.readEndOfSuperstepEvent();
-               }
-
-               return this;
-       }
-
-       public TestSingleInputGate readEndOfSuperstepEvent(int channelIndex) 
throws IOException, InterruptedException {
-               inputChannels[channelIndex].readEndOfSuperstepEvent();
-
-               return this;
-       }
-
-       public TestSingleInputGate readEndOfPartitionEvent() throws 
IOException, InterruptedException {
-               for (TestInputChannel inputChannel : inputChannels) {
-                       inputChannel.readEndOfPartitionEvent();
-               }
-
-               return this;
-       }
-
-       public TestSingleInputGate readEndOfPartitionEvent(int channelIndex) 
throws IOException, InterruptedException {
-               inputChannels[channelIndex].readEndOfPartitionEvent();
-
-               return this;
-       }
-
        public SingleInputGate getInputGate() {
                return inputGate;
        }
 
-       // 
------------------------------------------------------------------------
-
-       public List<Integer> readAllChannels() throws IOException, 
InterruptedException {
-               final List<Integer> readOrder = new 
ArrayList<Integer>(inputChannels.length);
-
-               for (int i = 0; i < inputChannels.length; i++) {
-                       readOrder.add(i);
-               }
-
-               Collections.shuffle(readOrder);
-
-               for (int channelIndex : readOrder) {
-                       inputChannels[channelIndex].readBuffer();
-               }
-
-               return readOrder;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 28f621f..faec77e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-
 import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 
@@ -73,22 +72,32 @@ public class UnionInputGateTest {
                inputChannels[1][1].readEndOfPartitionEvent(); // 0 => 3
                inputChannels[1][0].readEndOfPartitionEvent(); // 0 => 3
 
-               SingleInputGateTest.verifyBufferOrEvent(union, true, 0);
-               SingleInputGateTest.verifyBufferOrEvent(union, false, 0);
-               SingleInputGateTest.verifyBufferOrEvent(union, true, 5);
-               SingleInputGateTest.verifyBufferOrEvent(union, false, 5);
-               SingleInputGateTest.verifyBufferOrEvent(union, true, 3);
-               SingleInputGateTest.verifyBufferOrEvent(union, true, 4);
-               SingleInputGateTest.verifyBufferOrEvent(union, true, 1);
-               SingleInputGateTest.verifyBufferOrEvent(union, true, 6);
-               SingleInputGateTest.verifyBufferOrEvent(union, false, 1);
-               SingleInputGateTest.verifyBufferOrEvent(union, false, 6);
-               SingleInputGateTest.verifyBufferOrEvent(union, true, 2);
-               SingleInputGateTest.verifyBufferOrEvent(union, false, 2);
-               SingleInputGateTest.verifyBufferOrEvent(union, true, 7);
-               SingleInputGateTest.verifyBufferOrEvent(union, false, 7);
-               SingleInputGateTest.verifyBufferOrEvent(union, false, 4);
-               SingleInputGateTest.verifyBufferOrEvent(union, false, 3);
+               
ig1.notifyChannelNonEmpty(inputChannels[0][0].getInputChannel());
+               
ig1.notifyChannelNonEmpty(inputChannels[0][1].getInputChannel());
+               
ig1.notifyChannelNonEmpty(inputChannels[0][2].getInputChannel());
+
+               
ig2.notifyChannelNonEmpty(inputChannels[1][0].getInputChannel());
+               
ig2.notifyChannelNonEmpty(inputChannels[1][1].getInputChannel());
+               
ig2.notifyChannelNonEmpty(inputChannels[1][2].getInputChannel());
+               
ig2.notifyChannelNonEmpty(inputChannels[1][3].getInputChannel());
+               
ig2.notifyChannelNonEmpty(inputChannels[1][4].getInputChannel());
+
+               SingleInputGateTest.verifyBufferOrEvent(union, true, 0); // 
gate 1, channel 0
+               SingleInputGateTest.verifyBufferOrEvent(union, true, 3); // 
gate 2, channel 0
+               SingleInputGateTest.verifyBufferOrEvent(union, true, 1); // 
gate 1, channel 1
+               SingleInputGateTest.verifyBufferOrEvent(union, true, 4); // 
gate 2, channel 1
+               SingleInputGateTest.verifyBufferOrEvent(union, true, 2); // 
gate 1, channel 2
+               SingleInputGateTest.verifyBufferOrEvent(union, true, 5); // 
gate 2, channel 1
+               SingleInputGateTest.verifyBufferOrEvent(union, false, 0); // 
gate 1, channel 0
+               SingleInputGateTest.verifyBufferOrEvent(union, true, 6); // 
gate 2, channel 1
+               SingleInputGateTest.verifyBufferOrEvent(union, false, 1); // 
gate 1, channel 1
+               SingleInputGateTest.verifyBufferOrEvent(union, true, 7); // 
gate 2, channel 1
+               SingleInputGateTest.verifyBufferOrEvent(union, false, 2); // 
gate 1, channel 2
+               SingleInputGateTest.verifyBufferOrEvent(union, false, 3); // 
gate 2, channel 0
+               SingleInputGateTest.verifyBufferOrEvent(union, false, 4); // 
gate 2, channel 1
+               SingleInputGateTest.verifyBufferOrEvent(union, false, 5); // 
gate 2, channel 2
+               SingleInputGateTest.verifyBufferOrEvent(union, false, 6); // 
gate 2, channel 3
+               SingleInputGateTest.verifyBufferOrEvent(union, false, 7); // 
gate 2, channel 4
 
                // Return null when the input gate has received all 
end-of-partition events
                assertTrue(union.isFinished());

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
index 1b51805..676a304 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -22,26 +22,32 @@ import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 
 import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A test subpartition view consumer.
+ * A test subpartition viewQueue consumer.
  *
  * <p> The behaviour of the consumer is customizable by specifying a callback.
  *
  * @see TestConsumerCallback
  */
-public class TestSubpartitionConsumer implements Callable<Boolean> {
+public class TestSubpartitionConsumer implements Callable<Boolean>, 
BufferAvailabilityListener {
 
        private static final int MAX_SLEEP_TIME_MS = 20;
 
-       /** The subpartition view to consume. */
-       private final ResultSubpartitionView subpartitionView;
+       /** The subpartition viewQueue to consume. */
+       private volatile ResultSubpartitionView subpartitionView;
+
+       private BlockingQueue<ResultSubpartitionView> viewQueue = new 
ArrayBlockingQueue<>(1);
 
        /**
         * Flag indicating whether the consumer is slow. If true, the consumer 
will sleep a random
@@ -49,33 +55,43 @@ public class TestSubpartitionConsumer implements 
Callable<Boolean> {
         */
        private final boolean isSlowConsumer;
 
-       /** The callback to handle a read buffer. */
+       /** The callback to handle a notifyNonEmpty buffer. */
        private final TestConsumerCallback callback;
 
        /** Random source for sleeps. */
        private final Random random;
 
+       private final AtomicLong numBuffersAvailable = new AtomicLong();
+
        public TestSubpartitionConsumer(
-                       ResultSubpartitionView subpartitionView,
-                       boolean isSlowConsumer,
-                       TestConsumerCallback callback) {
+               boolean isSlowConsumer,
+               TestConsumerCallback callback) {
 
-               this.subpartitionView = checkNotNull(subpartitionView);
                this.isSlowConsumer = isSlowConsumer;
                this.random = isSlowConsumer ? new Random() : null;
                this.callback = checkNotNull(callback);
        }
 
+       public void setSubpartitionView(ResultSubpartitionView 
subpartitionView) {
+               this.subpartitionView = checkNotNull(subpartitionView);
+       }
+
        @Override
        public Boolean call() throws Exception {
-               final TestNotificationListener listener = new 
TestNotificationListener();
-
                try {
                        while (true) {
                                if (Thread.interrupted()) {
                                        throw new InterruptedException();
                                }
 
+                               if (numBuffersAvailable.get() == 0) {
+                                       synchronized (numBuffersAvailable) {
+                                               while 
(numBuffersAvailable.get() == 0) {
+                                                       
numBuffersAvailable.wait();
+                                               }
+                                       }
+                               }
+
                                final Buffer buffer = 
subpartitionView.getNextBuffer();
 
                                if (isSlowConsumer) {
@@ -83,12 +99,13 @@ public class TestSubpartitionConsumer implements 
Callable<Boolean> {
                                }
 
                                if (buffer != null) {
+                                       numBuffersAvailable.decrementAndGet();
+
                                        if (buffer.isBuffer()) {
                                                callback.onBuffer(buffer);
-                                       }
-                                       else {
+                                       } else {
                                                final AbstractEvent event = 
EventSerializer.fromBuffer(buffer,
-                                                               
getClass().getClassLoader());
+                                                       
getClass().getClassLoader());
 
                                                callback.onEvent(event);
 
@@ -100,22 +117,22 @@ public class TestSubpartitionConsumer implements 
Callable<Boolean> {
                                                        return true;
                                                }
                                        }
-                               }
-                               else {
-                                       int current = 
listener.getNumberOfNotifications();
-
-                                       if 
(subpartitionView.registerListener(listener)) {
-                                               
listener.waitForNotification(current);
-                                       }
-                                       else if (subpartitionView.isReleased()) 
{
-                                               return true;
-                                       }
+                               } else if (subpartitionView.isReleased()) {
+                                       return true;
                                }
                        }
-               }
-               finally {
+               } finally {
                        subpartitionView.releaseAllResources();
                }
        }
 
+       @Override
+       public void notifyBuffersAvailable(long numBuffers) {
+               if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) 
== 0) {
+                       synchronized (numBuffersAvailable) {
+                               numBuffersAvailable.notifyAll();
+                       }
+                       ;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index a41c25b..67071f9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -34,7 +34,6 @@ import org.apache.flink.types.Record;
 
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -139,7 +138,6 @@ public class DataSinkTaskTest extends TaskTestBase {
 
        @Test
        public void testUnionDataSinkTask() {
-
                int keyCnt = 10;
                int valCnt = 20;
 
@@ -157,9 +155,10 @@ public class DataSinkTaskTest extends TaskTestBase {
 
                try {
                        // For the union reader to work, we need to start 
notifications *after* the union reader
-                       // has been initialized.
+                       // has been initialized. This is accomplished via a 
mockito hack in TestSingleInputGate,
+                       // which checks forwards existing notifications on 
registerListener calls.
                        for (IteratorWrappingTestSingleInputGate<?> reader : 
readers) {
-                               reader.read();
+                               reader.notifyNonEmpty();
                        }
 
                        testTask.invoke();

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 02c420c..fb8ed68 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -53,7 +53,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({Task.class, ResultPartitionWriter.class})
 @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-
 public class ChainTaskTest extends TaskTestBase {
        
        private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 22dee63..05c4814 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -64,9 +64,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class MockEnvironment implements Environment {
-       
+
        private final TaskInfo taskInfo;
-       
+
        private final ExecutionConfig executionConfig;
 
        private final MemoryManager memManager;
@@ -158,7 +158,7 @@ public class MockEnvironment implements Environment {
                                                }
 
                                                if (result == 
RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
-                                                               || result == 
RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
+                                                       || result == 
RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
                                                        break;
                                                }
                                        }
@@ -208,9 +208,9 @@ public class MockEnvironment implements Environment {
        @Override
        public TaskManagerRuntimeInfo getTaskManagerInfo() {
                return new TaskManagerRuntimeInfo(
-                               "localhost",
-                               new UnmodifiableConfiguration(new 
Configuration()),
-                               System.getProperty("java.io.tmpdir"));
+                       "localhost",
+                       new UnmodifiableConfiguration(new Configuration()),
+                       System.getProperty("java.io.tmpdir"));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index eaf44db..53d75b3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -67,7 +67,7 @@ public abstract class TaskTestBase extends TestLogger {
                conf.setInputSerializer(RecordSerializerFactory.get(), groupId);
 
                if (read) {
-                       reader.read();
+                       reader.notifyNonEmpty();
                }
 
                return reader;

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index a093233..876e908 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -43,6 +42,7 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -129,18 +129,17 @@ public class TaskCancelAsyncProducerConsumerITCase 
extends TestLogger {
                        }
 
                        // Verify that async producer is in blocking request
-                       assertTrue("Producer thread is not blocked.", 
producerBlocked);
+                       assertTrue("Producer thread is not blocked: " + 
Arrays.toString(ASYNC_CONSUMER_THREAD.getStackTrace()), producerBlocked);
 
-                       boolean consumerBlocked = false;
+                       boolean consumerWaiting = false;
                        for (int i = 0; i < 50; i++) {
                                Thread thread = ASYNC_CONSUMER_THREAD;
 
                                if (thread != null && thread.isAlive()) {
-                                       StackTraceElement[] stackTrace = 
thread.getStackTrace();
-                                       consumerBlocked = 
isInBlockingQueuePoll(stackTrace);
+                                       consumerWaiting = thread.getState() == 
Thread.State.WAITING;
                                }
 
-                               if (consumerBlocked) {
+                               if (consumerWaiting) {
                                        break;
                                } else {
                                        // Retry
@@ -149,7 +148,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends 
TestLogger {
                        }
 
                        // Verify that async consumer is in blocking request
-                       assertTrue("Consumer thread is not blocked.", 
consumerBlocked);
+                       assertTrue("Consumer thread is not blocked.", 
consumerWaiting);
 
                        msg = new CancelJob(jobGraph.getJobID());
                        Future<?> cancelFuture = jobManager.ask(msg, 
deadline.timeLeft());
@@ -186,27 +185,6 @@ public class TaskCancelAsyncProducerConsumerITCase extends 
TestLogger {
        }
 
        /**
-        * Returns whether the stack trace represents a Thread in a blocking 
queue
-        * poll call.
-        *
-        * @param stackTrace Stack trace of the Thread to check
-        *
-        * @return Flag indicating whether the Thread is in a blocking queue 
poll
-        * call.
-        */
-       private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) {
-               for (StackTraceElement elem : stackTrace) {
-                       if (elem.getMethodName().equals("poll") &&
-                                       
elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) {
-
-                               return true;
-                       }
-               }
-
-               return false;
-       }
-
-       /**
         * Invokable emitting records in a separate Thread (not the main Task
         * thread).
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 1187fe6..1b31c2c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -62,9 +63,9 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
 
        @SuppressWarnings("unchecked")
        public StreamTestSingleInputGate(
-                       int numInputChannels,
-                       int bufferSize,
-                       TypeSerializer<T> serializer) throws IOException, 
InterruptedException {
+               int numInputChannels,
+               int bufferSize,
+               TypeSerializer<T> serializer) throws IOException, 
InterruptedException {
                super(numInputChannels, false);
 
                this.bufferSize = bufferSize;
@@ -86,39 +87,36 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                        final int channelIndex = i;
                        final RecordSerializer<SerializationDelegate<Object>> 
recordSerializer = new 
SpanningRecordSerializer<SerializationDelegate<Object>>();
                        final SerializationDelegate<Object> delegate = 
(SerializationDelegate<Object>) (SerializationDelegate<?>)
-                                       new 
SerializationDelegate<StreamElement>(new 
MultiplexingStreamRecordSerializer<T>(serializer));
+                               new SerializationDelegate<StreamElement>(new 
MultiplexingStreamRecordSerializer<T>(serializer));
 
                        inputQueues[channelIndex] = new 
ConcurrentLinkedQueue<InputValue<Object>>();
                        inputChannels[channelIndex] = new 
TestInputChannel(inputGate, i);
 
 
-                       final Answer<Buffer> answer = new Answer<Buffer>() {
+                       final Answer<BufferAndAvailability> answer = new 
Answer<BufferAndAvailability>() {
                                @Override
-                               public Buffer answer(InvocationOnMock 
invocationOnMock) throws Throwable {
+                               public BufferAndAvailability 
answer(InvocationOnMock invocationOnMock) throws Throwable {
                                        InputValue<Object> input = 
inputQueues[channelIndex].poll();
                                        if (input != null && 
input.isStreamEnd()) {
                                                
when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn(
-                                                               true);
-                                               return 
EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-                                       }
-                                       else if (input != null && 
input.isStreamRecord()) {
+                                                       true);
+                                               return new 
BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), 
false);
+                                       } else if (input != null && 
input.isStreamRecord()) {
                                                Object inputElement = 
input.getStreamRecord();
                                                final Buffer buffer = new 
Buffer(
-                                                               
MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
-                                                               
mock(BufferRecycler.class));
-                                               
+                                                       
MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
+                                                       
mock(BufferRecycler.class));
+
                                                
recordSerializer.setNextBuffer(buffer);
                                                
delegate.setInstance(inputElement);
                                                
recordSerializer.addRecord(delegate);
 
                                                // Call getCurrentBuffer to 
ensure size is set
-                                               return 
recordSerializer.getCurrentBuffer();
-                                       }
-                                       else if (input != null && 
input.isEvent()) {
+                                               return new 
BufferAndAvailability(recordSerializer.getCurrentBuffer(), false);
+                                       } else if (input != null && 
input.isEvent()) {
                                                AbstractEvent event = 
input.getEvent();
-                                               return 
EventSerializer.toBuffer(event);
-                                       }
-                                       else {
+                                               return new 
BufferAndAvailability(EventSerializer.toBuffer(event), false);
+                                       } else {
                                                synchronized 
(inputQueues[channelIndex]) {
                                                        
inputQueues[channelIndex].wait();
                                                        return 
answer(invocationOnMock);
@@ -130,7 +128,7 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                        
when(inputChannels[channelIndex].getInputChannel().getNextBuffer()).thenAnswer(answer);
 
                        inputGate.setInputChannel(new 
IntermediateResultPartitionID(),
-                                       
inputChannels[channelIndex].getInputChannel());
+                               inputChannels[channelIndex].getInputChannel());
                }
        }
 
@@ -139,7 +137,7 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                        inputQueues[channel].add(InputValue.element(element));
                        inputQueues[channel].notifyAll();
                }
-               
inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel());
+               
inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel());
        }
 
        public void sendEvent(AbstractEvent event, int channel) {
@@ -147,7 +145,7 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                        inputQueues[channel].add(InputValue.event(event));
                        inputQueues[channel].notifyAll();
                }
-               
inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel());
+               
inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel());
        }
 
        public void endInput() {
@@ -156,7 +154,7 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                                inputQueues[i].add(InputValue.streamEnd());
                                inputQueues[i].notifyAll();
                        }
-                       
inputGate.onAvailableBuffer(inputChannels[i].getInputChannel());
+                       
inputGate.notifyChannelNonEmpty(inputChannels[i].getInputChannel());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index a8a989b..0cf866a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -17,25 +17,24 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Random;
-
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
 /**
  * The test generates two random streams (input channels) which independently
  * and randomly generate checkpoint barriers. The two streams are very
@@ -165,7 +164,7 @@ public class BarrierBufferMassiveRandomTest {
                public void sendTaskEvent(TaskEvent event) {}
 
                @Override
-               public void registerListener(EventListener<InputGate> listener) 
{}
+               public void registerListener(InputGateListener listener) {}
 
                @Override
                public int getPageSize() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index cb8a058..3e2a75a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
 
 import java.util.ArrayDeque;
 import java.util.List;
@@ -31,16 +31,15 @@ import java.util.Queue;
 public class MockInputGate implements InputGate {
 
        private final int pageSize;
-       
+
        private final int numChannels;
-       
+
        private final Queue<BufferOrEvent> boes;
 
        private final boolean[] closed;
-       
+
        private int closedChannels;
 
-       
        public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> 
boes) {
                this.pageSize = pageSize;
                this.numChannels = numChannels;
@@ -52,7 +51,7 @@ public class MockInputGate implements InputGate {
        public int getPageSize() {
                return pageSize;
        }
-       
+
        @Override
        public int getNumberOfInputChannels() {
                return numChannels;
@@ -69,11 +68,11 @@ public class MockInputGate implements InputGate {
                if (next == null) {
                        return null;
                }
-               
+
                int channelIdx = next.getChannelIndex();
                if (closed[channelIdx]) {
                        throw new RuntimeException("Inconsistent: Channel " + 
channelIdx
-                                       + " has data even though it is already 
closed.");
+                               + " has data even though it is already 
closed.");
                }
                if (next.isEvent() && next.getEvent() instanceof 
EndOfPartitionEvent) {
                        closed[channelIdx] = true;
@@ -83,12 +82,15 @@ public class MockInputGate implements InputGate {
        }
 
        @Override
-       public void requestPartitions() {}
+       public void requestPartitions() {
+       }
 
        @Override
-       public void sendTaskEvent(TaskEvent event) {}
+       public void sendTaskEvent(TaskEvent event) {
+       }
 
        @Override
-       public void registerListener(EventListener<InputGate> listener) {}
-       
-}
\ No newline at end of file
+       public void registerListener(InputGateListener listener) {
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 145edc2..25ac356 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -45,7 +45,7 @@ import java.io.IOException;
  *
  * <p>
  * When Elements or Events are offered to the Task they are put into a queue. 
The input gates
- * of the Task read from this queue. Use {@link #waitForInputProcessing()} to 
wait until all
+ * of the Task notifyNonEmpty from this queue. Use {@link 
#waitForInputProcessing()} to wait until all
  * queues are empty. This must be used after entering some elements before 
checking the
  * desired output.
  *
@@ -62,11 +62,13 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends 
StreamTaskTestHarnes
         * Creates a test harness with the specified number of input gates and 
specified number
         * of channels per input gate.
         */
-       public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
-                       int numInputGates,
-                       int numInputChannelsPerGate,
-                       TypeInformation<IN> inputType,
-                       TypeInformation<OUT> outputType) {
+       public OneInputStreamTaskTestHarness(
+               OneInputStreamTask<IN, OUT> task,
+               int numInputGates,
+               int numInputChannelsPerGate,
+               TypeInformation<IN> inputType,
+               TypeInformation<OUT> outputType) {
+               
                super(task, outputType);
 
                this.inputType = inputType;
@@ -79,9 +81,10 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends 
StreamTaskTestHarnes
        /**
         * Creates a test harness with one input gate that has one input 
channel.
         */
-       public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
-                       TypeInformation<IN> inputType,
-                       TypeInformation<OUT> outputType) {
+       public OneInputStreamTaskTestHarness(
+               OneInputStreamTask<IN, OUT> task,
+               TypeInformation<IN> inputType,
+               TypeInformation<OUT> outputType) {
                this(task, 1, 1, inputType, outputType);
        }
 
@@ -91,9 +94,9 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends 
StreamTaskTestHarnes
 
                for (int i = 0; i < numInputGates; i++) {
                        inputGates[i] = new StreamTestSingleInputGate<IN>(
-                                       numInputChannelsPerGate,
-                                       bufferSize,
-                                       inputSerializer);
+                               numInputChannelsPerGate,
+                               bufferSize,
+                               inputSerializer);
                        this.mockEnv.addInputGate(inputGates[i].getInputGate());
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 36ad8ff..94a1bcb 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -112,8 +112,10 @@ public class StreamMockEnvironment implements Environment {
                this.accumulatorRegistry = new AccumulatorRegistry(jobID, 
getExecutionId());
        }
 
-       public StreamMockEnvironment(Configuration jobConfig, Configuration 
taskConfig, long memorySize,
-                                                                
MockInputSplitProvider inputSplitProvider, int bufferSize) {
+       public StreamMockEnvironment(
+               Configuration jobConfig, Configuration taskConfig, long 
memorySize,
+               MockInputSplitProvider inputSplitProvider, int bufferSize) {
+
                this(jobConfig, taskConfig, null, memorySize, 
inputSplitProvider, bufferSize);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 8b8b659..6845548 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -38,7 +37,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.eq;
@@ -218,4 +216,4 @@ public class StreamTaskCancellationBarrierTest {
                        return value;
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 0bd8d9a..aaac3f8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -91,7 +91,7 @@ public class StreamTaskTestHarness<OUT> {
        // input related methods only need to be implemented once, in generic 
form
        protected int numInputGates;
        protected int numInputChannelsPerGate;
-       
+
        @SuppressWarnings("rawtypes")
        protected StreamTestSingleInputGate[] inputGates;
 
@@ -154,7 +154,7 @@ public class StreamTaskTestHarness<OUT> {
                return new StreamMockEnvironment(
                                jobConfig, taskConfig, executionConfig, 
memorySize, new MockInputSplitProvider(), bufferSize);
        }
-       
+
        /**
         * Invoke the Task. This resets the output of any previous invocation. 
This will start a new
         * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} 
to wait for the
@@ -297,7 +297,7 @@ public class StreamTaskTestHarness<OUT> {
                        try {
                                Thread.sleep(10);
                        } catch (InterruptedException ignored) {}
-                       
+
                        if (allEmpty) {
                                break;
                        }
@@ -305,7 +305,7 @@ public class StreamTaskTestHarness<OUT> {
 
                // then wait for the Task Thread to be in a blocked state
                // Check whether the state is blocked, this should be the case 
if it cannot
-               // read more input, i.e. all currently available input has been 
processed.
+               // notifyNonEmpty more input, i.e. all currently available 
input has been processed.
                while (true) {
                        Thread.State state = taskThread.getState();
                        if (state == Thread.State.BLOCKED || state == 
Thread.State.TERMINATED ||
@@ -328,13 +328,13 @@ public class StreamTaskTestHarness<OUT> {
                        inputGates[i].endInput();
                }
        }
-       
+
        // 
------------------------------------------------------------------------
-       
+
        private class TaskThread extends Thread {
-               
+
                private final AbstractInvokable task;
-               
+
                private volatile Throwable error;
 
                TaskThread(AbstractInvokable task) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 92f8553..09522cd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -206,6 +206,8 @@ public class TwoInputStreamTaskTest {
                testHarness.processElement(new StreamRecord<String>("Ciao-0-0", 
initialTime), 0, 1);
                expectedOutput.add(new StreamRecord<String>("Ciao-0-0", 
initialTime));
 
+               testHarness.waitForInputProcessing();
+
                // These elements should be forwarded, since we did not yet 
receive a checkpoint barrier
                // on that input, only add to same input, otherwise we would 
not know the ordering
                // of the output since the Task might read the inputs in any 
order
@@ -217,8 +219,8 @@ public class TwoInputStreamTaskTest {
                testHarness.waitForInputProcessing();
                // we should not yet see the barrier, only the two elements 
from non-blocked input
                TestHarnessUtil.assertOutputEquals("Output was not correct.",
-                               testHarness.getOutput(),
-                               expectedOutput);
+                       expectedOutput,
+                       testHarness.getOutput());
 
                testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
                testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index 0e7565e..9c2284f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -128,7 +128,6 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> 
extends StreamTaskTest
                                                        bufferSize,
                                                        inputSerializer1);
 
-
                                        StreamEdge streamEdge = new 
StreamEdge(sourceVertexDummy,
                                                        targetVertexDummy,
                                                        1,

Reply via email to