Repository: flink Updated Branches: refs/heads/master 4c71e90e4 -> 8b3cbb525
[runtime] Return null at input gate after all end-of-partition events Previously, the input gate user needed to be aware whether it is safe to query the input gate for more data or not. If a user mistakenly queried an input gate for data, the blocking call never returned. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b3cbb52 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b3cbb52 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b3cbb52 Branch: refs/heads/master Commit: 8b3cbb52527f77ed43f3035c88c5750abef2c5de Parents: 4c71e90 Author: Ufuk Celebi <[email protected]> Authored: Tue Mar 24 11:13:26 2015 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Tue Mar 24 13:47:26 2015 +0100 ---------------------------------------------------------------------- .../partition/consumer/SingleInputGate.java | 18 ++++++- .../partition/consumer/UnionInputGate.java | 31 +++++++++-- .../partition/consumer/SingleInputGateTest.java | 56 ++++++++++++++++++++ .../partition/consumer/UnionInputGateTest.java | 51 ++++++++++++++---- .../io/network/util/TestInputChannel.java | 5 ++ 5 files changed, 144 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 5b97d26..867a4b9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -124,12 +125,16 @@ public class SingleInputGate implements InputGate { /** Channels, which notified this input gate about available data. */ private final BlockingQueue<InputChannel> inputChannelsWithData = new LinkedBlockingQueue<InputChannel>(); + private final BitSet channelsWithEndOfPartitionEvents; + /** * Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers * from this pool. */ private BufferPool bufferPool; + private boolean hasReceivedAllEndOfPartitionEvents; + /** Flag indicating whether partitions have been requested. */ private boolean requestedPartitionsFlag; @@ -153,6 +158,7 @@ public class SingleInputGate implements InputGate { this.numberOfInputChannels = numberOfInputChannels; this.inputChannels = Maps.newHashMapWithExpectedSize(numberOfInputChannels); + this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels); } // ------------------------------------------------------------------------ @@ -311,8 +317,12 @@ public class SingleInputGate implements InputGate { @Override public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException { + if (hasReceivedAllEndOfPartitionEvents) { + return null; + } + if (isReleased) { - throw new IllegalStateException("The input has already been consumed. 
This indicates misuse of the input gate."); + throw new IllegalStateException("Already released."); } requestPartitions(); @@ -337,6 +347,12 @@ public class SingleInputGate implements InputGate { final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); if (event.getClass() == EndOfPartitionEvent.class) { + channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex()); + + if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) { + hasReceivedAllEndOfPartitionEvents = true; + } + currentChannel.notifySubpartitionConsumed(); currentChannel.releaseAllResources(); http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 5a7a5b0..1f974de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -19,12 +19,15 @@ package org.apache.flink.runtime.io.network.partition.consumer; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.flink.runtime.event.task.TaskEvent; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.util.event.EventListener; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; @@ -34,8 +37,8 @@ import static com.google.common.base.Preconditions.checkNotNull; 
/** * Input gate wrapper to union the input from multiple input gates. - * <p> - * Each input gate has input channels attached from which it reads data. At each input gate, the + * + * <p> Each input gate has input channels attached from which it reads data. At each input gate, the * input channels have unique IDs from 0 (inclusive) to the number of input channels (exclusive). * * <pre> @@ -65,6 +68,8 @@ public class UnionInputGate implements InputGate { /** The input gates to union. */ private final InputGate[] inputGates; + private final Set<InputGate> inputGatesWithRemainingData; + /** Data availability listener across all unioned input gates. */ private final InputGateListener inputGateListener; @@ -85,12 +90,14 @@ public class UnionInputGate implements InputGate { checkArgument(inputGates.length > 1, "Union input gate should union at least two input gates."); this.inputGateToIndexOffsetMap = Maps.newHashMapWithExpectedSize(inputGates.length); + this.inputGatesWithRemainingData = Sets.newHashSetWithExpectedSize(inputGates.length); int currentNumberOfInputChannels = 0; for (InputGate inputGate : inputGates) { // The offset to use for buffer or event instances received from this input gate. inputGateToIndexOffsetMap.put(checkNotNull(inputGate), currentNumberOfInputChannels); + inputGatesWithRemainingData.add(inputGate); currentNumberOfInputChannels += inputGate.getNumberOfInputChannels(); } @@ -133,6 +140,10 @@ public class UnionInputGate implements InputGate { @Override public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException { + if (inputGatesWithRemainingData.isEmpty()) { + return null; + } + // Make sure to request the partitions, if they have not been requested before. 
requestPartitions(); @@ -140,6 +151,16 @@ public class UnionInputGate implements InputGate { final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent(); + if (bufferOrEvent.isEvent() + && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class + && inputGate.isFinished()) { + + if (!inputGatesWithRemainingData.remove(inputGate)) { + throw new IllegalStateException("Couldn't find input gate in set of remaining " + + "input gates."); + } + } + // Set the channel index to identify the input channel (across all unioned input gates) final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate); @@ -163,9 +184,9 @@ public class UnionInputGate implements InputGate { /** * Data availability listener at all unioned input gates. - * <p> - * The listener registers itself at each input gate and is notified for *each incoming buffer* - * at one of the unioned input gates. + * + * <p> The listener registers itself at each input gate and is notified for *each incoming + * buffer* at one of the unioned input gates. 
*/ private static class InputGateListener implements EventListener<InputGate> { http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 6cd5469..e1e3cff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -32,11 +32,17 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; +import org.apache.flink.runtime.io.network.util.TestInputChannel; import org.apache.flink.runtime.io.network.util.TestTaskEvent; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.junit.Test; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.mock; @@ -46,6 +52,44 @@ import static org.mockito.Mockito.when; public class SingleInputGateTest { + /** + * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return + * value after receiving all end-of-partition events. 
+ */ + @Test(timeout = 120 * 1000) + public void testBasicGetNextLogic() throws Exception { + // Setup + final SingleInputGate inputGate = new SingleInputGate(new IntermediateDataSetID(), 0, 2); + + final TestInputChannel[] inputChannels = new TestInputChannel[]{ + new TestInputChannel(inputGate, 0), + new TestInputChannel(inputGate, 1) + }; + + inputGate.setInputChannel( + new IntermediateResultPartitionID(), inputChannels[0].getInputChannel()); + + inputGate.setInputChannel( + new IntermediateResultPartitionID(), inputChannels[1].getInputChannel()); + + // Test + inputChannels[0].readBuffer(); + inputChannels[0].readBuffer(); + inputChannels[1].readBuffer(); + inputChannels[1].readEndOfPartitionEvent(); + inputChannels[0].readEndOfPartitionEvent(); + + verifyBufferOrEvent(inputGate, true, 0); + verifyBufferOrEvent(inputGate, true, 0); + verifyBufferOrEvent(inputGate, true, 1); + verifyBufferOrEvent(inputGate, false, 1); + verifyBufferOrEvent(inputGate, false, 0); + + // Return null when the input gate has received all end-of-partition events + assertTrue(inputGate.isFinished()); + assertNull(inputGate.getNextBufferOrEvent()); + } + @Test @SuppressWarnings("unchecked") public void testBackwardsEventWithUninitializedChannel() throws Exception { @@ -101,4 +145,16 @@ public class SingleInputGateTest { verify(partitionManager, times(2)).getSubpartition(any(ResultPartitionID.class), anyInt(), any(Optional.class)); verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class)); } + + // --------------------------------------------------------------------------------------------- + + static void verifyBufferOrEvent( + InputGate inputGate, + boolean isBuffer, + int channelIndex) throws IOException, InterruptedException { + + final BufferOrEvent boe = inputGate.getNextBufferOrEvent(); + assertEquals(isBuffer, boe.isBuffer()); + assertEquals(channelIndex, boe.getChannelIndex()); + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index 131c4f6..c7cb413 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -23,12 +23,21 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class UnionInputGateTest { - @Test - public void testChannelMapping() throws Exception { - + /** + * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return + * value after receiving all end-of-partition events. + * + * <p> For buffer-or-event instances, it is important to verify that they have been set off to + * the correct logical index. 
+ */ + @Test(timeout = 120 * 1000) + public void testBasicGetNextLogic() throws Exception { + // Setup final SingleInputGate ig1 = new SingleInputGate(new IntermediateDataSetID(), 0, 3); final SingleInputGate ig2 = new SingleInputGate(new IntermediateDataSetID(), 0, 5); @@ -42,21 +51,41 @@ public class UnionInputGateTest { }; inputChannels[0][0].readBuffer(); // 0 => 0 + inputChannels[0][0].readEndOfPartitionEvent(); // 0 => 0 inputChannels[1][2].readBuffer(); // 2 => 5 + inputChannels[1][2].readEndOfPartitionEvent(); // 2 => 5 inputChannels[1][0].readBuffer(); // 0 => 3 inputChannels[1][1].readBuffer(); // 1 => 4 inputChannels[0][1].readBuffer(); // 1 => 1 inputChannels[1][3].readBuffer(); // 3 => 6 + inputChannels[0][1].readEndOfPartitionEvent(); // 1 => 1 + inputChannels[1][3].readEndOfPartitionEvent(); // 3 => 6 inputChannels[0][2].readBuffer(); // 2 => 2 + inputChannels[0][2].readEndOfPartitionEvent(); // 2 => 2 inputChannels[1][4].readBuffer(); // 4 => 7 + inputChannels[1][4].readEndOfPartitionEvent(); // 4 => 7 + inputChannels[1][1].readEndOfPartitionEvent(); // 1 => 4 + inputChannels[1][0].readEndOfPartitionEvent(); // 0 => 3 + + SingleInputGateTest.verifyBufferOrEvent(union, true, 0); + SingleInputGateTest.verifyBufferOrEvent(union, false, 0); + SingleInputGateTest.verifyBufferOrEvent(union, true, 5); + SingleInputGateTest.verifyBufferOrEvent(union, false, 5); + SingleInputGateTest.verifyBufferOrEvent(union, true, 3); + SingleInputGateTest.verifyBufferOrEvent(union, true, 4); + SingleInputGateTest.verifyBufferOrEvent(union, true, 1); + SingleInputGateTest.verifyBufferOrEvent(union, true, 6); + SingleInputGateTest.verifyBufferOrEvent(union, false, 1); + SingleInputGateTest.verifyBufferOrEvent(union, false, 6); + SingleInputGateTest.verifyBufferOrEvent(union, true, 2); + SingleInputGateTest.verifyBufferOrEvent(union, false, 2); + SingleInputGateTest.verifyBufferOrEvent(union, true, 7); + SingleInputGateTest.verifyBufferOrEvent(union, false, 7); + 
SingleInputGateTest.verifyBufferOrEvent(union, false, 4); + SingleInputGateTest.verifyBufferOrEvent(union, false, 3); - assertEquals(0, union.getNextBufferOrEvent().getChannelIndex()); - assertEquals(5, union.getNextBufferOrEvent().getChannelIndex()); - assertEquals(3, union.getNextBufferOrEvent().getChannelIndex()); - assertEquals(4, union.getNextBufferOrEvent().getChannelIndex()); - assertEquals(1, union.getNextBufferOrEvent().getChannelIndex()); - assertEquals(6, union.getNextBufferOrEvent().getChannelIndex()); - assertEquals(2, union.getNextBufferOrEvent().getChannelIndex()); - assertEquals(7, union.getNextBufferOrEvent().getChannelIndex()); + // Return null when the input gate has received all end-of-partition events + assertTrue(union.isFinished()); + assertNull(union.getNextBufferOrEvent()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java index 306de4c..0e9e8e7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java @@ -113,6 +113,11 @@ public class TestInputChannel { // ------------------------------------------------------------------------ + /** + * Creates test input channels and attaches them to the specified input gate. + * + * @return The created test input channels. + */ public static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) { checkNotNull(inputGate); checkArgument(numberOfInputChannels > 0);
