http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java index 607da94..0749467 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java @@ -20,24 +20,17 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; -import org.apache.flink.runtime.util.event.EventListener; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.io.IOException; import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.BlockingQueue; +import java.util.ArrayDeque; import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkElementIndex; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -60,34 +53,42 @@ public class TestSingleInputGate { checkArgument(numberOfInputChannels >= 1); 
SingleInputGate realGate = new SingleInputGate( - "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + "Test Task Name", + new JobID(), + new ExecutionAttemptID(), + new IntermediateDataSetID(), + 0, + numberOfInputChannels, + mock(PartitionStateChecker.class), + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); this.inputGate = spy(realGate); // Notify about late registrations (added for DataSinkTaskTest#testUnionDataSinkTask). // After merging registerInputOutput and invoke, we have to make sure that the test - // notifcations happen at the expected time. In real programs, this is guaranteed by + // notifications happen at the expected time. In real programs, this is guaranteed by // the instantiation and request partition life cycle. try { Field f = realGate.getClass().getDeclaredField("inputChannelsWithData"); f.setAccessible(true); - final BlockingQueue<InputChannel> notifications = (BlockingQueue<InputChannel>) f.get(realGate); + final ArrayDeque<InputChannel> notifications = (ArrayDeque<InputChannel>) f.get(realGate); doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { invocation.callRealMethod(); - if (!notifications.isEmpty()) { - EventListener<InputGate> listener = (EventListener<InputGate>) invocation.getArguments()[0]; - listener.onEvent(inputGate); + synchronized (notifications) { + if (!notifications.isEmpty()) { + InputGateListener listener = (InputGateListener) invocation.getArguments()[0]; + listener.notifyInputGateNonEmpty(inputGate); + } } return null; } - }).when(inputGate).registerListener(any(EventListener.class)); - } - catch (Exception e) { + }).when(inputGate).registerListener(any(InputGateListener.class)); + } catch (Exception e) { throw new RuntimeException(e); } @@ -101,81 +102,8 @@ public class TestSingleInputGate { } } - 
public TestSingleInputGate read(Buffer buffer, int channelIndex) throws IOException, InterruptedException { - checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels()); - - inputChannels[channelIndex].read(buffer); - - return this; - } - - public TestSingleInputGate readBuffer() throws IOException, InterruptedException { - return readBuffer(0); - } - - public TestSingleInputGate readBuffer(int channelIndex) throws IOException, InterruptedException { - inputChannels[channelIndex].readBuffer(); - - return this; - } - - public TestSingleInputGate readEvent() throws IOException, InterruptedException { - return readEvent(0); - } - - public TestSingleInputGate readEvent(int channelIndex) throws IOException, InterruptedException { - inputChannels[channelIndex].readEvent(); - - return this; - } - - public TestSingleInputGate readEndOfSuperstepEvent() throws IOException, InterruptedException { - for (TestInputChannel inputChannel : inputChannels) { - inputChannel.readEndOfSuperstepEvent(); - } - - return this; - } - - public TestSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException, InterruptedException { - inputChannels[channelIndex].readEndOfSuperstepEvent(); - - return this; - } - - public TestSingleInputGate readEndOfPartitionEvent() throws IOException, InterruptedException { - for (TestInputChannel inputChannel : inputChannels) { - inputChannel.readEndOfPartitionEvent(); - } - - return this; - } - - public TestSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException, InterruptedException { - inputChannels[channelIndex].readEndOfPartitionEvent(); - - return this; - } - public SingleInputGate getInputGate() { return inputGate; } - // ------------------------------------------------------------------------ - - public List<Integer> readAllChannels() throws IOException, InterruptedException { - final List<Integer> readOrder = new ArrayList<Integer>(inputChannels.length); - - for (int i = 0; i < inputChannels.length; 
i++) { - readOrder.add(i); - } - - Collections.shuffle(readOrder); - - for (int channelIndex : readOrder) { - inputChannels[channelIndex].readBuffer(); - } - - return readOrder; - } }
http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index 28f621f..faec77e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; - import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.junit.Test; @@ -73,22 +72,32 @@ public class UnionInputGateTest { inputChannels[1][1].readEndOfPartitionEvent(); // 0 => 3 inputChannels[1][0].readEndOfPartitionEvent(); // 0 => 3 - SingleInputGateTest.verifyBufferOrEvent(union, true, 0); - SingleInputGateTest.verifyBufferOrEvent(union, false, 0); - SingleInputGateTest.verifyBufferOrEvent(union, true, 5); - SingleInputGateTest.verifyBufferOrEvent(union, false, 5); - SingleInputGateTest.verifyBufferOrEvent(union, true, 3); - SingleInputGateTest.verifyBufferOrEvent(union, true, 4); - SingleInputGateTest.verifyBufferOrEvent(union, true, 1); - SingleInputGateTest.verifyBufferOrEvent(union, true, 6); - SingleInputGateTest.verifyBufferOrEvent(union, false, 1); - SingleInputGateTest.verifyBufferOrEvent(union, false, 6); - SingleInputGateTest.verifyBufferOrEvent(union, true, 2); - SingleInputGateTest.verifyBufferOrEvent(union, false, 2); - 
SingleInputGateTest.verifyBufferOrEvent(union, true, 7); - SingleInputGateTest.verifyBufferOrEvent(union, false, 7); - SingleInputGateTest.verifyBufferOrEvent(union, false, 4); - SingleInputGateTest.verifyBufferOrEvent(union, false, 3); + ig1.notifyChannelNonEmpty(inputChannels[0][0].getInputChannel()); + ig1.notifyChannelNonEmpty(inputChannels[0][1].getInputChannel()); + ig1.notifyChannelNonEmpty(inputChannels[0][2].getInputChannel()); + + ig2.notifyChannelNonEmpty(inputChannels[1][0].getInputChannel()); + ig2.notifyChannelNonEmpty(inputChannels[1][1].getInputChannel()); + ig2.notifyChannelNonEmpty(inputChannels[1][2].getInputChannel()); + ig2.notifyChannelNonEmpty(inputChannels[1][3].getInputChannel()); + ig2.notifyChannelNonEmpty(inputChannels[1][4].getInputChannel()); + + SingleInputGateTest.verifyBufferOrEvent(union, true, 0); // gate 1, channel 0 + SingleInputGateTest.verifyBufferOrEvent(union, true, 3); // gate 2, channel 0 + SingleInputGateTest.verifyBufferOrEvent(union, true, 1); // gate 1, channel 1 + SingleInputGateTest.verifyBufferOrEvent(union, true, 4); // gate 2, channel 1 + SingleInputGateTest.verifyBufferOrEvent(union, true, 2); // gate 1, channel 2 + SingleInputGateTest.verifyBufferOrEvent(union, true, 5); // gate 2, channel 2 + SingleInputGateTest.verifyBufferOrEvent(union, false, 0); // gate 1, channel 0 + SingleInputGateTest.verifyBufferOrEvent(union, true, 6); // gate 2, channel 3 + SingleInputGateTest.verifyBufferOrEvent(union, false, 1); // gate 1, channel 1 + SingleInputGateTest.verifyBufferOrEvent(union, true, 7); // gate 2, channel 4 + SingleInputGateTest.verifyBufferOrEvent(union, false, 2); // gate 1, channel 2 + SingleInputGateTest.verifyBufferOrEvent(union, false, 3); // gate 2, channel 0 + SingleInputGateTest.verifyBufferOrEvent(union, false, 4); // gate 2, channel 1 + SingleInputGateTest.verifyBufferOrEvent(union, false, 5); // gate 2, channel 2 + SingleInputGateTest.verifyBufferOrEvent(union, false, 6); // gate 2, channel 3 + 
SingleInputGateTest.verifyBufferOrEvent(union, false, 7); // gate 2, channel 4 // Return null when the input gate has received all end-of-partition events assertTrue(union.isFinished()); http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java index 1b51805..676a304 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java @@ -22,26 +22,32 @@ import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import java.util.Random; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A test subpartition view consumer. + * A test subpartition view consumer. * * <p> The behaviour of the consumer is customizable by specifying a callback. 
* * @see TestConsumerCallback */ -public class TestSubpartitionConsumer implements Callable<Boolean> { +public class TestSubpartitionConsumer implements Callable<Boolean>, BufferAvailabilityListener { private static final int MAX_SLEEP_TIME_MS = 20; - /** The subpartition view to consume. */ - private final ResultSubpartitionView subpartitionView; + /** The subpartition view to consume. */ + private volatile ResultSubpartitionView subpartitionView; + + private BlockingQueue<ResultSubpartitionView> viewQueue = new ArrayBlockingQueue<>(1); /** * Flag indicating whether the consumer is slow. If true, the consumer will sleep a random @@ -49,33 +55,43 @@ public class TestSubpartitionConsumer implements Callable<Boolean> { */ private final boolean isSlowConsumer; - /** The callback to handle a read buffer. */ + /** The callback to handle a read buffer. */ private final TestConsumerCallback callback; /** Random source for sleeps. */ private final Random random; + private final AtomicLong numBuffersAvailable = new AtomicLong(); + public TestSubpartitionConsumer( - ResultSubpartitionView subpartitionView, - boolean isSlowConsumer, - TestConsumerCallback callback) { + boolean isSlowConsumer, + TestConsumerCallback callback) { - this.subpartitionView = checkNotNull(subpartitionView); this.isSlowConsumer = isSlowConsumer; this.random = isSlowConsumer ? 
new Random() : null; this.callback = checkNotNull(callback); } + public void setSubpartitionView(ResultSubpartitionView subpartitionView) { + this.subpartitionView = checkNotNull(subpartitionView); + } + @Override public Boolean call() throws Exception { - final TestNotificationListener listener = new TestNotificationListener(); - try { while (true) { if (Thread.interrupted()) { throw new InterruptedException(); } + if (numBuffersAvailable.get() == 0) { + synchronized (numBuffersAvailable) { + while (numBuffersAvailable.get() == 0) { + numBuffersAvailable.wait(); + } + } + } + final Buffer buffer = subpartitionView.getNextBuffer(); if (isSlowConsumer) { @@ -83,12 +99,13 @@ public class TestSubpartitionConsumer implements Callable<Boolean> { } if (buffer != null) { + numBuffersAvailable.decrementAndGet(); + if (buffer.isBuffer()) { callback.onBuffer(buffer); - } - else { + } else { final AbstractEvent event = EventSerializer.fromBuffer(buffer, - getClass().getClassLoader()); + getClass().getClassLoader()); callback.onEvent(event); @@ -100,22 +117,22 @@ public class TestSubpartitionConsumer implements Callable<Boolean> { return true; } } - } - else { - int current = listener.getNumberOfNotifications(); - - if (subpartitionView.registerListener(listener)) { - listener.waitForNotification(current); - } - else if (subpartitionView.isReleased()) { - return true; - } + } else if (subpartitionView.isReleased()) { + return true; } } - } - finally { + } finally { subpartitionView.releaseAllResources(); } } + @Override + public void notifyBuffersAvailable(long numBuffers) { + if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) { + synchronized (numBuffersAvailable) { + numBuffersAvailable.notifyAll(); + } + ; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java index a41c25b..67071f9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java @@ -34,7 +34,6 @@ import org.apache.flink.types.Record; import org.junit.After; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -139,7 +138,6 @@ public class DataSinkTaskTest extends TaskTestBase { @Test public void testUnionDataSinkTask() { - int keyCnt = 10; int valCnt = 20; @@ -157,9 +155,10 @@ public class DataSinkTaskTest extends TaskTestBase { try { // For the union reader to work, we need to start notifications *after* the union reader - // has been initialized. + // has been initialized. This is accomplished via a mockito hack in TestSingleInputGate, + // which checks for and forwards existing notifications on registerListener calls. 
for (IteratorWrappingTestSingleInputGate<?> reader : readers) { - reader.read(); + reader.notifyNonEmpty(); } testTask.invoke(); http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java index 02c420c..fb8ed68 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java @@ -53,7 +53,6 @@ import org.powermock.modules.junit4.PowerMockRunner; @RunWith(PowerMockRunner.class) @PrepareForTest({Task.class, ResultPartitionWriter.class}) @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) - public class ChainTaskTest extends TaskTestBase { private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3; http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 22dee63..05c4814 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -64,9 +64,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class MockEnvironment implements Environment { - + private final TaskInfo taskInfo; - + private final ExecutionConfig 
executionConfig; private final MemoryManager memManager; @@ -158,7 +158,7 @@ public class MockEnvironment implements Environment { } if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER - || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) { + || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) { break; } } @@ -208,9 +208,9 @@ public class MockEnvironment implements Environment { @Override public TaskManagerRuntimeInfo getTaskManagerInfo() { return new TaskManagerRuntimeInfo( - "localhost", - new UnmodifiableConfiguration(new Configuration()), - System.getProperty("java.io.tmpdir")); + "localhost", + new UnmodifiableConfiguration(new Configuration()), + System.getProperty("java.io.tmpdir")); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java index eaf44db..53d75b3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java @@ -67,7 +67,7 @@ public abstract class TaskTestBase extends TestLogger { conf.setInputSerializer(RecordSerializerFactory.get(), groupId); if (read) { - reader.read(); + reader.notifyNonEmpty(); } return reader; http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java index a093233..876e908 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.taskmanager; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.instance.ActorGateway; @@ -43,6 +42,7 @@ import scala.concurrent.Future; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; +import java.util.Arrays; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -129,18 +129,17 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger { } // Verify that async producer is in blocking request - assertTrue("Producer thread is not blocked.", producerBlocked); + assertTrue("Producer thread is not blocked: " + Arrays.toString(ASYNC_CONSUMER_THREAD.getStackTrace()), producerBlocked); - boolean consumerBlocked = false; + boolean consumerWaiting = false; for (int i = 0; i < 50; i++) { Thread thread = ASYNC_CONSUMER_THREAD; if (thread != null && thread.isAlive()) { - StackTraceElement[] stackTrace = thread.getStackTrace(); - consumerBlocked = isInBlockingQueuePoll(stackTrace); + consumerWaiting = thread.getState() == Thread.State.WAITING; } - if (consumerBlocked) { + if (consumerWaiting) { break; } else { // Retry @@ -149,7 +148,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger { } // Verify that async consumer is in blocking request - assertTrue("Consumer thread is not blocked.", consumerBlocked); + assertTrue("Consumer thread is not blocked.", consumerWaiting); msg = new CancelJob(jobGraph.getJobID()); 
Future<?> cancelFuture = jobManager.ask(msg, deadline.timeLeft()); @@ -186,27 +185,6 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger { } /** - * Returns whether the stack trace represents a Thread in a blocking queue - * poll call. - * - * @param stackTrace Stack trace of the Thread to check - * - * @return Flag indicating whether the Thread is in a blocking queue poll - * call. - */ - private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) { - for (StackTraceElement elem : stackTrace) { - if (elem.getMethodName().equals("poll") && - elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) { - - return true; - } - } - - return false; - } - - /** * Invokable emitting records in a separate Thread (not the main Task * thread). */ http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index 1187fe6..1b31c2c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; @@ -62,9 +63,9 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { @SuppressWarnings("unchecked") public StreamTestSingleInputGate( - int numInputChannels, - int bufferSize, - TypeSerializer<T> serializer) throws IOException, InterruptedException { + int numInputChannels, + int bufferSize, + TypeSerializer<T> serializer) throws IOException, InterruptedException { super(numInputChannels, false); this.bufferSize = bufferSize; @@ -86,39 +87,36 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { final int channelIndex = i; final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>(); final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>) - new SerializationDelegate<StreamElement>(new MultiplexingStreamRecordSerializer<T>(serializer)); + new SerializationDelegate<StreamElement>(new MultiplexingStreamRecordSerializer<T>(serializer)); inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>(); inputChannels[channelIndex] = new TestInputChannel(inputGate, i); - final Answer<Buffer> answer = new Answer<Buffer>() { + final Answer<BufferAndAvailability> answer = new Answer<BufferAndAvailability>() { @Override - public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable { + public BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable { InputValue<Object> input = inputQueues[channelIndex].poll(); if (input != null && input.isStreamEnd()) { when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn( - true); - return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); - } - else if (input != null && 
input.isStreamRecord()) { + true); + return new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false); + } else if (input != null && input.isStreamRecord()) { Object inputElement = input.getStreamRecord(); final Buffer buffer = new Buffer( - MemorySegmentFactory.allocateUnpooledSegment(bufferSize), - mock(BufferRecycler.class)); - + MemorySegmentFactory.allocateUnpooledSegment(bufferSize), + mock(BufferRecycler.class)); + recordSerializer.setNextBuffer(buffer); delegate.setInstance(inputElement); recordSerializer.addRecord(delegate); // Call getCurrentBuffer to ensure size is set - return recordSerializer.getCurrentBuffer(); - } - else if (input != null && input.isEvent()) { + return new BufferAndAvailability(recordSerializer.getCurrentBuffer(), false); + } else if (input != null && input.isEvent()) { AbstractEvent event = input.getEvent(); - return EventSerializer.toBuffer(event); - } - else { + return new BufferAndAvailability(EventSerializer.toBuffer(event), false); + } else { synchronized (inputQueues[channelIndex]) { inputQueues[channelIndex].wait(); return answer(invocationOnMock); @@ -130,7 +128,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { when(inputChannels[channelIndex].getInputChannel().getNextBuffer()).thenAnswer(answer); inputGate.setInputChannel(new IntermediateResultPartitionID(), - inputChannels[channelIndex].getInputChannel()); + inputChannels[channelIndex].getInputChannel()); } } @@ -139,7 +137,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { inputQueues[channel].add(InputValue.element(element)); inputQueues[channel].notifyAll(); } - inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel()); + inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel()); } public void sendEvent(AbstractEvent event, int channel) { @@ -147,7 +145,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { 
inputQueues[channel].add(InputValue.event(event)); inputQueues[channel].notifyAll(); } - inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel()); + inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel()); } public void endInput() { @@ -156,7 +154,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { inputQueues[i].add(InputValue.streamEnd()); inputQueues[i].notifyAll(); } - inputGate.onAvailableBuffer(inputChannels[i].getInputChannel()); + inputGate.notifyChannelNonEmpty(inputChannels[i].getInputChannel()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java index a8a989b..0cf866a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java @@ -17,25 +17,24 @@ package org.apache.flink.streaming.runtime.io; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.Random; - import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.runtime.io.network.api.CheckpointBarrier; - +import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener; import org.junit.Test; +import java.io.IOException; +import java.util.Random; + +import static org.junit.Assert.fail; + /** * The test generates two random streams (input channels) which independently * and randomly generate checkpoint barriers. The two streams are very @@ -165,7 +164,7 @@ public class BarrierBufferMassiveRandomTest { public void sendTaskEvent(TaskEvent event) {} @Override - public void registerListener(EventListener<InputGate> listener) {} + public void registerListener(InputGateListener listener) {} @Override public int getPageSize() { http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index cb8a058..3e2a75a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.util.event.EventListener; +import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener; import 
java.util.ArrayDeque; import java.util.List; @@ -31,16 +31,15 @@ import java.util.Queue; public class MockInputGate implements InputGate { private final int pageSize; - + private final int numChannels; - + private final Queue<BufferOrEvent> boes; private final boolean[] closed; - + private int closedChannels; - public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> boes) { this.pageSize = pageSize; this.numChannels = numChannels; @@ -52,7 +51,7 @@ public class MockInputGate implements InputGate { public int getPageSize() { return pageSize; } - + @Override public int getNumberOfInputChannels() { return numChannels; @@ -69,11 +68,11 @@ public class MockInputGate implements InputGate { if (next == null) { return null; } - + int channelIdx = next.getChannelIndex(); if (closed[channelIdx]) { throw new RuntimeException("Inconsistent: Channel " + channelIdx - + " has data even though it is already closed."); + + " has data even though it is already closed."); } if (next.isEvent() && next.getEvent() instanceof EndOfPartitionEvent) { closed[channelIdx] = true; @@ -83,12 +82,15 @@ public class MockInputGate implements InputGate { } @Override - public void requestPartitions() {} + public void requestPartitions() { + } @Override - public void sendTaskEvent(TaskEvent event) {} + public void sendTaskEvent(TaskEvent event) { + } @Override - public void registerListener(EventListener<InputGate> listener) {} - -} \ No newline at end of file + public void registerListener(InputGateListener listener) { + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java index 145edc2..25ac356 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java @@ -45,7 +45,7 @@ import java.io.IOException; * * <p> * When Elements or Events are offered to the Task they are put into a queue. The input gates - * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all + * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all * queues are empty. This must be used after entering some elements before checking the * desired output. * @@ -62,11 +62,13 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes * Creates a test harness with the specified number of input gates and specified number * of channels per input gate. */ - public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task, - int numInputGates, - int numInputChannelsPerGate, - TypeInformation<IN> inputType, - TypeInformation<OUT> outputType) { + public OneInputStreamTaskTestHarness( + OneInputStreamTask<IN, OUT> task, + int numInputGates, + int numInputChannelsPerGate, + TypeInformation<IN> inputType, + TypeInformation<OUT> outputType) { + super(task, outputType); this.inputType = inputType; @@ -79,9 +81,10 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes /** * Creates a test harness with one input gate that has one input channel.
*/ - public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task, - TypeInformation<IN> inputType, - TypeInformation<OUT> outputType) { + public OneInputStreamTaskTestHarness( + OneInputStreamTask<IN, OUT> task, + TypeInformation<IN> inputType, + TypeInformation<OUT> outputType) { this(task, 1, 1, inputType, outputType); } @@ -91,9 +94,9 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes for (int i = 0; i < numInputGates; i++) { inputGates[i] = new StreamTestSingleInputGate<IN>( - numInputChannelsPerGate, - bufferSize, - inputSerializer); + numInputChannelsPerGate, + bufferSize, + inputSerializer); this.mockEnv.addInputGate(inputGates[i].getInputGate()); } http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 36ad8ff..94a1bcb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -112,8 +112,10 @@ public class StreamMockEnvironment implements Environment { this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId()); } - public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize, - MockInputSplitProvider inputSplitProvider, int bufferSize) { + public StreamMockEnvironment( + Configuration jobConfig, Configuration taskConfig, long memorySize, + MockInputSplitProvider inputSplitProvider, int bufferSize) { + this(jobConfig, taskConfig, null, memorySize, inputSplitProvider, bufferSize); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java index 8b8b659..6845548 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java @@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.operators.co.CoStreamMap; - import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -38,7 +37,6 @@ import org.powermock.modules.junit4.PowerMockRunner; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; - import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; @@ -218,4 +216,4 @@ public class StreamTaskCancellationBarrierTest { return value; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 0bd8d9a..aaac3f8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -91,7 +91,7 @@ public class StreamTaskTestHarness<OUT> { // input related methods only need to be implemented once, in generic form protected int numInputGates; protected int numInputChannelsPerGate; - + @SuppressWarnings("rawtypes") protected StreamTestSingleInputGate[] inputGates; @@ -154,7 +154,7 @@ public class StreamTaskTestHarness<OUT> { return new StreamMockEnvironment( jobConfig, taskConfig, executionConfig, memorySize, new MockInputSplitProvider(), bufferSize); } - + /** * Invoke the Task. This resets the output of any previous invocation. This will start a new * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the @@ -297,7 +297,7 @@ public class StreamTaskTestHarness<OUT> { try { Thread.sleep(10); } catch (InterruptedException ignored) {} - + if (allEmpty) { break; } @@ -305,7 +305,7 @@ public class StreamTaskTestHarness<OUT> { // then wait for the Task Thread to be in a blocked state // Check whether the state is blocked, this should be the case if it cannot - // read more input, i.e. all currently available input has been processed. + // read more input, i.e. all currently available input has been processed.
while (true) { Thread.State state = taskThread.getState(); if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED || @@ -328,13 +328,13 @@ public class StreamTaskTestHarness<OUT> { inputGates[i].endInput(); } } - + // ------------------------------------------------------------------------ - + private class TaskThread extends Thread { - + private final AbstractInvokable task; - + private volatile Throwable error; TaskThread(AbstractInvokable task) { http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 92f8553..09522cd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -206,6 +206,8 @@ public class TwoInputStreamTaskTest { testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 1); expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime)); + testHarness.waitForInputProcessing(); + // These elements should be forwarded, since we did not yet receive a checkpoint barrier // on that input, only add to same input, otherwise we would not know the ordering // of the output since the Task might read the inputs in any order @@ -217,8 +219,8 @@ public class TwoInputStreamTaskTest { testHarness.waitForInputProcessing(); // we should not yet see the barrier, only the two elements from non-blocked input TestHarnessUtil.assertOutputEquals("Output was not correct.", - testHarness.getOutput(), - expectedOutput); + expectedOutput, + 
testHarness.getOutput()); testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1); testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0); http://git-wip-us.apache.org/repos/asf/flink/blob/8d97eaaf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java index 0e7565e..9c2284f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java @@ -128,7 +128,6 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest bufferSize, inputSerializer1); - StreamEdge streamEdge = new StreamEdge(sourceVertexDummy, targetVertexDummy, 1,
