[FLINK-5169] [network] Adjust tests to new consumer logic
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d97eaaf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d97eaaf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d97eaaf Branch: refs/heads/release-1.1 Commit: 8d97eaaf8676968a6b8a800d638b61b0161e6570 Parents: 6cfce17 Author: Ufuk Celebi <[email protected]> Authored: Mon Nov 28 09:59:58 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Mon Nov 28 21:05:00 2016 +0100 ---------------------------------------------------------------------- .../runtime/io/disk/SpillingBufferTest.java | 40 ++-- .../iomanager/BufferFileWriterReaderTest.java | 1 - .../io/network/api/reader/BufferReaderTest.java | 115 ------------ .../netty/CancelPartitionRequestTest.java | 37 ++-- .../netty/PartitionRequestQueueTest.java | 23 ++- .../netty/ServerTransportErrorHandlingTest.java | 54 +++--- .../PartialConsumePipelinedResultTest.java | 18 +- .../partition/PipelinedSubpartitionTest.java | 118 +++--------- .../network/partition/ResultPartitionTest.java | 1 - .../partition/SpillableSubpartitionTest.java | 20 +- .../SpilledSubpartitionViewAsyncIOTest.java | 65 ------- .../SpilledSubpartitionViewSyncIOTest.java | 103 ---------- .../partition/SpilledSubpartitionViewTest.java | 188 ++++++++++++------- .../network/partition/SubpartitionTestBase.java | 10 +- .../partition/consumer/InputChannelTest.java | 19 +- .../IteratorWrappingTestSingleInputGate.java | 23 +-- .../consumer/LocalInputChannelTest.java | 14 +- .../consumer/RemoteInputChannelTest.java | 2 +- .../partition/consumer/SingleInputGateTest.java | 130 ++++++------- .../partition/consumer/TestInputChannel.java | 32 +--- .../partition/consumer/TestSingleInputGate.java | 108 ++--------- .../partition/consumer/UnionInputGateTest.java | 43 +++-- .../network/util/TestSubpartitionConsumer.java | 69 ++++--- .../runtime/operators/DataSinkTaskTest.java | 7 +- .../operators/chaining/ChainTaskTest.java | 1 - .../operators/testutils/MockEnvironment.java | 12 +- .../operators/testutils/TaskTestBase.java | 2 +- .../TaskCancelAsyncProducerConsumerITCase.java | 34 +--- .../consumer/StreamTestSingleInputGate.java | 44 +++-- .../io/BarrierBufferMassiveRandomTest.java | 17 +- .../streaming/runtime/io/MockInputGate.java | 28 +-- .../tasks/OneInputStreamTaskTestHarness.java | 27 +-- .../runtime/tasks/StreamMockEnvironment.java | 6 +- .../StreamTaskCancellationBarrierTest.java | 4 +- .../runtime/tasks/StreamTaskTestHarness.java | 16 +- .../runtime/tasks/TwoInputStreamTaskTest.java | 6 +- .../tasks/TwoInputStreamTaskTestHarness.java | 1 - 37 files changed, 539 insertions(+), 899 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java index 538c416..01a9723 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java @@ -109,7 +109,7 @@ public class SpillingBufferTest { DataInputView inView = outView.flip(); generator.reset(); - // read and re-generate all records and compare them + // notifyNonEmpty and re-generate all records and compare them final Tuple2<Integer, String> readRec = new Tuple2<>(); for (int i = 0; i < NUM_PAIRS_INMEM; i++) { generator.next(rec); @@ -121,14 +121,14 @@ public class SpillingBufferTest { int k2 = readRec.f0; String v2 = readRec.f1; - Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2)); + Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2)); } - // re-read the data + // re-notifyNonEmpty the data inView = outView.flip(); generator.reset(); - // read and re-generate all records and compare them + // notifyNonEmpty and re-generate all records and compare them for (int i = 0; i < NUM_PAIRS_INMEM; i++) { generator.next(rec); serializer.deserialize(readRec, inView); @@ -139,7 +139,7 @@ public class SpillingBufferTest { int k2 = readRec.f0; String v2 = readRec.f1; - Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2)); + Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2)); } this.memoryManager.release(outView.close()); @@ -169,7 +169,7 @@ public class SpillingBufferTest { DataInputView inView = outView.flip(); generator.reset(); - // read and re-generate all records and compare them + // notifyNonEmpty and re-generate all records and compare them final Tuple2<Integer, String> readRec = new Tuple2<>(); try { for (int i = 0; i < NUM_PAIRS_INMEM + 1; i++) { @@ -182,7 +182,7 @@ public class SpillingBufferTest { int k2 = readRec.f0; String v2 = readRec.f1; - Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2)); + Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2)); } Assert.fail("Read too much, expected EOFException."); } @@ -190,11 +190,11 @@ public class SpillingBufferTest { // expected } - // re-read the data + // re-notifyNonEmpty the data inView = outView.flip(); generator.reset(); - // read and re-generate all records and compare them + // notifyNonEmpty and re-generate all records and compare them for (int i = 0; i < NUM_PAIRS_INMEM; i++) { generator.next(rec); serializer.deserialize(readRec, inView); @@ -205,7 +205,7 @@ public class SpillingBufferTest { int k2 = readRec.f0; String v2 = readRec.f1; - Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2)); + Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2)); } this.memoryManager.release(outView.close()); @@ -237,7 +237,7 @@ public class SpillingBufferTest { DataInputView inView = outView.flip(); generator.reset(); - // read and re-generate all records and compare them + // notifyNonEmpty and re-generate all records and compare them final Tuple2<Integer, String> readRec = new Tuple2<>(); for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) { generator.next(rec); @@ -249,14 +249,14 @@ public class SpillingBufferTest { int k2 = readRec.f0; String v2 = readRec.f1; - Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2)); + Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2)); } - // re-read the data + // re-notifyNonEmpty the data inView = outView.flip(); generator.reset(); - // read and re-generate all records and compare them + // notifyNonEmpty and re-generate all records and compare them for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) { generator.next(rec); serializer.deserialize(readRec, inView); @@ -267,7 +267,7 @@ public class SpillingBufferTest { int k2 = readRec.f0; String v2 = readRec.f1; - Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2)); + Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2)); } this.memoryManager.release(outView.close()); @@ -297,7 +297,7 @@ public class SpillingBufferTest { DataInputView inView = outView.flip(); generator.reset(); - // read and re-generate all records and compare them + // notifyNonEmpty and re-generate all records and compare them final Tuple2<Integer, String> readRec = new Tuple2<>(); try { for (int i = 0; i < NUM_PAIRS_EXTERNAL + 1; i++) { @@ -310,7 +310,7 @@ public class SpillingBufferTest { int k2 = readRec.f0; String v2 = readRec.f1; - Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2)); + Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2)); } Assert.fail("Read too much, expected EOFException."); } @@ -318,11 +318,11 @@ public class SpillingBufferTest { // expected } - // re-read the data + // re-notifyNonEmpty the data inView = outView.flip(); generator.reset(); - // read and re-generate all records and compare them + // notifyNonEmpty and re-generate all records and compare them for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) { generator.next(rec); serializer.deserialize(readRec, inView); @@ -333,7 +333,7 @@ public class SpillingBufferTest { int k2 = readRec.f0; String v2 = readRec.f1; - Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2)); + Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2)); } this.memoryManager.release(outView.close()); http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java index 375be45..2da0f7e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java @@ -23,7 +23,6 @@ import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.testutils.DiscardingRecycler; - import org.junit.After; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java deleted file mode 100644 index 099b6fb..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.api.reader; - -import org.apache.flink.runtime.event.TaskEvent; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.partition.consumer.TestSingleInputGate; -import org.apache.flink.runtime.io.network.util.TestTaskEvent; -import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.runtime.util.event.EventListener; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.io.IOException; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(Task.class) -@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) -@SuppressWarnings("unchecked") -public class BufferReaderTest { - - @Test - public void testGetNextBufferOrEvent() throws IOException, InterruptedException { - - final TestSingleInputGate inputGate = new TestSingleInputGate(1) - .readBuffer().readBuffer().readEvent() - .readBuffer().readBuffer().readEvent() - .readBuffer().readEndOfPartitionEvent(); - - final BufferReader reader = new BufferReader(inputGate.getInputGate()); - - // Task event listener to be notified... - final EventListener<TaskEvent> listener = mock(EventListener.class); - reader.registerTaskEventListener(listener, TestTaskEvent.class); - - int numReadBuffers = 0; - while ((reader.getNextBuffer()) != null) { - numReadBuffers++; - } - - assertEquals(5, numReadBuffers); - verify(listener, times(2)).onEvent(any(TaskEvent.class)); - } - - @Test - public void testIterativeGetNextBufferOrEvent() throws IOException, InterruptedException { - - final TestSingleInputGate inputGate = new TestSingleInputGate(1) - .readBuffer().readBuffer().readEvent() - .readBuffer().readBuffer().readEvent() - .readBuffer().readEndOfSuperstepEvent() - .readBuffer().readBuffer().readEvent() - .readBuffer().readBuffer().readEvent() - .readBuffer().readEndOfPartitionEvent(); - - final BufferReader reader = new BufferReader(inputGate.getInputGate()); - - // Set reader iterative - reader.setIterativeReader(); - - // Task event listener to be notified... - final EventListener<TaskEvent> listener = mock(EventListener.class); - // Task event listener to be notified... - reader.registerTaskEventListener(listener, TestTaskEvent.class); - - int numReadBuffers = 0; - int numEndOfSuperstepEvents = 0; - - while (true) { - Buffer buffer = reader.getNextBuffer(); - - if (buffer != null) { - numReadBuffers++; - } - else if (reader.hasReachedEndOfSuperstep()) { - reader.startNextSuperstep(); - - numEndOfSuperstepEvents++; - } - else if (reader.isFinished()) { - break; - } - } - - assertEquals(10, numReadBuffers); - assertEquals(1, numEndOfSuperstepEvents); - - verify(listener, times(4)).onEvent(any(TaskEvent.class)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java index 1ff1e99..a2f866a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java @@ -24,14 +24,16 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient; +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.util.event.NotificationListener; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.IOException; import java.util.concurrent.CountDownLatch; @@ -73,11 +75,18 @@ public class CancelPartitionRequestTest { CountDownLatch sync = new CountDownLatch(1); - ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync)); + final ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync)); // Return infinite subpartition - when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class))) - .thenReturn(view); + when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class), any(BufferAvailabilityListener.class))) + .thenAnswer(new Answer<ResultSubpartitionView>() { + @Override + public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable { + BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3]; + listener.notifyBuffersAvailable(Long.MAX_VALUE); + return view; + } + }); PartitionRequestProtocol protocol = new PartitionRequestProtocol( partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class)); @@ -109,19 +118,26 @@ public class CancelPartitionRequestTest { NettyServerAndClient serverAndClient = null; try { - TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16); + final TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16); ResultPartitionManager partitions = mock(ResultPartitionManager.class); ResultPartitionID pid = new ResultPartitionID(); - CountDownLatch sync = new CountDownLatch(1); + final CountDownLatch sync = new CountDownLatch(1); - ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync)); + final ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync)); // Return infinite subpartition - when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class))) - .thenReturn(view); + when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class), any(BufferAvailabilityListener.class))) + .thenAnswer(new Answer<ResultSubpartitionView>() { + @Override + public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable { + BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3]; + listener.notifyBuffersAvailable(Long.MAX_VALUE); + return view; + } + }); PartitionRequestProtocol protocol = new PartitionRequestProtocol( partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class)); @@ -174,8 +190,7 @@ public class CancelPartitionRequestTest { } @Override - public boolean registerListener(final NotificationListener listener) throws IOException { - return false; + public void notifyBuffersAvailable(long buffers) throws IOException { } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java index 3f281bd..7224e96 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java @@ -20,12 +20,18 @@ package org.apache.flink.runtime.io.network.netty; import io.netty.channel.embedded.EmbeddedChannel; import org.apache.flink.runtime.execution.CancelTaskException; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -35,14 +41,27 @@ public class PartitionRequestQueueTest { public void testProducerFailedException() throws Exception { PartitionRequestQueue queue = new PartitionRequestQueue(); - EmbeddedChannel ch = new EmbeddedChannel(queue); + ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + ResultPartitionID rpid = new ResultPartitionID(); + BufferProvider bufferProvider = mock(BufferProvider.class); ResultSubpartitionView view = mock(ResultSubpartitionView.class); when(view.isReleased()).thenReturn(true); when(view.getFailureCause()).thenReturn(new RuntimeException("Expected test exception")); + when(partitionProvider.createSubpartitionView( + eq(rpid), + eq(0), + eq(bufferProvider), + any(BufferAvailabilityListener.class))).thenReturn(view); + + EmbeddedChannel ch = new EmbeddedChannel(queue); + + SequenceNumberingViewReader seqView = new SequenceNumberingViewReader(new InputChannelID(), queue); + seqView.requestSubpartitionView(partitionProvider, rpid, 0, bufferProvider); + // Enqueue the erroneous view - queue.enqueue(view, new InputChannelID()); + queue.notifyReaderNonEmpty(seqView); ch.runPendingTasks(); // Read the enqueued msg http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java index 1515f83..1c3557e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java @@ -25,20 +25,20 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.InfiniteSubpartitionView; -import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient; +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder; -import static org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig; import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient; @@ -63,36 +63,43 @@ public class ServerTransportErrorHandlingTest { final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); when(partitionManager - .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class))) - .thenReturn(new InfiniteSubpartitionView(outboundBuffers, sync)); + .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class))) + .thenAnswer(new Answer<ResultSubpartitionView>() { + @Override + public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable { + BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3]; + listener.notifyBuffersAvailable(Long.MAX_VALUE); + return new CancelPartitionRequestTest.InfiniteSubpartitionView(outboundBuffers, sync); + } + }); NettyProtocol protocol = new NettyProtocol() { @Override public ChannelHandler[] getServerChannelHandlers() { return new PartitionRequestProtocol( - partitionManager, - mock(TaskEventDispatcher.class), - mock(NetworkBufferPool.class)).getServerChannelHandlers(); + partitionManager, + mock(TaskEventDispatcher.class), + mock(NetworkBufferPool.class)).getServerChannelHandlers(); } @Override public ChannelHandler[] getClientChannelHandlers() { - return new ChannelHandler[] { - new NettyMessageEncoder(), - // Close on read - new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - - ctx.channel().close(); - } + return new ChannelHandler[]{ + new NettyMessage.NettyMessageEncoder(), + // Close on read + new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + + ctx.channel().close(); } + } }; } }; - NettyServerAndClient serverAndClient = null; + NettyTestUtil.NettyServerAndClient serverAndClient = null; try { serverAndClient = initServerAndClient(protocol, createConfig()); @@ -100,15 +107,14 @@ public class ServerTransportErrorHandlingTest { Channel ch = connect(serverAndClient); // Write something to trigger close by server - ch.writeAndFlush(new PartitionRequest(new ResultPartitionID(), 0, new InputChannelID())); + ch.writeAndFlush(new NettyMessage.PartitionRequest(new ResultPartitionID(), 0, new InputChannelID())); // Wait for the notification if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) { fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() + - " ms to be notified about released partition."); + " ms to be notified about released partition."); } - } - finally { + } finally { shutdown(serverAndClient); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index 97f42b1..4a826b7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -20,12 +20,12 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.io.network.api.reader.BufferReader; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.testingUtils.TestingCluster; @@ -86,12 +86,12 @@ public class PartialConsumePipelinedResultTest { // The partition needs to be pipelined, otherwise the original issue does not occur, because // the sender and receiver are not online at the same time. receiver.connectNewDataSetAsInput( - sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); final JobGraph jobGraph = new JobGraph("Partial Consume of Pipelined Result", sender, receiver); final SlotSharingGroup slotSharingGroup = new SlotSharingGroup( - sender.getID(), receiver.getID()); + sender.getID(), receiver.getID()); sender.setSlotSharingGroup(slotSharingGroup); receiver.setSlotSharingGroup(slotSharingGroup); @@ -126,11 +126,11 @@ public class PartialConsumePipelinedResultTest { @Override public void invoke() throws Exception { - final BufferReader reader = new BufferReader(getEnvironment().getInputGate(0)); - - final Buffer buffer = reader.getNextBuffer(); - - buffer.recycle(); + InputGate gate = getEnvironment().getInputGate(0); + Buffer buffer = gate.getNextBufferOrEvent().getBuffer(); + if (buffer != null) { + buffer.recycle(); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index 8750a1a..a56177e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.util.TestConsumerCallback; -import org.apache.flink.runtime.io.network.util.TestNotificationListener; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.io.network.util.TestProducerSource; import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer; @@ -38,12 +37,13 @@ import java.util.concurrent.Future; import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class PipelinedSubpartitionTest extends SubpartitionTestBase { @@ -63,80 +63,25 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { } @Test - public void testRegisterListener() throws Exception { - final PipelinedSubpartition subpartition = createSubpartition(); - - final TestNotificationListener listener = new TestNotificationListener(); - - // Register a listener - assertTrue(subpartition.registerListener(listener)); - - // Try to register another listener - try { - subpartition.registerListener(listener); - - fail("Did not throw expected exception after duplicate listener registration."); - } - catch (IllegalStateException expected) { - } - } - - @Test - public void testListenerNotification() throws Exception { - final TestNotificationListener listener = new TestNotificationListener(); - assertEquals(0, listener.getNumberOfNotifications()); - - { - final PipelinedSubpartition subpartition = createSubpartition(); - - // Register a listener - assertTrue(subpartition.registerListener(listener)); - - // Notify on add and remove listener - subpartition.add(mock(Buffer.class)); - assertEquals(1, listener.getNumberOfNotifications()); - - // No notification, should have removed listener after first notification - subpartition.add(mock(Buffer.class)); - assertEquals(1, listener.getNumberOfNotifications()); - } - - { - final PipelinedSubpartition subpartition = createSubpartition(); - - // Register a listener - assertTrue(subpartition.registerListener(listener)); - - // Notify on finish - subpartition.finish(); - assertEquals(2, listener.getNumberOfNotifications()); - } - - { - final PipelinedSubpartition subpartition = createSubpartition(); - - // Register a listener - assertTrue(subpartition.registerListener(listener)); - - // Notify on release - subpartition.release(); - assertEquals(3, listener.getNumberOfNotifications()); - } - } - - @Test public void testIllegalReadViewRequest() throws Exception { final PipelinedSubpartition subpartition = createSubpartition(); // Successful request - assertNotNull(subpartition.createReadView(null)); + assertNotNull(subpartition.createReadView(null, new BufferAvailabilityListener() { + @Override + public void notifyBuffersAvailable(long numBuffers) { + } + })); try { - subpartition.createReadView(null); + subpartition.createReadView(null, new BufferAvailabilityListener() { + @Override + public void notifyBuffersAvailable(long numBuffers) { + } + }); - fail("Did not throw expected exception after duplicate read view request."); - } - catch (IllegalStateException expected) { + fail("Did not throw expected exception after duplicate notifyNonEmpty view request."); + } catch (IllegalStateException expected) { } } @@ -144,23 +89,19 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { public void testBasicPipelinedProduceConsumeLogic() throws Exception { final PipelinedSubpartition subpartition = createSubpartition(); - TestNotificationListener listener = new TestNotificationListener(); + BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class); - ResultSubpartitionView view = subpartition.createReadView(null); + ResultSubpartitionView view = subpartition.createReadView(null, listener); // Empty => should return null assertNull(view.getNextBuffer()); - - // Register listener for notifications - assertTrue(view.registerListener(listener)); - - assertEquals(0, listener.getNumberOfNotifications()); + verify(listener, times(1)).notifyBuffersAvailable(eq(0L)); // Add data to the queue... subpartition.add(createBuffer()); // ...should have resulted in a notification - assertEquals(1, listener.getNumberOfNotifications()); + verify(listener, times(1)).notifyBuffersAvailable(eq(1L)); // ...and one available result assertNotNull(view.getNextBuffer()); @@ -168,10 +109,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { // Add data to the queue... subpartition.add(createBuffer()); - // ...don't allow to subscribe, if data is available - assertFalse(view.registerListener(listener)); - - assertEquals(1, listener.getNumberOfNotifications()); + verify(listener, times(2)).notifyBuffersAvailable(eq(1L)); } @Test @@ -208,7 +146,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { @Override public BufferOrEvent getNextBufferOrEvent() throws Exception { - if (numberOfBuffers == producerNumberOfBuffersToProduce) { return null; } @@ -261,16 +198,17 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { final PipelinedSubpartition subpartition = createSubpartition(); - final PipelinedSubpartitionView view = subpartition.createReadView(null); + TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(isSlowConsumer, consumerCallback); + final PipelinedSubpartitionView view = subpartition.createReadView(null, consumer); + consumer.setSubpartitionView(view); - Future<Boolean> producer = executorService.submit( - new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource)); + Future<Boolean> producerResult = executorService.submit( + new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource)); - Future<Boolean> consumer = executorService.submit( - new TestSubpartitionConsumer(view, isSlowConsumer, consumerCallback)); + Future<Boolean> consumerResult = executorService.submit(consumer); // Wait for producer and consumer to finish - producer.get(); - consumer.get(); + producerResult.get(); + consumerResult.get(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index 302b667..a4abe75 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -84,7 +84,6 @@ public class ResultPartitionTest { mock(ResultPartitionManager.class), notifier, mock(IOManager.class), - IOManager.IOMode.SYNC, sendScheduleOrUpdateConsumersMessage); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index d7e56c8..b7a54d7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider; import org.junit.AfterClass; import org.junit.Test; @@ -34,7 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.SYNC; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; @@ -59,7 +60,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { @Override ResultSubpartition createSubpartition() { - return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager, SYNC); + return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager); } /** @@ -87,14 +88,14 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { // Mock I/O manager returning the blocking spill writer IOManager ioManager = mock(IOManager.class); when(ioManager.createBufferFileWriter(any(FileIOChannel.ID.class))) - .thenReturn(spillWriter); + .thenReturn(spillWriter); // The partition final SpillableSubpartition partition = new SpillableSubpartition( - 0, mock(ResultPartition.class), ioManager, SYNC); + 0, mock(ResultPartition.class), ioManager); // Spill the partition initially (creates the spill writer) - partition.releaseMemory(); + assertEquals(0, partition.releaseMemory()); ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -130,13 +131,18 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { public void testReleasePartitionAndGetNext() throws Exception { // Create partition and add some buffers SpillableSubpartition partition = new SpillableSubpartition( - 0, mock(ResultPartition.class), ioManager, SYNC); + 0, mock(ResultPartition.class), ioManager); partition.finish(); // Create the read view ResultSubpartitionView readView = spy(partition - .createReadView(new TestInfiniteBufferProvider())); + .createReadView(new TestInfiniteBufferProvider(), new BufferAvailabilityListener() { + @Override + public void notifyBuffersAvailable(long numBuffers) { + + } + })); // The released state check (of the parent) needs to be independent // of the released state of the view. http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java deleted file mode 100644 index 981c8ee..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.partition; - -import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.util.TestConsumerCallback; -import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider; -import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; -import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer; -import org.junit.AfterClass; -import org.junit.Test; - -import static org.mockito.Mockito.mock; - -public class SpilledSubpartitionViewAsyncIOTest { - - private static final IOManager ioManager = new IOManagerAsync(); - - @AfterClass - public static void shutdown() { - ioManager.shutdown(); - } - - @Test - public void testWriteConsume() throws Exception { - // Config - final int numberOfBuffersToWrite = 1024; - - // Setup - final BufferFileWriter writer = SpilledSubpartitionViewTest - .createWriterAndWriteBuffers(ioManager, new TestInfiniteBufferProvider(), numberOfBuffersToWrite); - - writer.close(); - - final TestPooledBufferProvider viewBufferPool = new TestPooledBufferProvider(1); - - final SpilledSubpartitionViewAsyncIO view = new SpilledSubpartitionViewAsyncIO( - mock(ResultSubpartition.class), viewBufferPool, ioManager, - writer.getChannelID(), 0); - - final TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(view, false, - new TestConsumerCallback.RecyclingCallback()); - - // Consume subpartition - consumer.call(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java deleted file mode 100644 index f8baae4..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.partition; - -import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.util.TestConsumerCallback; -import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider; -import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; -import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer; -import org.junit.AfterClass; -import org.junit.Test; - -import static org.mockito.Mockito.mock; - -public class SpilledSubpartitionViewSyncIOTest { - - private static final IOManager ioManager = new IOManagerAsync(); - - private static final TestInfiniteBufferProvider writerBufferPool = - new TestInfiniteBufferProvider(); - - @AfterClass - public static void shutdown() { - ioManager.shutdown(); - } - - @Test - public void testWriteConsume() throws Exception { - // Config - final int numberOfBuffersToWrite = 512; - - // Setup - final BufferFileWriter writer = SpilledSubpartitionViewTest - .createWriterAndWriteBuffers(ioManager, writerBufferPool, numberOfBuffersToWrite); - - writer.close(); - - final TestPooledBufferProvider viewBufferPool = new TestPooledBufferProvider(1); - - final SpilledSubpartitionViewSyncIO view = new SpilledSubpartitionViewSyncIO( - mock(ResultSubpartition.class), - viewBufferPool.getMemorySegmentSize(), - writer.getChannelID(), - 0); - - final TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(view, false, - new TestConsumerCallback.RecyclingCallback()); - - // Consume subpartition - consumer.call(); - } - - @Test - public void testConsumeWithFewBuffers() throws Exception { - // Config - final int numberOfBuffersToWrite = 512; - - // Setup - final BufferFileWriter writer = SpilledSubpartitionViewTest - .createWriterAndWriteBuffers(ioManager, writerBufferPool, numberOfBuffersToWrite); - - writer.close(); - - final SpilledSubpartitionViewSyncIO view = new SpilledSubpartitionViewSyncIO( - mock(ResultSubpartition.class), - 32 * 1024, - writer.getChannelID(), - 0); - - // No buffer available, don't deadlock. We need to make progress in situations when the view - // is consumed at an input gate with local and remote channels. The remote channels might - // eat up all the buffers, at which point the spilled view will not have any buffers - // available and the input gate can't make any progress if we don't return immediately. - // - // The current solution is straight-forward with a separate buffer per spilled subpartition, - // but introduces memory-overhead. - // - // TODO Replace with asynchronous buffer pool request as this introduces extra buffers per - // consumed subpartition. - final TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(view, false, - new TestConsumerCallback.RecyclingCallback()); - - consumer.call(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java index fff7bc6..8f8da93 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java @@ -21,23 +21,18 @@ package org.apache.flink.runtime.io.network.partition; import com.google.common.collect.Lists; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.util.TestConsumerCallback.RecyclingCallback; +import org.apache.flink.runtime.io.network.util.TestConsumerCallback; import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer; import org.junit.AfterClass; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -47,55 +42,103 @@ import java.util.concurrent.TimeoutException; import static org.mockito.Mockito.mock; -/** - * Test for both the asynchronous and synchronous spilled subpartition view implementation. - */ -@RunWith(Parameterized.class) public class SpilledSubpartitionViewTest { - private static final IOManager ioManager = new IOManagerAsync(); - - private static final ExecutorService executor = Executors.newCachedThreadPool(); + private static final IOManager IO_MANAGER = new IOManagerAsync(); private static final TestInfiniteBufferProvider writerBufferPool = - new TestInfiniteBufferProvider(); - - private IOMode ioMode; - - public SpilledSubpartitionViewTest(IOMode ioMode) { - this.ioMode = ioMode; - } + new TestInfiniteBufferProvider(); @AfterClass public static void shutdown() { - ioManager.shutdown(); - executor.shutdown(); + IO_MANAGER.shutdown(); } - @Parameterized.Parameters - public static Collection<Object[]> ioMode() { - return Arrays.asList(new Object[][]{ - {IOMode.SYNC}, - {IOMode.ASYNC}}); + @Test + public void testWriteConsume() throws Exception { + // Config + final int numberOfBuffersToWrite = 512; + + // Setup + final BufferFileWriter writer = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, numberOfBuffersToWrite); + + writer.close(); + + TestPooledBufferProvider viewBufferPool = new TestPooledBufferProvider(1); + + TestSubpartitionConsumer consumer = new TestSubpartitionConsumer( + false, new TestConsumerCallback.RecyclingCallback()); + + SpilledSubpartitionView view = new SpilledSubpartitionView( + mock(ResultSubpartition.class), + viewBufferPool.getMemorySegmentSize(), + writer, + numberOfBuffersToWrite + 1, // +1 for end-of-partition + consumer); + + consumer.setSubpartitionView(view); + + // Consume subpartition + consumer.call(); } @Test - public void testReadMultipleFilesWithSingleBufferPool() throws Exception { + public void testConsumeWithFewBuffers() throws Exception { + // Config + final int numberOfBuffersToWrite = 512; + // Setup - BufferFileWriter[] writers = new BufferFileWriter[]{ - createWriterAndWriteBuffers(ioManager, writerBufferPool, 512), - createWriterAndWriteBuffers(ioManager, writerBufferPool, 512) - }; + final BufferFileWriter writer = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, numberOfBuffersToWrite); + + writer.close(); + + TestSubpartitionConsumer consumer = new TestSubpartitionConsumer( + false, new TestConsumerCallback.RecyclingCallback()); + + SpilledSubpartitionView view = new SpilledSubpartitionView( + mock(ResultSubpartition.class), + 32 * 1024, + writer, + numberOfBuffersToWrite + 1, + consumer); + + consumer.setSubpartitionView(view); + + // No buffer available, don't deadlock. We need to make progress in situations when the view + // is consumed at an input gate with local and remote channels. The remote channels might + // eat up all the buffers, at which point the spilled view will not have any buffers + // available and the input gate can't make any progress if we don't return immediately. + // + // The current solution is straight-forward with a separate buffer per spilled subpartition, + // but introduces memory-overhead. + // + // TODO Replace with asynchronous buffer pool request as this introduces extra buffers per + // consumed subpartition. + consumer.call(); + } - final ResultSubpartitionView[] readers = new ResultSubpartitionView[writers.length]; + @Test + public void testReadMultipleFilesWithSingleBufferPool() throws Exception { + ExecutorService executor = null; + BufferFileWriter[] writers = null; + ResultSubpartitionView[] readers = null; - // Make this buffer pool small so that we can test the behaviour of the asynchronous view - // with few buffers. - final BufferProvider inputBuffers = new TestPooledBufferProvider(2); + try { + executor = Executors.newCachedThreadPool(); - final ResultSubpartition parent = mock(ResultSubpartition.class); + // Setup + writers = new BufferFileWriter[]{ + createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512), + createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512) + }; + + readers = new ResultSubpartitionView[writers.length]; + TestSubpartitionConsumer[] consumers = new TestSubpartitionConsumer[writers.length]; + + BufferProvider inputBuffers = new TestPooledBufferProvider(2); + + ResultSubpartition parent = mock(ResultSubpartition.class); - try { // Wait for writers to finish for (BufferFileWriter writer : writers) { writer.close(); @@ -103,56 +146,56 @@ public class SpilledSubpartitionViewTest { // Create the views depending on the test configuration for (int i = 0; i < readers.length; i++) { - if (ioMode.isSynchronous()) { - readers[i] = new SpilledSubpartitionViewSyncIO( - parent, - inputBuffers.getMemorySegmentSize(), - writers[i].getChannelID(), - 0); - } - else { - // For the asynchronous view, it is important that a registered listener will - // eventually be notified even if the view never got a buffer to read data into. - // - // At runtime, multiple threads never share the same buffer pool as in test. We - // do it here to provoke the erroneous behaviour. - readers[i] = new SpilledSubpartitionViewAsyncIO( - parent, inputBuffers, ioManager, writers[i].getChannelID(), 0); - } + consumers[i] = new TestSubpartitionConsumer( + false, new TestConsumerCallback.RecyclingCallback()); + + readers[i] = new SpilledSubpartitionView( + parent, + inputBuffers.getMemorySegmentSize(), + writers[i], + 512 + 1, // +1 for end of partition event + consumers[i]); + + consumers[i].setSubpartitionView(readers[i]); } final List<Future<Boolean>> results = Lists.newArrayList(); // Submit the consuming tasks - for (ResultSubpartitionView view : readers) { - results.add(executor.submit(new TestSubpartitionConsumer( - view, false, new RecyclingCallback()))); + for (TestSubpartitionConsumer consumer : consumers) { + results.add(executor.submit(consumer)); } // Wait for the results for (Future<Boolean> res : results) { try { res.get(2, TimeUnit.MINUTES); - } - catch (TimeoutException e) { + } catch (TimeoutException e) { throw new TimeoutException("There has been a timeout in the test. This " + - "indicates that there is a bug/deadlock in the tested subpartition " + - "view. The timed out test was in " + ioMode + " mode."); + "indicates that there is a bug/deadlock in the tested subpartition " + + "view."); } } - } - finally { - for (BufferFileWriter writer : writers) { - if (writer != null) { - writer.deleteChannel(); + } finally { + if (writers != null) { + for (BufferFileWriter writer : writers) { + if (writer != null) { + writer.deleteChannel(); + } } } - for (ResultSubpartitionView reader : readers) { - if (reader != null) { - reader.releaseAllResources(); + if (readers != null) { + for (ResultSubpartitionView reader : readers) { + if (reader != null) { + reader.releaseAllResources(); + } } } + + if (executor != null) { + executor.shutdown(); + } } } @@ -163,9 +206,9 @@ public class SpilledSubpartitionViewTest { * <p> Call {@link BufferFileWriter#close()} to ensure that all buffers have been written. */ static BufferFileWriter createWriterAndWriteBuffers( - IOManager ioManager, - BufferProvider bufferProvider, - int numberOfBuffers) throws IOException { + IOManager ioManager, + BufferProvider bufferProvider, + int numberOfBuffers) throws IOException { final BufferFileWriter writer = ioManager.createBufferFileWriter(ioManager.createChannel()); @@ -177,4 +220,5 @@ public class SpilledSubpartitionViewTest { return writer; } + } http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index 26a8f29..14942bc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider; import org.apache.flink.util.TestLogger; import org.junit.Test; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -50,8 +49,7 @@ public abstract class SubpartitionTestBase extends TestLogger { subpartition.finish(); assertFalse(subpartition.add(mock(Buffer.class))); - } - finally { + } finally { if (subpartition != null) { subpartition.release(); } @@ -66,8 +64,7 @@ public abstract class SubpartitionTestBase extends TestLogger { subpartition.release(); assertFalse(subpartition.add(mock(Buffer.class))); - } - finally { + } finally { if (subpartition != null) { subpartition.release(); } @@ -97,7 +94,8 @@ public abstract class SubpartitionTestBase extends TestLogger { TestInfiniteBufferProvider buffers = new TestInfiniteBufferProvider(); // Create the view - ResultSubpartitionView view = partition.createReadView(buffers); + BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class); + ResultSubpartitionView view = partition.createReadView(buffers, listener); // The added buffer and end-of-partition event assertNotNull(view.getNextBuffer()); http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java index 0868398..2cb3b2f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.event.TaskEvent; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.junit.Test; import scala.Tuple2; @@ -103,10 +102,10 @@ public class InputChannelTest { private InputChannel createInputChannel(int initialBackoff, int maxBackoff) { return new MockInputChannel( - mock(SingleInputGate.class), - 0, - new ResultPartitionID(), - new Tuple2<Integer, Integer>(initialBackoff, maxBackoff)); + mock(SingleInputGate.class), + 0, + new ResultPartitionID(), + new Tuple2<Integer, Integer>(initialBackoff, maxBackoff)); } // --------------------------------------------------------------------------------------------- @@ -114,10 +113,10 @@ public class InputChannelTest { private static class MockInputChannel extends InputChannel { private MockInputChannel( - SingleInputGate inputGate, - int channelIndex, - ResultPartitionID partitionId, - Tuple2<Integer, Integer> initialAndMaxBackoff) { + SingleInputGate inputGate, + int channelIndex, + ResultPartitionID partitionId, + Tuple2<Integer, Integer> initialAndMaxBackoff) { super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, new SimpleCounter()); } @@ -127,7 +126,7 @@ public class InputChannelTest { } @Override - Buffer getNextBuffer() throws IOException, InterruptedException { + BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { return null; } http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java index cfbe99e..fa44393 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java @@ -64,24 +64,25 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e // The input iterator can produce an infinite stream. That's why we have to serialize each // record on demand and cannot do it upfront. - final Answer<Buffer> answer = new Answer<Buffer>() { + final Answer<InputChannel.BufferAndAvailability> answer = new Answer<InputChannel.BufferAndAvailability>() { + + private boolean hasData = inputIterator.next(reuse) != null; + @Override - public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable { - if (inputIterator.next(reuse) != null) { + public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable { + if (hasData) { final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), mock(BufferRecycler.class)); serializer.setNextBuffer(buffer); serializer.addRecord(reuse); - inputGate.onAvailableBuffer(inputChannel.getInputChannel()); + hasData = inputIterator.next(reuse) != null; // Call getCurrentBuffer to ensure size is set - return serializer.getCurrentBuffer(); - } - else { - + return new InputChannel.BufferAndAvailability(serializer.getCurrentBuffer(), true); + } else { when(inputChannel.getInputChannel().isReleased()).thenReturn(true); - return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); + return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false); } } }; @@ -93,8 +94,8 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e return this; } - public IteratorWrappingTestSingleInputGate<T> read() { - inputGate.onAvailableBuffer(inputChannel.getInputChannel()); + public IteratorWrappingTestSingleInputGate<T> notifyNonEmpty() { + inputGate.notifyChannelNonEmpty(inputChannel.getInputChannel()); return this; } http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index ee28b5a..9b36ea9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import com.google.common.collect.Lists; - import org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.execution.CancelTaskException; @@ -31,6 +30,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -42,7 +42,6 @@ import org.apache.flink.runtime.io.network.util.TestPartitionProducer; import org.apache.flink.runtime.io.network.util.TestProducerSource; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; - import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -59,7 +58,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.ASYNC; import static org.apache.flink.util.Preconditions.checkArgument; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -123,7 +121,6 @@ public class LocalInputChannelTest { partitionManager, partitionConsumableNotifier, ioManager, - ASYNC, true); // Create a buffer pool for this partition @@ -198,7 +195,7 @@ public class LocalInputChannelTest { LocalInputChannel ch = createLocalInputChannel(inputGate, partitionManager, backoff); when(partitionManager - .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider))) + .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class))) .thenThrow(new PartitionNotFoundException(ch.partitionId)); Timer timer = mock(Timer.class); @@ -214,7 +211,7 @@ public class LocalInputChannelTest { // Initial request ch.requestSubpartition(0); verify(partitionManager) - .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider)); + .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class)); // Request subpartition and verify that the actual requests are delayed. for (long expected : expectedDelays) { @@ -235,14 +232,13 @@ public class LocalInputChannelTest { @Test(expected = CancelTaskException.class) public void testProducerFailedException() throws Exception { - ResultSubpartitionView view = mock(ResultSubpartitionView.class); when(view.isReleased()).thenReturn(true); when(view.getFailureCause()).thenReturn(new Exception("Expected test exception")); ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); when(partitionManager - .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class))) + .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class))) .thenReturn(view); SingleInputGate inputGate = mock(SingleInputGate.class); @@ -250,7 +246,7 @@ public class LocalInputChannelTest { when(inputGate.getBufferProvider()).thenReturn(bufferProvider); LocalInputChannel ch = createLocalInputChannel( - inputGate, partitionManager, new Tuple2<Integer, Integer>(0, 0)); + inputGate, partitionManager, new Tuple2<>(0, 0)); ch.requestSubpartition(0); http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 9eb49ef..e7eb5c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -70,7 +70,7 @@ public class RemoteInputChannelTest { // Need to notify the input gate for the out-of-order buffer as well. Otherwise the // receiving task will not notice the error. - verify(inputGate, times(2)).onAvailableBuffer(eq(inputChannel)); + verify(inputGate, times(2)).notifyChannelNonEmpty(eq(inputChannel)); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 9c8be81..ec4b31d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; @@ -73,18 +74,18 @@ public class SingleInputGateTest { public void testBasicGetNextLogic() throws Exception { // Setup final SingleInputGate inputGate = new SingleInputGate( - "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); final TestInputChannel[] inputChannels = new TestInputChannel[]{ - new TestInputChannel(inputGate, 0), - new TestInputChannel(inputGate, 1) + new TestInputChannel(inputGate, 0), + new TestInputChannel(inputGate, 1) }; inputGate.setInputChannel( - new IntermediateResultPartitionID(), inputChannels[0].getInputChannel()); + new IntermediateResultPartitionID(), inputChannels[0].getInputChannel()); inputGate.setInputChannel( - new IntermediateResultPartitionID(), inputChannels[1].getInputChannel()); + new IntermediateResultPartitionID(), inputChannels[1].getInputChannel()); // Test inputChannels[0].readBuffer(); @@ -93,9 +94,12 @@ public class SingleInputGateTest { inputChannels[1].readEndOfPartitionEvent(); inputChannels[0].readEndOfPartitionEvent(); - verifyBufferOrEvent(inputGate, true, 0); + inputGate.notifyChannelNonEmpty(inputChannels[0].getInputChannel()); + inputGate.notifyChannelNonEmpty(inputChannels[1].getInputChannel()); + verifyBufferOrEvent(inputGate, true, 0); verifyBufferOrEvent(inputGate, true, 1); + verifyBufferOrEvent(inputGate, true, 0); verifyBufferOrEvent(inputGate, false, 1); verifyBufferOrEvent(inputGate, false, 0); @@ -112,10 +116,14 @@ public class SingleInputGateTest { final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class); when(iterator.getNextBuffer()).thenReturn( - new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class))); + new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class))); final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); - when(partitionManager.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class))).thenReturn(iterator); + when(partitionManager.createSubpartitionView( + any(ResultPartitionID.class), + anyInt(), + any(BufferProvider.class), + any(BufferAvailabilityListener.class))).thenReturn(iterator); // Setup reader with one local and one unknown input channel final IntermediateDataSetID resultId = new IntermediateDataSetID(); @@ -144,7 +152,7 @@ public class SingleInputGateTest { inputGate.requestPartitions(); // Only the local channel can request - verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class)); + verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)); // Send event backwards and initialize unknown channel afterwards final TaskEvent event = new TestTaskEvent(); @@ -156,7 +164,7 @@ public class SingleInputGateTest { // After the update, the pending event should be send to local channel inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(new ResultPartitionID(unknownPartitionId.getPartitionId(), unknownPartitionId.getProducerId()), ResultPartitionLocation.createLocal())); - verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class)); + verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)); verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class)); } @@ -169,34 +177,34 @@ public class SingleInputGateTest { @Test public void testUpdateChannelBeforeRequest() throws Exception { SingleInputGate inputGate = new SingleInputGate( - "t1", - new JobID(), - new ExecutionAttemptID(), - new IntermediateDataSetID(), - 0, - 1, - mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + "t1", + new JobID(), + new ExecutionAttemptID(), + new IntermediateDataSetID(), + 0, + 1, + mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); InputChannel unknown = new UnknownInputChannel( - inputGate, - 0, - new ResultPartitionID(), - partitionManager, - new TaskEventDispatcher(), - new LocalConnectionManager(), - new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + inputGate, + 0, + new ResultPartitionID(), + partitionManager, + new TaskEventDispatcher(), + new LocalConnectionManager(), + new Tuple2<Integer, Integer>(0, 0), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); // Update to a local channel and verify that no request is triggered inputGate.updateInputChannel(new InputChannelDeploymentDescriptor( - unknown.partitionId, - ResultPartitionLocation.createLocal())); + unknown.partitionId, + ResultPartitionLocation.createLocal())); verify(partitionManager, never()).createSubpartitionView( - any(ResultPartitionID.class), anyInt(), any(BufferProvider.class)); + any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)); } /** @@ -209,24 +217,24 @@ public class SingleInputGateTest { // Setup the input gate with a single channel that does nothing final SingleInputGate inputGate = new SingleInputGate( - "InputGate", - new JobID(), - new ExecutionAttemptID(), - new IntermediateDataSetID(), - 0, - 1, - mock(PartitionStateChecker.class), - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + "InputGate", + new JobID(), + new ExecutionAttemptID(), + new IntermediateDataSetID(), + 0, + 1, + mock(PartitionStateChecker.class), + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); InputChannel unknown = new UnknownInputChannel( - inputGate, - 0, - new ResultPartitionID(), - new ResultPartitionManager(), - new TaskEventDispatcher(), - new LocalConnectionManager(), - new Tuple2<Integer, Integer>(0, 0), - new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + inputGate, + 0, + new ResultPartitionID(), + new ResultPartitionManager(), + new TaskEventDispatcher(), + new LocalConnectionManager(), + new Tuple2<>(0, 0), + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); @@ -246,16 +254,15 @@ public class SingleInputGateTest { // Wait for blocking queue poll call and release input gate boolean success = false; for (int i = 0; i < 50; i++) { - if (asyncConsumer != null && asyncConsumer.isAlive()) { - StackTraceElement[] stackTrace = asyncConsumer.getStackTrace(); - success = isInBlockingQueuePoll(stackTrace); + if (asyncConsumer.isAlive()) { + success = asyncConsumer.getState() == Thread.State.WAITING; } if (success) { break; } else { // Retry - Thread.sleep(500); + Thread.sleep(100); } } @@ -279,7 +286,7 @@ public class SingleInputGateTest { */ @Test public void testRequestBackoffConfiguration() throws Exception { - ResultPartitionID[] partitionIds = new ResultPartitionID[] { + ResultPartitionID[] partitionIds = new ResultPartitionID[]{ new ResultPartitionID(), new ResultPartitionID(), new ResultPartitionID() @@ -351,33 +358,12 @@ public class SingleInputGateTest { } } - /** - * Returns whether the stack trace represents a Thread in a blocking queue - * poll call. - * - * @param stackTrace Stack trace of the Thread to check - * - * @return Flag indicating whether the Thread is in a blocking queue poll - * call. - */ - private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) { - for (StackTraceElement elem : stackTrace) { - if (elem.getMethodName().equals("poll") && - elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) { - - return true; - } - } - - return false; - } - // --------------------------------------------------------------------------------------------- static void verifyBufferOrEvent( - InputGate inputGate, - boolean isBuffer, - int channelIndex) throws IOException, InterruptedException { + InputGate inputGate, + boolean isBuffer, + int channelIndex) throws IOException, InterruptedException { final BufferOrEvent boe = inputGate.getNextBufferOrEvent(); assertEquals(isBuffer, boe.isBuffer()); http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java index 7ea67b3..a6597a2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java @@ -19,10 +19,8 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; -import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.util.TestTaskEvent; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -46,7 +44,7 @@ public class TestInputChannel { private final SingleInputGate inputGate; // Abusing Mockito here... ;) - protected OngoingStubbing<Buffer> stubbing; + protected OngoingStubbing<InputChannel.BufferAndAvailability> stubbing; public TestInputChannel(SingleInputGate inputGate, int channelIndex) { checkArgument(channelIndex >= 0); @@ -57,13 +55,10 @@ public class TestInputChannel { public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException { if (stubbing == null) { - stubbing = when(mock.getNextBuffer()).thenReturn(buffer); + stubbing = when(mock.getNextBuffer()).thenReturn(new InputChannel.BufferAndAvailability(buffer, true)); + } else { + stubbing = stubbing.thenReturn(new InputChannel.BufferAndAvailability(buffer, true)); } - else { - stubbing = stubbing.thenReturn(buffer); - } - - inputGate.onAvailableBuffer(mock); return this; } @@ -75,34 +70,23 @@ public class TestInputChannel { return read(buffer); } - public TestInputChannel readEvent() throws IOException, InterruptedException { - return read(EventSerializer.toBuffer(new TestTaskEvent())); - } - - public TestInputChannel readEndOfSuperstepEvent() throws IOException, InterruptedException { - return read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE)); - } - public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException { - final Answer<Buffer> answer = new Answer<Buffer>() { + final Answer<InputChannel.BufferAndAvailability> answer = new Answer<InputChannel.BufferAndAvailability>() { @Override - public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable { + public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable { // Return true after finishing when(mock.isReleased()).thenReturn(true); - return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); + return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false); } }; if (stubbing == null) { stubbing = when(mock.getNextBuffer()).thenAnswer(answer); - } - else { + } else { stubbing = stubbing.thenAnswer(answer); } - inputGate.onAvailableBuffer(mock); - return this; }
