[FLINK-5169] [network] Adjust tests to new consumer logic

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d97eaaf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d97eaaf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d97eaaf

Branch: refs/heads/release-1.1
Commit: 8d97eaaf8676968a6b8a800d638b61b0161e6570
Parents: 6cfce17
Author: Ufuk Celebi <[email protected]>
Authored: Mon Nov 28 09:59:58 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Nov 28 21:05:00 2016 +0100

----------------------------------------------------------------------
 .../runtime/io/disk/SpillingBufferTest.java     |  40 ++--
 .../iomanager/BufferFileWriterReaderTest.java   |   1 -
 .../io/network/api/reader/BufferReaderTest.java | 115 ------------
 .../netty/CancelPartitionRequestTest.java       |  37 ++--
 .../netty/PartitionRequestQueueTest.java        |  23 ++-
 .../netty/ServerTransportErrorHandlingTest.java |  54 +++---
 .../PartialConsumePipelinedResultTest.java      |  18 +-
 .../partition/PipelinedSubpartitionTest.java    | 118 +++---------
 .../network/partition/ResultPartitionTest.java  |   1 -
 .../partition/SpillableSubpartitionTest.java    |  20 +-
 .../SpilledSubpartitionViewAsyncIOTest.java     |  65 -------
 .../SpilledSubpartitionViewSyncIOTest.java      | 103 ----------
 .../partition/SpilledSubpartitionViewTest.java  | 188 ++++++++++++-------
 .../network/partition/SubpartitionTestBase.java |  10 +-
 .../partition/consumer/InputChannelTest.java    |  19 +-
 .../IteratorWrappingTestSingleInputGate.java    |  23 +--
 .../consumer/LocalInputChannelTest.java         |  14 +-
 .../consumer/RemoteInputChannelTest.java        |   2 +-
 .../partition/consumer/SingleInputGateTest.java | 130 ++++++-------
 .../partition/consumer/TestInputChannel.java    |  32 +---
 .../partition/consumer/TestSingleInputGate.java | 108 ++---------
 .../partition/consumer/UnionInputGateTest.java  |  43 +++--
 .../network/util/TestSubpartitionConsumer.java  |  69 ++++---
 .../runtime/operators/DataSinkTaskTest.java     |   7 +-
 .../operators/chaining/ChainTaskTest.java       |   1 -
 .../operators/testutils/MockEnvironment.java    |  12 +-
 .../operators/testutils/TaskTestBase.java       |   2 +-
 .../TaskCancelAsyncProducerConsumerITCase.java  |  34 +---
 .../consumer/StreamTestSingleInputGate.java     |  44 +++--
 .../io/BarrierBufferMassiveRandomTest.java      |  17 +-
 .../streaming/runtime/io/MockInputGate.java     |  28 +--
 .../tasks/OneInputStreamTaskTestHarness.java    |  27 +--
 .../runtime/tasks/StreamMockEnvironment.java    |   6 +-
 .../StreamTaskCancellationBarrierTest.java      |   4 +-
 .../runtime/tasks/StreamTaskTestHarness.java    |  16 +-
 .../runtime/tasks/TwoInputStreamTaskTest.java   |   6 +-
 .../tasks/TwoInputStreamTaskTestHarness.java    |   1 -
 37 files changed, 539 insertions(+), 899 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index 538c416..01a9723 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -109,7 +109,7 @@ public class SpillingBufferTest {
                DataInputView inView = outView.flip();
                generator.reset();
                
-               // read and re-generate all records and compare them
+               // notifyNonEmpty and re-generate all records and compare them
                final Tuple2<Integer, String> readRec = new Tuple2<>();
                for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
                        generator.next(rec);
@@ -121,14 +121,14 @@ public class SpillingBufferTest {
                        int k2 = readRec.f0;
                        String v2 = readRec.f1;
                        
-                       Assert.assertTrue("The re-generated and the read record 
do not match.", k1 == k2 && v1.equals(v2));
+                       Assert.assertTrue("The re-generated and the 
notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
                }
                
-               // re-read the data
+               // re-notifyNonEmpty the data
                inView = outView.flip();
                generator.reset();
                
-               // read and re-generate all records and compare them
+               // notifyNonEmpty and re-generate all records and compare them
                for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
                        generator.next(rec);
                        serializer.deserialize(readRec, inView);
@@ -139,7 +139,7 @@ public class SpillingBufferTest {
                        int k2 = readRec.f0;
                        String v2 = readRec.f1;
                        
-                       Assert.assertTrue("The re-generated and the read record 
do not match.", k1 == k2 && v1.equals(v2));
+                       Assert.assertTrue("The re-generated and the 
notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
                }
                
                this.memoryManager.release(outView.close());
@@ -169,7 +169,7 @@ public class SpillingBufferTest {
                DataInputView inView = outView.flip();
                generator.reset();
                
-               // read and re-generate all records and compare them
+               // notifyNonEmpty and re-generate all records and compare them
                final Tuple2<Integer, String> readRec = new Tuple2<>();
                try {
                        for (int i = 0; i < NUM_PAIRS_INMEM + 1; i++) {
@@ -182,7 +182,7 @@ public class SpillingBufferTest {
                                int k2 = readRec.f0;
                                String v2 = readRec.f1;
                                
-                               Assert.assertTrue("The re-generated and the 
read record do not match.", k1 == k2 && v1.equals(v2));
+                               Assert.assertTrue("The re-generated and the 
notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
                        }
                        Assert.fail("Read too much, expected EOFException.");
                }
@@ -190,11 +190,11 @@ public class SpillingBufferTest {
                        // expected
                }
                
-               // re-read the data
+               // re-notifyNonEmpty the data
                inView = outView.flip();
                generator.reset();
                
-               // read and re-generate all records and compare them
+               // notifyNonEmpty and re-generate all records and compare them
                for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
                        generator.next(rec);
                        serializer.deserialize(readRec, inView);
@@ -205,7 +205,7 @@ public class SpillingBufferTest {
                        int k2 = readRec.f0;
                        String v2 = readRec.f1;
                        
-                       Assert.assertTrue("The re-generated and the read record 
do not match.", k1 == k2 && v1.equals(v2));
+                       Assert.assertTrue("The re-generated and the 
notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
                }
                
                this.memoryManager.release(outView.close());
@@ -237,7 +237,7 @@ public class SpillingBufferTest {
                DataInputView inView = outView.flip();
                generator.reset();
                
-               // read and re-generate all records and compare them
+               // notifyNonEmpty and re-generate all records and compare them
                final Tuple2<Integer, String> readRec = new Tuple2<>();
                for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
                        generator.next(rec);
@@ -249,14 +249,14 @@ public class SpillingBufferTest {
                        int k2 = readRec.f0;
                        String v2 = readRec.f1;
                        
-                       Assert.assertTrue("The re-generated and the read record 
do not match.", k1 == k2 && v1.equals(v2));
+                       Assert.assertTrue("The re-generated and the 
notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
                }
                
-               // re-read the data
+               // re-notifyNonEmpty the data
                inView = outView.flip();
                generator.reset();
                
-               // read and re-generate all records and compare them
+               // notifyNonEmpty and re-generate all records and compare them
                for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
                        generator.next(rec);
                        serializer.deserialize(readRec, inView);
@@ -267,7 +267,7 @@ public class SpillingBufferTest {
                        int k2 = readRec.f0;
                        String v2 = readRec.f1;
                        
-                       Assert.assertTrue("The re-generated and the read record 
do not match.", k1 == k2 && v1.equals(v2));
+                       Assert.assertTrue("The re-generated and the 
notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
                }
                
                this.memoryManager.release(outView.close());
@@ -297,7 +297,7 @@ public class SpillingBufferTest {
                DataInputView inView = outView.flip();
                generator.reset();
                
-               // read and re-generate all records and compare them
+               // notifyNonEmpty and re-generate all records and compare them
                final Tuple2<Integer, String> readRec = new Tuple2<>();
                try {
                        for (int i = 0; i < NUM_PAIRS_EXTERNAL + 1; i++) {
@@ -310,7 +310,7 @@ public class SpillingBufferTest {
                                int k2 = readRec.f0;
                                String v2 = readRec.f1;
                                
-                               Assert.assertTrue("The re-generated and the 
read record do not match.", k1 == k2 && v1.equals(v2));
+                               Assert.assertTrue("The re-generated and the 
notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
                        }
                        Assert.fail("Read too much, expected EOFException.");
                }
@@ -318,11 +318,11 @@ public class SpillingBufferTest {
                        // expected
                }
                
-               // re-read the data
+               // re-notifyNonEmpty the data
                inView = outView.flip();
                generator.reset();
                
-               // read and re-generate all records and compare them
+               // notifyNonEmpty and re-generate all records and compare them
                for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
                        generator.next(rec);
                        serializer.deserialize(readRec, inView);
@@ -333,7 +333,7 @@ public class SpillingBufferTest {
                        int k2 = readRec.f0;
                        String v2 = readRec.f1;
                        
-                       Assert.assertTrue("The re-generated and the read record 
do not match.", k1 == k2 && v1.equals(v2));
+                       Assert.assertTrue("The re-generated and the 
notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
                }
                
                this.memoryManager.release(outView.close());

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
index 375be45..2da0f7e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
deleted file mode 100644
index 099b6fb..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import 
org.apache.flink.runtime.io.network.partition.consumer.TestSingleInputGate;
-import org.apache.flink.runtime.io.network.util.TestTaskEvent;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Task.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-@SuppressWarnings("unchecked")
-public class BufferReaderTest {
-
-       @Test
-       public void testGetNextBufferOrEvent() throws IOException, 
InterruptedException {
-
-               final TestSingleInputGate inputGate = new TestSingleInputGate(1)
-                               .readBuffer().readBuffer().readEvent()
-                               .readBuffer().readBuffer().readEvent()
-                               .readBuffer().readEndOfPartitionEvent();
-
-               final BufferReader reader = new 
BufferReader(inputGate.getInputGate());
-
-               // Task event listener to be notified...
-               final EventListener<TaskEvent> listener = 
mock(EventListener.class);
-               reader.registerTaskEventListener(listener, TestTaskEvent.class);
-
-               int numReadBuffers = 0;
-               while ((reader.getNextBuffer()) != null) {
-                       numReadBuffers++;
-               }
-
-               assertEquals(5, numReadBuffers);
-               verify(listener, times(2)).onEvent(any(TaskEvent.class));
-       }
-
-       @Test
-       public void testIterativeGetNextBufferOrEvent() throws IOException, 
InterruptedException {
-
-               final TestSingleInputGate inputGate = new TestSingleInputGate(1)
-                               .readBuffer().readBuffer().readEvent()
-                               .readBuffer().readBuffer().readEvent()
-                               .readBuffer().readEndOfSuperstepEvent()
-                               .readBuffer().readBuffer().readEvent()
-                               .readBuffer().readBuffer().readEvent()
-                               .readBuffer().readEndOfPartitionEvent();
-
-               final BufferReader reader = new 
BufferReader(inputGate.getInputGate());
-
-               // Set reader iterative
-               reader.setIterativeReader();
-
-               // Task event listener to be notified...
-               final EventListener<TaskEvent> listener = 
mock(EventListener.class);
-               // Task event listener to be notified...
-               reader.registerTaskEventListener(listener, TestTaskEvent.class);
-
-               int numReadBuffers = 0;
-               int numEndOfSuperstepEvents = 0;
-
-               while (true) {
-                       Buffer buffer = reader.getNextBuffer();
-
-                       if (buffer != null) {
-                               numReadBuffers++;
-                       }
-                       else if (reader.hasReachedEndOfSuperstep()) {
-                               reader.startNextSuperstep();
-
-                               numEndOfSuperstepEvents++;
-                       }
-                       else if (reader.isFinished()) {
-                               break;
-                       }
-               }
-
-               assertEquals(10, numReadBuffers);
-               assertEquals(1, numEndOfSuperstepEvents);
-
-               verify(listener, times(4)).onEvent(any(TaskEvent.class));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 1ff1e99..a2f866a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -24,14 +24,16 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.event.NotificationListener;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
@@ -73,11 +75,18 @@ public class CancelPartitionRequestTest {
 
                        CountDownLatch sync = new CountDownLatch(1);
 
-                       ResultSubpartitionView view = spy(new 
InfiniteSubpartitionView(outboundBuffers, sync));
+                       final ResultSubpartitionView view = spy(new 
InfiniteSubpartitionView(outboundBuffers, sync));
 
                        // Return infinite subpartition
-                       when(partitions.createSubpartitionView(eq(pid), eq(0), 
any(BufferProvider.class)))
-                                       .thenReturn(view);
+                       when(partitions.createSubpartitionView(eq(pid), eq(0), 
any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+                               .thenAnswer(new 
Answer<ResultSubpartitionView>() {
+                                       @Override
+                                       public ResultSubpartitionView 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                                               BufferAvailabilityListener 
listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+                                               
listener.notifyBuffersAvailable(Long.MAX_VALUE);
+                                               return view;
+                                       }
+                               });
 
                        PartitionRequestProtocol protocol = new 
PartitionRequestProtocol(
                                        partitions, 
mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
@@ -109,19 +118,26 @@ public class CancelPartitionRequestTest {
                NettyServerAndClient serverAndClient = null;
 
                try {
-                       TestPooledBufferProvider outboundBuffers = new 
TestPooledBufferProvider(16);
+                       final TestPooledBufferProvider outboundBuffers = new 
TestPooledBufferProvider(16);
 
                        ResultPartitionManager partitions = 
mock(ResultPartitionManager.class);
 
                        ResultPartitionID pid = new ResultPartitionID();
 
-                       CountDownLatch sync = new CountDownLatch(1);
+                       final CountDownLatch sync = new CountDownLatch(1);
 
-                       ResultSubpartitionView view = spy(new 
InfiniteSubpartitionView(outboundBuffers, sync));
+                       final ResultSubpartitionView view = spy(new 
InfiniteSubpartitionView(outboundBuffers, sync));
 
                        // Return infinite subpartition
-                       when(partitions.createSubpartitionView(eq(pid), eq(0), 
any(BufferProvider.class)))
-                                       .thenReturn(view);
+                       when(partitions.createSubpartitionView(eq(pid), eq(0), 
any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+                                       .thenAnswer(new 
Answer<ResultSubpartitionView>() {
+                                               @Override
+                                               public ResultSubpartitionView 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                                                       
BufferAvailabilityListener listener = (BufferAvailabilityListener) 
invocationOnMock.getArguments()[3];
+                                                       
listener.notifyBuffersAvailable(Long.MAX_VALUE);
+                                                       return view;
+                                               }
+                                       });
 
                        PartitionRequestProtocol protocol = new 
PartitionRequestProtocol(
                                        partitions, 
mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
@@ -174,8 +190,7 @@ public class CancelPartitionRequestTest {
                }
 
                @Override
-               public boolean registerListener(final NotificationListener 
listener) throws IOException {
-                       return false;
+               public void notifyBuffersAvailable(long buffers) throws 
IOException {
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 3f281bd..7224e96 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -20,12 +20,18 @@ package org.apache.flink.runtime.io.network.netty;
 
 import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -35,14 +41,27 @@ public class PartitionRequestQueueTest {
        public void testProducerFailedException() throws Exception {
                PartitionRequestQueue queue = new PartitionRequestQueue();
 
-               EmbeddedChannel ch = new EmbeddedChannel(queue);
+               ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+               ResultPartitionID rpid = new ResultPartitionID();
+               BufferProvider bufferProvider = mock(BufferProvider.class);
 
                ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
                when(view.isReleased()).thenReturn(true);
                when(view.getFailureCause()).thenReturn(new 
RuntimeException("Expected test exception"));
 
+               when(partitionProvider.createSubpartitionView(
+                       eq(rpid),
+                       eq(0),
+                       eq(bufferProvider),
+                       
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+               EmbeddedChannel ch = new EmbeddedChannel(queue);
+
+               SequenceNumberingViewReader seqView = new 
SequenceNumberingViewReader(new InputChannelID(), queue);
+               seqView.requestSubpartitionView(partitionProvider, rpid, 0, 
bufferProvider);
+
                // Enqueue the erroneous view
-               queue.enqueue(view, new InputChannelID());
+               queue.notifyReaderNonEmpty(seqView);
                ch.runPendingTasks();
 
                // Read the enqueued msg

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 1515f83..1c3557e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -25,20 +25,20 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import 
org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.InfiniteSubpartitionView;
-import 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder;
-import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
 import static 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
 import static 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
@@ -63,36 +63,43 @@ public class ServerTransportErrorHandlingTest {
                final ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
 
                when(partitionManager
-                               
.createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class)))
-                               .thenReturn(new 
InfiniteSubpartitionView(outboundBuffers, sync));
+                       .createSubpartitionView(any(ResultPartitionID.class), 
anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+                       .thenAnswer(new Answer<ResultSubpartitionView>() {
+                               @Override
+                               public ResultSubpartitionView 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                                       BufferAvailabilityListener listener = 
(BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+                                       
listener.notifyBuffersAvailable(Long.MAX_VALUE);
+                                       return new 
CancelPartitionRequestTest.InfiniteSubpartitionView(outboundBuffers, sync);
+                               }
+                       });
 
                NettyProtocol protocol = new NettyProtocol() {
                        @Override
                        public ChannelHandler[] getServerChannelHandlers() {
                                return new PartitionRequestProtocol(
-                                               partitionManager,
-                                               mock(TaskEventDispatcher.class),
-                                               
mock(NetworkBufferPool.class)).getServerChannelHandlers();
+                                       partitionManager,
+                                       mock(TaskEventDispatcher.class),
+                                       
mock(NetworkBufferPool.class)).getServerChannelHandlers();
                        }
 
                        @Override
                        public ChannelHandler[] getClientChannelHandlers() {
-                               return new ChannelHandler[] {
-                                               new NettyMessageEncoder(),
-                                               // Close on read
-                                               new 
ChannelInboundHandlerAdapter() {
-                                                       @Override
-                                                       public void 
channelRead(ChannelHandlerContext ctx, Object msg)
-                                                                       throws 
Exception {
-
-                                                               
ctx.channel().close();
-                                                       }
+                               return new ChannelHandler[]{
+                                       new NettyMessage.NettyMessageEncoder(),
+                                       // Close on read
+                                       new ChannelInboundHandlerAdapter() {
+                                               @Override
+                                               public void 
channelRead(ChannelHandlerContext ctx, Object msg)
+                                                       throws Exception {
+
+                                                       ctx.channel().close();
                                                }
+                                       }
                                };
                        }
                };
 
-               NettyServerAndClient serverAndClient = null;
+               NettyTestUtil.NettyServerAndClient serverAndClient = null;
 
                try {
                        serverAndClient = initServerAndClient(protocol, 
createConfig());
@@ -100,15 +107,14 @@ public class ServerTransportErrorHandlingTest {
                        Channel ch = connect(serverAndClient);
 
                        // Write something to trigger close by server
-                       ch.writeAndFlush(new PartitionRequest(new 
ResultPartitionID(), 0, new InputChannelID()));
+                       ch.writeAndFlush(new NettyMessage.PartitionRequest(new 
ResultPartitionID(), 0, new InputChannelID()));
 
                        // Wait for the notification
                        if 
(!sync.await(TestingUtils.TESTING_DURATION().toMillis(), 
TimeUnit.MILLISECONDS)) {
                                fail("Timed out after waiting for " + 
TestingUtils.TESTING_DURATION().toMillis() +
-                                               " ms to be notified about 
released partition.");
+                                       " ms to be notified about released 
partition.");
                        }
-               }
-               finally {
+               } finally {
                        shutdown(serverAndClient);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 97f42b1..4a826b7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -20,12 +20,12 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.reader.BufferReader;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
@@ -86,12 +86,12 @@ public class PartialConsumePipelinedResultTest {
                // The partition needs to be pipelined, otherwise the original 
issue does not occur, because
                // the sender and receiver are not online at the same time.
                receiver.connectNewDataSetAsInput(
-                               sender, DistributionPattern.POINTWISE, 
ResultPartitionType.PIPELINED);
+                       sender, DistributionPattern.POINTWISE, 
ResultPartitionType.PIPELINED);
 
                final JobGraph jobGraph = new JobGraph("Partial Consume of 
Pipelined Result", sender, receiver);
 
                final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
-                               sender.getID(), receiver.getID());
+                       sender.getID(), receiver.getID());
 
                sender.setSlotSharingGroup(slotSharingGroup);
                receiver.setSlotSharingGroup(slotSharingGroup);
@@ -126,11 +126,11 @@ public class PartialConsumePipelinedResultTest {
 
                @Override
                public void invoke() throws Exception {
-                       final BufferReader reader = new 
BufferReader(getEnvironment().getInputGate(0));
-
-                       final Buffer buffer = reader.getNextBuffer();
-
-                       buffer.recycle();
+                       InputGate gate = getEnvironment().getInputGate(0);
+                       Buffer buffer = gate.getNextBufferOrEvent().getBuffer();
+                       if (buffer != null) {
+                               buffer.recycle();
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 8750a1a..a56177e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestNotificationListener;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
@@ -38,12 +37,13 @@ import java.util.concurrent.Future;
 
 import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
@@ -63,80 +63,25 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
        }
 
        @Test
-       public void testRegisterListener() throws Exception {
-               final PipelinedSubpartition subpartition = createSubpartition();
-
-               final TestNotificationListener listener = new 
TestNotificationListener();
-
-               // Register a listener
-               assertTrue(subpartition.registerListener(listener));
-
-               // Try to register another listener
-               try {
-                       subpartition.registerListener(listener);
-
-                       fail("Did not throw expected exception after duplicate 
listener registration.");
-               }
-               catch (IllegalStateException expected) {
-               }
-       }
-
-       @Test
-       public void testListenerNotification() throws Exception {
-               final TestNotificationListener listener = new 
TestNotificationListener();
-               assertEquals(0, listener.getNumberOfNotifications());
-
-               {
-                       final PipelinedSubpartition subpartition = 
createSubpartition();
-
-                       // Register a listener
-                       assertTrue(subpartition.registerListener(listener));
-
-                       // Notify on add and remove listener
-                       subpartition.add(mock(Buffer.class));
-                       assertEquals(1, listener.getNumberOfNotifications());
-
-                       // No notification, should have removed listener after 
first notification
-                       subpartition.add(mock(Buffer.class));
-                       assertEquals(1, listener.getNumberOfNotifications());
-               }
-
-               {
-                       final PipelinedSubpartition subpartition = 
createSubpartition();
-
-                       // Register a listener
-                       assertTrue(subpartition.registerListener(listener));
-
-                       // Notify on finish
-                       subpartition.finish();
-                       assertEquals(2, listener.getNumberOfNotifications());
-               }
-
-               {
-                       final PipelinedSubpartition subpartition = 
createSubpartition();
-
-                       // Register a listener
-                       assertTrue(subpartition.registerListener(listener));
-
-                       // Notify on release
-                       subpartition.release();
-                       assertEquals(3, listener.getNumberOfNotifications());
-               }
-       }
-
-       @Test
        public void testIllegalReadViewRequest() throws Exception {
                final PipelinedSubpartition subpartition = createSubpartition();
 
                // Successful request
-               assertNotNull(subpartition.createReadView(null));
+               assertNotNull(subpartition.createReadView(null, new 
BufferAvailabilityListener() {
+                       @Override
+                       public void notifyBuffersAvailable(long numBuffers) {
+                       }
+               }));
 
                try {
-                       subpartition.createReadView(null);
+                       subpartition.createReadView(null, new 
BufferAvailabilityListener() {
+                               @Override
+                               public void notifyBuffersAvailable(long 
numBuffers) {
+                               }
+                       });
 
-                       fail("Did not throw expected exception after duplicate 
read view request.");
-               }
-               catch (IllegalStateException expected) {
+                       fail("Did not throw expected exception after duplicate 
notifyNonEmpty view request.");
+               } catch (IllegalStateException expected) {
                }
        }
 
@@ -144,23 +89,19 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
        public void testBasicPipelinedProduceConsumeLogic() throws Exception {
                final PipelinedSubpartition subpartition = createSubpartition();
 
-               TestNotificationListener listener = new 
TestNotificationListener();
+               BufferAvailabilityListener listener = 
mock(BufferAvailabilityListener.class);
 
-               ResultSubpartitionView view = subpartition.createReadView(null);
+               ResultSubpartitionView view = subpartition.createReadView(null, 
listener);
 
                // Empty => should return null
                assertNull(view.getNextBuffer());
-
-               // Register listener for notifications
-               assertTrue(view.registerListener(listener));
-
-               assertEquals(0, listener.getNumberOfNotifications());
+               verify(listener, times(1)).notifyBuffersAvailable(eq(0L));
 
                // Add data to the queue...
                subpartition.add(createBuffer());
 
                // ...should have resulted in a notification
-               assertEquals(1, listener.getNumberOfNotifications());
+               verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
 
                // ...and one available result
                assertNotNull(view.getNextBuffer());
@@ -168,10 +109,7 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
 
                // Add data to the queue...
                subpartition.add(createBuffer());
-               // ...don't allow to subscribe, if data is available
-               assertFalse(view.registerListener(listener));
-
-               assertEquals(1, listener.getNumberOfNotifications());
+               verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
        }
 
        @Test
@@ -208,7 +146,6 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
 
                        @Override
                        public BufferOrEvent getNextBufferOrEvent() throws 
Exception {
-
                                if (numberOfBuffers == 
producerNumberOfBuffersToProduce) {
                                        return null;
                                }
@@ -261,16 +198,17 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
 
                final PipelinedSubpartition subpartition = createSubpartition();
 
-               final PipelinedSubpartitionView view = 
subpartition.createReadView(null);
+               TestSubpartitionConsumer consumer = new 
TestSubpartitionConsumer(isSlowConsumer, consumerCallback);
+               final PipelinedSubpartitionView view = 
subpartition.createReadView(null, consumer);
+               consumer.setSubpartitionView(view);
 
-               Future<Boolean> producer = executorService.submit(
-                               new TestSubpartitionProducer(subpartition, 
isSlowProducer, producerSource));
+               Future<Boolean> producerResult = executorService.submit(
+                       new TestSubpartitionProducer(subpartition, 
isSlowProducer, producerSource));
 
-               Future<Boolean> consumer = executorService.submit(
-                               new TestSubpartitionConsumer(view, 
isSlowConsumer, consumerCallback));
+               Future<Boolean> consumerResult = 
executorService.submit(consumer);
 
                // Wait for producer and consumer to finish
-               producer.get();
-               consumer.get();
+               producerResult.get();
+               consumerResult.get();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 302b667..a4abe75 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -84,7 +84,6 @@ public class ResultPartitionTest {
                        mock(ResultPartitionManager.class),
                        notifier,
                        mock(IOManager.class),
-                       IOManager.IOMode.SYNC,
                        sendScheduleOrUpdateConsumersMessage);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index d7e56c8..b7a54d7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -34,7 +35,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.SYNC;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -59,7 +60,7 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
 
        @Override
        ResultSubpartition createSubpartition() {
-               return new SpillableSubpartition(0, 
mock(ResultPartition.class), ioManager, SYNC);
+               return new SpillableSubpartition(0, 
mock(ResultPartition.class), ioManager);
        }
 
        /**
@@ -87,14 +88,14 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                // Mock I/O manager returning the blocking spill writer
                IOManager ioManager = mock(IOManager.class);
                
when(ioManager.createBufferFileWriter(any(FileIOChannel.ID.class)))
-                               .thenReturn(spillWriter);
+                       .thenReturn(spillWriter);
 
                // The partition
                final SpillableSubpartition partition = new 
SpillableSubpartition(
-                               0, mock(ResultPartition.class), ioManager, 
SYNC);
+                       0, mock(ResultPartition.class), ioManager);
 
                // Spill the partition initially (creates the spill writer)
-               partition.releaseMemory();
+               assertEquals(0, partition.releaseMemory());
 
                ExecutorService executor = Executors.newSingleThreadExecutor();
 
@@ -130,13 +131,18 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
        public void testReleasePartitionAndGetNext() throws Exception {
                // Create partition and add some buffers
                SpillableSubpartition partition = new SpillableSubpartition(
-                               0, mock(ResultPartition.class), ioManager, 
SYNC);
+                       0, mock(ResultPartition.class), ioManager);
 
                partition.finish();
 
                // Create the read view
                ResultSubpartitionView readView = spy(partition
-                               .createReadView(new 
TestInfiniteBufferProvider()));
+                       .createReadView(new TestInfiniteBufferProvider(), new 
BufferAvailabilityListener() {
+                               @Override
+                               public void notifyBuffersAvailable(long 
numBuffers) {
+
+                               }
+                       }));
 
                // The released state check (of the parent) needs to be 
independent
                // of the released state of the view.

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java
deleted file mode 100644
index 981c8ee..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-
-public class SpilledSubpartitionViewAsyncIOTest {
-
-       private static final IOManager ioManager = new IOManagerAsync();
-
-       @AfterClass
-       public static void shutdown() {
-               ioManager.shutdown();
-       }
-
-       @Test
-       public void testWriteConsume() throws Exception {
-               // Config
-               final int numberOfBuffersToWrite = 1024;
-
-               // Setup
-               final BufferFileWriter writer = SpilledSubpartitionViewTest
-                               .createWriterAndWriteBuffers(ioManager, new 
TestInfiniteBufferProvider(), numberOfBuffersToWrite);
-
-               writer.close();
-
-               final TestPooledBufferProvider viewBufferPool = new 
TestPooledBufferProvider(1);
-
-               final SpilledSubpartitionViewAsyncIO view = new 
SpilledSubpartitionViewAsyncIO(
-                               mock(ResultSubpartition.class), viewBufferPool, 
ioManager,
-                               writer.getChannelID(), 0);
-
-               final TestSubpartitionConsumer consumer = new 
TestSubpartitionConsumer(view, false,
-                               new TestConsumerCallback.RecyclingCallback());
-
-               // Consume subpartition
-               consumer.call();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java
deleted file mode 100644
index f8baae4..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-
-public class SpilledSubpartitionViewSyncIOTest {
-
-       private static final IOManager ioManager = new IOManagerAsync();
-
-       private static final TestInfiniteBufferProvider writerBufferPool =
-                       new TestInfiniteBufferProvider();
-
-       @AfterClass
-       public static void shutdown() {
-               ioManager.shutdown();
-       }
-
-       @Test
-       public void testWriteConsume() throws Exception {
-               // Config
-               final int numberOfBuffersToWrite = 512;
-
-               // Setup
-               final BufferFileWriter writer = SpilledSubpartitionViewTest
-                               .createWriterAndWriteBuffers(ioManager, 
writerBufferPool, numberOfBuffersToWrite);
-
-               writer.close();
-
-               final TestPooledBufferProvider viewBufferPool = new 
TestPooledBufferProvider(1);
-
-               final SpilledSubpartitionViewSyncIO view = new 
SpilledSubpartitionViewSyncIO(
-                               mock(ResultSubpartition.class),
-                               viewBufferPool.getMemorySegmentSize(),
-                               writer.getChannelID(),
-                               0);
-
-               final TestSubpartitionConsumer consumer = new 
TestSubpartitionConsumer(view, false,
-                               new TestConsumerCallback.RecyclingCallback());
-
-               // Consume subpartition
-               consumer.call();
-       }
-
-       @Test
-       public void testConsumeWithFewBuffers() throws Exception {
-               // Config
-               final int numberOfBuffersToWrite = 512;
-
-               // Setup
-               final BufferFileWriter writer = SpilledSubpartitionViewTest
-                               .createWriterAndWriteBuffers(ioManager, 
writerBufferPool, numberOfBuffersToWrite);
-
-               writer.close();
-
-               final SpilledSubpartitionViewSyncIO view = new 
SpilledSubpartitionViewSyncIO(
-                               mock(ResultSubpartition.class),
-                               32 * 1024,
-                               writer.getChannelID(),
-                               0);
-
-               // No buffer available, don't deadlock. We need to make 
progress in situations when the view
-               // is consumed at an input gate with local and remote channels. 
The remote channels might
-               // eat up all the buffers, at which point the spilled view will 
not have any buffers
-               // available and the input gate can't make any progress if we 
don't return immediately.
-               //
-               // The current solution is straight-forward with a separate 
buffer per spilled subpartition,
-               // but introduces memory-overhead.
-               //
-               // TODO Replace with asynchronous buffer pool request as this 
introduces extra buffers per
-               // consumed subpartition.
-               final TestSubpartitionConsumer consumer = new 
TestSubpartitionConsumer(view, false,
-                               new TestConsumerCallback.RecyclingCallback());
-
-               consumer.call();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index fff7bc6..8f8da93 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -21,23 +21,18 @@ package org.apache.flink.runtime.io.network.partition;
 import com.google.common.collect.Lists;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import 
org.apache.flink.runtime.io.network.util.TestConsumerCallback.RecyclingCallback;
+import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
 import org.junit.AfterClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -47,55 +42,103 @@ import java.util.concurrent.TimeoutException;
 
 import static org.mockito.Mockito.mock;
 
-/**
- * Test for both the asynchronous and synchronous spilled subpartition view 
implementation.
- */
-@RunWith(Parameterized.class)
 public class SpilledSubpartitionViewTest {
 
-       private static final IOManager ioManager = new IOManagerAsync();
-
-       private static final ExecutorService executor = 
Executors.newCachedThreadPool();
+       private static final IOManager IO_MANAGER = new IOManagerAsync();
 
        private static final TestInfiniteBufferProvider writerBufferPool =
-                       new TestInfiniteBufferProvider();
-
-       private IOMode ioMode;
-
-       public SpilledSubpartitionViewTest(IOMode ioMode) {
-               this.ioMode = ioMode;
-       }
+               new TestInfiniteBufferProvider();
 
        @AfterClass
        public static void shutdown() {
-               ioManager.shutdown();
-               executor.shutdown();
+               IO_MANAGER.shutdown();
        }
 
-       @Parameterized.Parameters
-       public static Collection<Object[]> ioMode() {
-               return Arrays.asList(new Object[][]{
-                               {IOMode.SYNC},
-                               {IOMode.ASYNC}});
+       @Test
+       public void testWriteConsume() throws Exception {
+               // Config
+               final int numberOfBuffersToWrite = 512;
+
+               // Setup
+               final BufferFileWriter writer = 
createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 
numberOfBuffersToWrite);
+
+               writer.close();
+
+               TestPooledBufferProvider viewBufferPool = new 
TestPooledBufferProvider(1);
+
+               TestSubpartitionConsumer consumer = new 
TestSubpartitionConsumer(
+                       false, new TestConsumerCallback.RecyclingCallback());
+
+               SpilledSubpartitionView view = new SpilledSubpartitionView(
+                       mock(ResultSubpartition.class),
+                       viewBufferPool.getMemorySegmentSize(),
+                       writer,
+                       numberOfBuffersToWrite + 1, // +1 for end-of-partition
+                       consumer);
+
+               consumer.setSubpartitionView(view);
+
+               // Consume subpartition
+               consumer.call();
        }
 
        @Test
-       public void testReadMultipleFilesWithSingleBufferPool() throws 
Exception {
+       public void testConsumeWithFewBuffers() throws Exception {
+               // Config
+               final int numberOfBuffersToWrite = 512;
+
                // Setup
-               BufferFileWriter[] writers = new BufferFileWriter[]{
-                               createWriterAndWriteBuffers(ioManager, 
writerBufferPool, 512),
-                               createWriterAndWriteBuffers(ioManager, 
writerBufferPool, 512)
-               };
+               final BufferFileWriter writer = 
createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 
numberOfBuffersToWrite);
+
+               writer.close();
+
+               TestSubpartitionConsumer consumer = new 
TestSubpartitionConsumer(
+                       false, new TestConsumerCallback.RecyclingCallback());
+
+               SpilledSubpartitionView view = new SpilledSubpartitionView(
+                       mock(ResultSubpartition.class),
+                       32 * 1024,
+                       writer,
+                       numberOfBuffersToWrite + 1,
+                       consumer);
+
+               consumer.setSubpartitionView(view);
+
+               // No buffer available, don't deadlock. We need to make 
progress in situations when the view
+               // is consumed at an input gate with local and remote channels. 
The remote channels might
+               // eat up all the buffers, at which point the spilled view will 
not have any buffers
+               // available and the input gate can't make any progress if we 
don't return immediately.
+               //
+               // The current solution is straight-forward with a separate 
buffer per spilled subpartition,
+               // but introduces memory-overhead.
+               //
+               // TODO Replace with asynchronous buffer pool request as this 
introduces extra buffers per
+               // consumed subpartition.
+               consumer.call();
+       }
 
-               final ResultSubpartitionView[] readers = new 
ResultSubpartitionView[writers.length];
+       @Test
+       public void testReadMultipleFilesWithSingleBufferPool() throws 
Exception {
+               ExecutorService executor = null;
+               BufferFileWriter[] writers = null;
+               ResultSubpartitionView[] readers = null;
 
-               // Make this buffer pool small so that we can test the 
behaviour of the asynchronous view
-               // with few  buffers.
-               final BufferProvider inputBuffers = new 
TestPooledBufferProvider(2);
+               try {
+                       executor = Executors.newCachedThreadPool();
 
-               final ResultSubpartition parent = 
mock(ResultSubpartition.class);
+                       // Setup
+                       writers = new BufferFileWriter[]{
+                               createWriterAndWriteBuffers(IO_MANAGER, 
writerBufferPool, 512),
+                               createWriterAndWriteBuffers(IO_MANAGER, 
writerBufferPool, 512)
+                       };
+
+                       readers = new ResultSubpartitionView[writers.length];
+                       TestSubpartitionConsumer[] consumers = new 
TestSubpartitionConsumer[writers.length];
+
+                       BufferProvider inputBuffers = new 
TestPooledBufferProvider(2);
+
+                       ResultSubpartition parent = 
mock(ResultSubpartition.class);
 
-               try {
                        // Wait for writers to finish
                        for (BufferFileWriter writer : writers) {
                                writer.close();
@@ -103,56 +146,56 @@ public class SpilledSubpartitionViewTest {
 
                        // Create the views depending on the test configuration
                        for (int i = 0; i < readers.length; i++) {
-                               if (ioMode.isSynchronous()) {
-                                       readers[i] = new 
SpilledSubpartitionViewSyncIO(
-                                                       parent,
-                                                       
inputBuffers.getMemorySegmentSize(),
-                                                       
writers[i].getChannelID(),
-                                                       0);
-                               }
-                               else {
-                                       // For the asynchronous view, it is 
important that a registered listener will
-                                       // eventually be notified even if the 
view never got a buffer to read data into.
-                                       //
-                                       // At runtime, multiple threads never 
share the same buffer pool as in test. We
-                                       // do it here to provoke the erroneous 
behaviour.
-                                       readers[i] = new 
SpilledSubpartitionViewAsyncIO(
-                                                       parent, inputBuffers, 
ioManager, writers[i].getChannelID(), 0);
-                               }
+                               consumers[i] = new TestSubpartitionConsumer(
+                                       false, new 
TestConsumerCallback.RecyclingCallback());
+
+                               readers[i] = new SpilledSubpartitionView(
+                                       parent,
+                                       inputBuffers.getMemorySegmentSize(),
+                                       writers[i],
+                                       512 + 1, // +1 for end of partition 
event
+                                       consumers[i]);
+
+                               consumers[i].setSubpartitionView(readers[i]);
                        }
 
                        final List<Future<Boolean>> results = 
Lists.newArrayList();
 
                        // Submit the consuming tasks
-                       for (ResultSubpartitionView view : readers) {
-                               results.add(executor.submit(new 
TestSubpartitionConsumer(
-                                               view, false, new 
RecyclingCallback())));
+                       for (TestSubpartitionConsumer consumer : consumers) {
+                               results.add(executor.submit(consumer));
                        }
 
                        // Wait for the results
                        for (Future<Boolean> res : results) {
                                try {
                                        res.get(2, TimeUnit.MINUTES);
-                               }
-                               catch (TimeoutException e) {
+                               } catch (TimeoutException e) {
                                        throw new TimeoutException("There has 
been a timeout in the test. This " +
-                                                       "indicates that there 
is a bug/deadlock in the tested subpartition " +
-                                                       "view. The timed out 
test was in " + ioMode + " mode.");
+                                               "indicates that there is a 
bug/deadlock in the tested subpartition " +
+                                               "view.");
                                }
                        }
-               }
-               finally {
-                       for (BufferFileWriter writer : writers) {
-                               if (writer != null) {
-                                       writer.deleteChannel();
+               } finally {
+                       if (writers != null) {
+                               for (BufferFileWriter writer : writers) {
+                                       if (writer != null) {
+                                               writer.deleteChannel();
+                                       }
                                }
                        }
 
-                       for (ResultSubpartitionView reader : readers) {
-                               if (reader != null) {
-                                       reader.releaseAllResources();
+                       if (readers != null) {
+                               for (ResultSubpartitionView reader : readers) {
+                                       if (reader != null) {
+                                               reader.releaseAllResources();
+                                       }
                                }
                        }
+
+                       if (executor != null) {
+                               executor.shutdown();
+                       }
                }
        }
 
@@ -163,9 +206,9 @@ public class SpilledSubpartitionViewTest {
         * <p> Call {@link BufferFileWriter#close()} to ensure that all buffers 
have been written.
         */
        static BufferFileWriter createWriterAndWriteBuffers(
-                       IOManager ioManager,
-                       BufferProvider bufferProvider,
-                       int numberOfBuffers) throws IOException {
+               IOManager ioManager,
+               BufferProvider bufferProvider,
+               int numberOfBuffers) throws IOException {
 
                final BufferFileWriter writer = 
ioManager.createBufferFileWriter(ioManager.createChannel());
 
@@ -177,4 +220,5 @@ public class SpilledSubpartitionViewTest {
 
                return writer;
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 26a8f29..14942bc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -50,8 +49,7 @@ public abstract class SubpartitionTestBase extends TestLogger 
{
                        subpartition.finish();
 
                        assertFalse(subpartition.add(mock(Buffer.class)));
-               }
-               finally {
+               } finally {
                        if (subpartition != null) {
                                subpartition.release();
                        }
@@ -66,8 +64,7 @@ public abstract class SubpartitionTestBase extends TestLogger 
{
                        subpartition.release();
 
                        assertFalse(subpartition.add(mock(Buffer.class)));
-               }
-               finally {
+               } finally {
                        if (subpartition != null) {
                                subpartition.release();
                        }
@@ -97,7 +94,8 @@ public abstract class SubpartitionTestBase extends TestLogger 
{
                TestInfiniteBufferProvider buffers = new 
TestInfiniteBufferProvider();
 
                // Create the view
-               ResultSubpartitionView view = partition.createReadView(buffers);
+               BufferAvailabilityListener listener = 
mock(BufferAvailabilityListener.class);
+               ResultSubpartitionView view = partition.createReadView(buffers, 
listener);
 
                // The added buffer and end-of-partition event
                assertNotNull(view.getNextBuffer());

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index 0868398..2cb3b2f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -20,7 +20,6 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.junit.Test;
 import scala.Tuple2;
@@ -103,10 +102,10 @@ public class InputChannelTest {
 
        private InputChannel createInputChannel(int initialBackoff, int 
maxBackoff) {
                return new MockInputChannel(
-                               mock(SingleInputGate.class),
-                               0,
-                               new ResultPartitionID(),
-                               new Tuple2<Integer, Integer>(initialBackoff, 
maxBackoff));
+                       mock(SingleInputGate.class),
+                       0,
+                       new ResultPartitionID(),
+                       new Tuple2<Integer, Integer>(initialBackoff, 
maxBackoff));
        }
 
        // 
---------------------------------------------------------------------------------------------
@@ -114,10 +113,10 @@ public class InputChannelTest {
        private static class MockInputChannel extends InputChannel {
 
                private MockInputChannel(
-                               SingleInputGate inputGate,
-                               int channelIndex,
-                               ResultPartitionID partitionId,
-                               Tuple2<Integer, Integer> initialAndMaxBackoff) {
+                       SingleInputGate inputGate,
+                       int channelIndex,
+                       ResultPartitionID partitionId,
+                       Tuple2<Integer, Integer> initialAndMaxBackoff) {
 
                        super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff, new SimpleCounter());
                }
@@ -127,7 +126,7 @@ public class InputChannelTest {
                }
 
                @Override
-               Buffer getNextBuffer() throws IOException, InterruptedException 
{
+               BufferAndAvailability getNextBuffer() throws IOException, 
InterruptedException {
                        return null;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index cfbe99e..fa44393 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -64,24 +64,25 @@ public class IteratorWrappingTestSingleInputGate<T extends 
IOReadableWritable> e
 
                // The input iterator can produce an infinite stream. That's 
why we have to serialize each
                // record on demand and cannot do it upfront.
-               final Answer<Buffer> answer = new Answer<Buffer>() {
+               final Answer<InputChannel.BufferAndAvailability> answer = new 
Answer<InputChannel.BufferAndAvailability>() {
+
+                       private boolean hasData = inputIterator.next(reuse) != 
null;
+
                        @Override
-                       public Buffer answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               if (inputIterator.next(reuse) != null) {
+                       public InputChannel.BufferAndAvailability 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                               if (hasData) {
                                        final Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), 
mock(BufferRecycler.class));
                                        serializer.setNextBuffer(buffer);
                                        serializer.addRecord(reuse);
 
-                                       
inputGate.onAvailableBuffer(inputChannel.getInputChannel());
+                                       hasData = inputIterator.next(reuse) != 
null;
 
                                        // Call getCurrentBuffer to ensure size 
is set
-                                       return serializer.getCurrentBuffer();
-                               }
-                               else {
-
+                                       return new 
InputChannel.BufferAndAvailability(serializer.getCurrentBuffer(), true);
+                               } else {
                                        
when(inputChannel.getInputChannel().isReleased()).thenReturn(true);
 
-                                       return 
EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+                                       return new 
InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
 false);
                                }
                        }
                };
@@ -93,8 +94,8 @@ public class IteratorWrappingTestSingleInputGate<T extends 
IOReadableWritable> e
                return this;
        }
 
-       public IteratorWrappingTestSingleInputGate<T> read() {
-               inputGate.onAvailableBuffer(inputChannel.getInputChannel());
+       public IteratorWrappingTestSingleInputGate<T> notifyNonEmpty() {
+               inputGate.notifyChannelNonEmpty(inputChannel.getInputChannel());
 
                return this;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index ee28b5a..9b36ea9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Lists;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -31,6 +30,7 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -42,7 +42,6 @@ import 
org.apache.flink.runtime.io.network.util.TestPartitionProducer;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
 import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -59,7 +58,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import static 
org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.ASYNC;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -123,7 +121,6 @@ public class LocalInputChannelTest {
                                        partitionManager,
                                        partitionConsumableNotifier,
                                        ioManager,
-                                       ASYNC,
                                        true);
 
                        // Create a buffer pool for this partition
@@ -198,7 +195,7 @@ public class LocalInputChannelTest {
                LocalInputChannel ch = createLocalInputChannel(inputGate, 
partitionManager, backoff);
 
                when(partitionManager
-                               .createSubpartitionView(eq(ch.partitionId), 
eq(0), eq(bufferProvider)))
+                               .createSubpartitionView(eq(ch.partitionId), 
eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class)))
                                .thenThrow(new 
PartitionNotFoundException(ch.partitionId));
 
                Timer timer = mock(Timer.class);
@@ -214,7 +211,7 @@ public class LocalInputChannelTest {
                // Initial request
                ch.requestSubpartition(0);
                verify(partitionManager)
-                               .createSubpartitionView(eq(ch.partitionId), 
eq(0), eq(bufferProvider));
+                               .createSubpartitionView(eq(ch.partitionId), 
eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class));
 
                // Request subpartition and verify that the actual requests are 
delayed.
                for (long expected : expectedDelays) {
@@ -235,14 +232,13 @@ public class LocalInputChannelTest {
 
        @Test(expected = CancelTaskException.class)
        public void testProducerFailedException() throws Exception {
-
                ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
                when(view.isReleased()).thenReturn(true);
                when(view.getFailureCause()).thenReturn(new Exception("Expected 
test exception"));
 
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                when(partitionManager
-                               
.createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class)))
+                               
.createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class), any(BufferAvailabilityListener.class)))
                                .thenReturn(view);
 
                SingleInputGate inputGate = mock(SingleInputGate.class);
@@ -250,7 +246,7 @@ public class LocalInputChannelTest {
                when(inputGate.getBufferProvider()).thenReturn(bufferProvider);
 
                LocalInputChannel ch = createLocalInputChannel(
-                               inputGate, partitionManager, new 
Tuple2<Integer, Integer>(0, 0));
+                               inputGate, partitionManager, new Tuple2<>(0, 
0));
 
                ch.requestSubpartition(0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 9eb49ef..e7eb5c4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -70,7 +70,7 @@ public class RemoteInputChannelTest {
 
                // Need to notify the input gate for the out-of-order buffer as 
well. Otherwise the
                // receiving task will not notice the error.
-               verify(inputGate, times(2)).onAvailableBuffer(eq(inputChannel));
+               verify(inputGate, 
times(2)).notifyChannelNonEmpty(eq(inputChannel));
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 9c8be81..ec4b31d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
@@ -73,18 +74,18 @@ public class SingleInputGateTest {
        public void testBasicGetNextLogic() throws Exception {
                // Setup
                final SingleInputGate inputGate = new SingleInputGate(
-                               "Test Task Name", new JobID(), new 
ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, 
mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+                       "Test Task Name", new JobID(), new 
ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, 
mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                final TestInputChannel[] inputChannels = new TestInputChannel[]{
-                               new TestInputChannel(inputGate, 0),
-                               new TestInputChannel(inputGate, 1)
+                       new TestInputChannel(inputGate, 0),
+                       new TestInputChannel(inputGate, 1)
                };
 
                inputGate.setInputChannel(
-                               new IntermediateResultPartitionID(), 
inputChannels[0].getInputChannel());
+                       new IntermediateResultPartitionID(), 
inputChannels[0].getInputChannel());
 
                inputGate.setInputChannel(
-                               new IntermediateResultPartitionID(), 
inputChannels[1].getInputChannel());
+                       new IntermediateResultPartitionID(), 
inputChannels[1].getInputChannel());
 
                // Test
                inputChannels[0].readBuffer();
@@ -93,9 +94,12 @@ public class SingleInputGateTest {
                inputChannels[1].readEndOfPartitionEvent();
                inputChannels[0].readEndOfPartitionEvent();
 
-               verifyBufferOrEvent(inputGate, true, 0);
+               
inputGate.notifyChannelNonEmpty(inputChannels[0].getInputChannel());
+               
inputGate.notifyChannelNonEmpty(inputChannels[1].getInputChannel());
+
                verifyBufferOrEvent(inputGate, true, 0);
                verifyBufferOrEvent(inputGate, true, 1);
+               verifyBufferOrEvent(inputGate, true, 0);
                verifyBufferOrEvent(inputGate, false, 1);
                verifyBufferOrEvent(inputGate, false, 0);
 
@@ -112,10 +116,14 @@ public class SingleInputGateTest {
 
                final ResultSubpartitionView iterator = 
mock(ResultSubpartitionView.class);
                when(iterator.getNextBuffer()).thenReturn(
-                               new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), 
mock(BufferRecycler.class)));
+                       new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), 
mock(BufferRecycler.class)));
 
                final ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
-               
when(partitionManager.createSubpartitionView(any(ResultPartitionID.class), 
anyInt(), any(BufferProvider.class))).thenReturn(iterator);
+               when(partitionManager.createSubpartitionView(
+                       any(ResultPartitionID.class),
+                       anyInt(),
+                       any(BufferProvider.class),
+                       
any(BufferAvailabilityListener.class))).thenReturn(iterator);
 
                // Setup reader with one local and one unknown input channel
                final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
@@ -144,7 +152,7 @@ public class SingleInputGateTest {
                inputGate.requestPartitions();
 
                // Only the local channel can request
-               verify(partitionManager, 
times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class));
+               verify(partitionManager, 
times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class), any(BufferAvailabilityListener.class));
 
                // Send event backwards and initialize unknown channel 
afterwards
                final TaskEvent event = new TestTaskEvent();
@@ -156,7 +164,7 @@ public class SingleInputGateTest {
                // After the update, the pending event should be send to local 
channel
                inputGate.updateInputChannel(new 
InputChannelDeploymentDescriptor(new 
ResultPartitionID(unknownPartitionId.getPartitionId(), 
unknownPartitionId.getProducerId()), ResultPartitionLocation.createLocal()));
 
-               verify(partitionManager, 
times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class));
+               verify(partitionManager, 
times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class), any(BufferAvailabilityListener.class));
                verify(taskEventDispatcher, 
times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
        }
 
@@ -169,34 +177,34 @@ public class SingleInputGateTest {
        @Test
        public void testUpdateChannelBeforeRequest() throws Exception {
                SingleInputGate inputGate = new SingleInputGate(
-                               "t1",
-                               new JobID(),
-                               new ExecutionAttemptID(),
-                               new IntermediateDataSetID(),
-                               0,
-                               1,
-                               mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+                       "t1",
+                       new JobID(),
+                       new ExecutionAttemptID(),
+                       new IntermediateDataSetID(),
+                       0,
+                       1,
+                       mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
 
                InputChannel unknown = new UnknownInputChannel(
-                               inputGate,
-                               0,
-                               new ResultPartitionID(),
-                               partitionManager,
-                               new TaskEventDispatcher(),
-                               new LocalConnectionManager(),
-                               new Tuple2<Integer, Integer>(0, 0), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+                       inputGate,
+                       0,
+                       new ResultPartitionID(),
+                       partitionManager,
+                       new TaskEventDispatcher(),
+                       new LocalConnectionManager(),
+                       new Tuple2<Integer, Integer>(0, 0), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                inputGate.setInputChannel(unknown.partitionId.getPartitionId(), 
unknown);
 
                // Update to a local channel and verify that no request is 
triggered
                inputGate.updateInputChannel(new 
InputChannelDeploymentDescriptor(
-                               unknown.partitionId,
-                               ResultPartitionLocation.createLocal()));
+                       unknown.partitionId,
+                       ResultPartitionLocation.createLocal()));
 
                verify(partitionManager, never()).createSubpartitionView(
-                               any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class));
+                       any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class), any(BufferAvailabilityListener.class));
        }
 
        /**
@@ -209,24 +217,24 @@ public class SingleInputGateTest {
 
                // Setup the input gate with a single channel that does nothing
                final SingleInputGate inputGate = new SingleInputGate(
-                               "InputGate",
-                               new JobID(),
-                               new ExecutionAttemptID(),
-                               new IntermediateDataSetID(),
-                               0,
-                               1,
-                               mock(PartitionStateChecker.class),
-                               new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+                       "InputGate",
+                       new JobID(),
+                       new ExecutionAttemptID(),
+                       new IntermediateDataSetID(),
+                       0,
+                       1,
+                       mock(PartitionStateChecker.class),
+                       new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                InputChannel unknown = new UnknownInputChannel(
-                               inputGate,
-                               0,
-                               new ResultPartitionID(),
-                               new ResultPartitionManager(),
-                               new TaskEventDispatcher(),
-                               new LocalConnectionManager(),
-                               new Tuple2<Integer, Integer>(0, 0),
-                               new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+                       inputGate,
+                       0,
+                       new ResultPartitionID(),
+                       new ResultPartitionManager(),
+                       new TaskEventDispatcher(),
+                       new LocalConnectionManager(),
+                       new Tuple2<>(0, 0),
+                       new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                inputGate.setInputChannel(unknown.partitionId.getPartitionId(), 
unknown);
 
@@ -246,16 +254,15 @@ public class SingleInputGateTest {
                // Wait for blocking queue poll call and release input gate
                boolean success = false;
                for (int i = 0; i < 50; i++) {
-                       if (asyncConsumer != null && asyncConsumer.isAlive()) {
-                               StackTraceElement[] stackTrace = 
asyncConsumer.getStackTrace();
-                               success = isInBlockingQueuePoll(stackTrace);
+                       if (asyncConsumer.isAlive()) {
+                               success = asyncConsumer.getState() == 
Thread.State.WAITING;
                        }
 
                        if (success) {
                                break;
                        } else {
                                // Retry
-                               Thread.sleep(500);
+                               Thread.sleep(100);
                        }
                }
 
@@ -279,7 +286,7 @@ public class SingleInputGateTest {
         */
        @Test
        public void testRequestBackoffConfiguration() throws Exception {
-               ResultPartitionID[] partitionIds = new ResultPartitionID[] {
+               ResultPartitionID[] partitionIds = new ResultPartitionID[]{
                        new ResultPartitionID(),
                        new ResultPartitionID(),
                        new ResultPartitionID()
@@ -351,33 +358,12 @@ public class SingleInputGateTest {
                }
        }
 
-       /**
-        * Returns whether the stack trace represents a Thread in a blocking 
queue
-        * poll call.
-        *
-        * @param stackTrace Stack trace of the Thread to check
-        *
-        * @return Flag indicating whether the Thread is in a blocking queue 
poll
-        * call.
-        */
-       private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) {
-               for (StackTraceElement elem : stackTrace) {
-                       if (elem.getMethodName().equals("poll") &&
-                                       
elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) {
-
-                               return true;
-                       }
-               }
-
-               return false;
-       }
-
        // 
---------------------------------------------------------------------------------------------
 
        static void verifyBufferOrEvent(
-                       InputGate inputGate,
-                       boolean isBuffer,
-                       int channelIndex) throws IOException, 
InterruptedException {
+               InputGate inputGate,
+               boolean isBuffer,
+               int channelIndex) throws IOException, InterruptedException {
 
                final BufferOrEvent boe = inputGate.getNextBufferOrEvent();
                assertEquals(isBuffer, boe.isBuffer());

http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 7ea67b3..a6597a2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -19,10 +19,8 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -46,7 +44,7 @@ public class TestInputChannel {
        private final SingleInputGate inputGate;
 
        // Abusing Mockito here... ;)
-       protected OngoingStubbing<Buffer> stubbing;
+       protected OngoingStubbing<InputChannel.BufferAndAvailability> stubbing;
 
        public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
                checkArgument(channelIndex >= 0);
@@ -57,13 +55,10 @@ public class TestInputChannel {
 
        public TestInputChannel read(Buffer buffer) throws IOException, 
InterruptedException {
                if (stubbing == null) {
-                       stubbing = 
when(mock.getNextBuffer()).thenReturn(buffer);
+                       stubbing = when(mock.getNextBuffer()).thenReturn(new 
InputChannel.BufferAndAvailability(buffer, true));
+               } else {
+                       stubbing = stubbing.thenReturn(new 
InputChannel.BufferAndAvailability(buffer, true));
                }
-               else {
-                       stubbing = stubbing.thenReturn(buffer);
-               }
-
-               inputGate.onAvailableBuffer(mock);
 
                return this;
        }
@@ -75,34 +70,23 @@ public class TestInputChannel {
                return read(buffer);
        }
 
-       public TestInputChannel readEvent() throws IOException, 
InterruptedException {
-               return read(EventSerializer.toBuffer(new TestTaskEvent()));
-       }
-
-       public TestInputChannel readEndOfSuperstepEvent() throws IOException, 
InterruptedException {
-               return 
read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE));
-       }
-
        public TestInputChannel readEndOfPartitionEvent() throws IOException, 
InterruptedException {
-               final Answer<Buffer> answer = new Answer<Buffer>() {
+               final Answer<InputChannel.BufferAndAvailability> answer = new 
Answer<InputChannel.BufferAndAvailability>() {
                        @Override
-                       public Buffer answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                       public InputChannel.BufferAndAvailability 
answer(InvocationOnMock invocationOnMock) throws Throwable {
                                // Return true after finishing
                                when(mock.isReleased()).thenReturn(true);
 
-                               return 
EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+                               return new 
InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
 false);
                        }
                };
 
                if (stubbing == null) {
                        stubbing = 
when(mock.getNextBuffer()).thenAnswer(answer);
-               }
-               else {
+               } else {
                        stubbing = stubbing.thenAnswer(answer);
                }
 
-               inputGate.onAvailableBuffer(mock);
-
                return this;
        }
 

Reply via email to