[FLINK-8589][runtime] Add polling method to InputGate This is a preparation for changes in data notifications, which will not be that strict as they are now.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98bd689a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98bd689a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98bd689a Branch: refs/heads/master Commit: 98bd689a2565ec5cf344541f37cd0b0db691c08f Parents: 1310c72 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Thu Feb 1 14:13:42 2018 +0100 Committer: Piotr Nowojski <piotr.nowoj...@gmail.com> Committed: Mon Feb 19 12:21:40 2018 +0100 ---------------------------------------------------------------------- .../api/reader/AbstractRecordReader.java | 2 +- .../network/partition/consumer/InputGate.java | 15 ++++- .../partition/consumer/SingleInputGate.java | 25 ++++++-- .../partition/consumer/UnionInputGate.java | 53 ++++++++++++---- .../partition/InputGateFairnessTest.java | 11 ++-- .../PartialConsumePipelinedResultTest.java | 2 +- .../consumer/LocalInputChannelTest.java | 13 ++-- .../partition/consumer/SingleInputGateTest.java | 11 ++-- .../partition/consumer/UnionInputGateTest.java | 5 +- .../streaming/runtime/io/BarrierBuffer.java | 66 +++++++++++--------- .../streaming/runtime/io/BarrierTracker.java | 22 ++++--- .../io/BarrierBufferMassiveRandomTest.java | 20 ++++-- .../streaming/runtime/io/MockInputGate.java | 22 ++++--- 13 files changed, 176 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index e3c8484..9cfc729 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -83,7 +83,7 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra } } - final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent(); + final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent().orElseThrow(IllegalStateException::new); if (bufferOrEvent.isBuffer()) { currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()]; http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index 1f2182e..0413caa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.event.TaskEvent; import java.io.IOException; +import java.util.Optional; /** * An input gate consumes one or more partitions of a single produced intermediate result. @@ -72,7 +73,19 @@ public interface InputGate { void requestPartitions() throws IOException, InterruptedException; - BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException; + /** + * Blocking call waiting for next {@link BufferOrEvent}. + * + * @return {@code Optional.empty()} if {@link #isFinished()} returns true. + */ + Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException; + + /** + * Poll the {@link BufferOrEvent}. + * + * @return {@code Optional.empty()} if there is no data to return or if {@link #isFinished()} returns true. + */ + Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException; void sendTaskEvent(TaskEvent event) throws IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 1175c52..337b3c2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -52,6 +52,7 @@ import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Timer; import static org.apache.flink.util.Preconditions.checkArgument; @@ -486,9 +487,18 @@ public class SingleInputGate implements InputGate { // ------------------------------------------------------------------------ @Override - public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException { + public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException { + return getNextBufferOrEvent(true); + } + + @Override + public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException { + return getNextBufferOrEvent(false); + } + + private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException { if (hasReceivedAllEndOfPartitionEvents) { - return null; + return Optional.empty(); } if (isReleased) { @@ -505,7 +515,12 @@ public class SingleInputGate implements InputGate { throw new IllegalStateException("Released"); } - inputChannelsWithData.wait(); + if (blocking) { + inputChannelsWithData.wait(); + } + else { + return Optional.empty(); + } } currentChannel = inputChannelsWithData.remove(); @@ -528,7 +543,7 @@ public class SingleInputGate implements InputGate { final Buffer buffer = result.buffer(); if (buffer.isBuffer()) { - return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable); + return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable)); } else { final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); @@ -545,7 +560,7 @@ public class SingleInputGate implements InputGate { currentChannel.releaseAllResources(); } - return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable); + return Optional.of(new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 87443d2..14c04bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -27,6 +27,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; import java.io.IOException; import java.util.ArrayDeque; import java.util.Map; +import java.util.Optional; import java.util.Set; import static org.apache.flink.util.Preconditions.checkArgument; @@ -139,24 +140,17 @@ public class UnionInputGate implements InputGate, InputGateListener { } @Override - public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException { + public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException { if (inputGatesWithRemainingData.isEmpty()) { - return null; + return Optional.empty(); } // Make sure to request the partitions, if they have not been requested before. requestPartitions(); - final InputGate inputGate; - synchronized (inputGatesWithData) { - while (inputGatesWithData.size() == 0) { - inputGatesWithData.wait(); - } - - inputGate = inputGatesWithData.remove(); - } - - final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent(); + InputGateWithData inputGateWithData = waitAndGetNextInputGate(); + InputGate inputGate = inputGateWithData.inputGate; + BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent; if (bufferOrEvent.moreAvailable()) { // this buffer or event was now removed from the non-empty gates queue @@ -180,7 +174,40 @@ public class UnionInputGate implements InputGate, InputGateListener { bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); - return bufferOrEvent; + return Optional.ofNullable(bufferOrEvent); + } + + @Override + public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException { + while (true) { + InputGate inputGate; + synchronized (inputGatesWithData) { + while (inputGatesWithData.size() == 0) { + inputGatesWithData.wait(); + } + inputGate = inputGatesWithData.remove(); + } + + // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. + Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent(); + if (bufferOrEvent.isPresent()) { + return new InputGateWithData(inputGate, bufferOrEvent.get()); + } + } + } + + private static class InputGateWithData { + private final InputGate inputGate; + private final BufferOrEvent bufferOrEvent; + + public InputGateWithData(InputGate inputGate, BufferOrEvent bufferOrEvent) { + this.inputGate = checkNotNull(inputGate); + this.bufferOrEvent = checkNotNull(bufferOrEvent); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java index c58d20a..45df56f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; + import org.junit.Test; import java.io.IOException; @@ -46,13 +47,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Optional; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultPartitionManager; -import static org.apache.flink.util.Preconditions.checkState; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -116,7 +117,7 @@ public class InputGateFairnessTest { assertTrue(max == min || max == min+1); } - assertNull(gate.getNextBufferOrEvent()); + assertFalse(gate.getNextBufferOrEvent().isPresent()); } @Test @@ -232,7 +233,7 @@ public class InputGateFairnessTest { assertTrue(max == min || max == min+1); } - assertNull(gate.getNextBufferOrEvent()); + assertFalse(gate.getNextBufferOrEvent().isPresent()); } @Test @@ -368,7 +369,7 @@ public class InputGateFairnessTest { @Override - public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException { + public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException { synchronized (channelsWithData) { assertTrue("too many input channels", channelsWithData.size() <= getNumberOfInputChannels()); ensureUnique(channelsWithData); http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index 666581c..76e6f2c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -141,7 +141,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger { @Override public void invoke() throws Exception { InputGate gate = getEnvironment().getInputGate(0); - Buffer buffer = gate.getNextBufferOrEvent().getBuffer(); + Buffer buffer = gate.getNextBufferOrEvent().orElseThrow(IllegalStateException::new).getBuffer(); if (buffer != null) { buffer.recycleBuffer(); } http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 1cdf5c3..ab276cd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -52,6 +52,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.Callable; @@ -517,17 +518,17 @@ public class LocalInputChannelTest { final int[] numberOfBuffersPerChannel = new int[numberOfInputChannels]; try { - BufferOrEvent boe; - while ((boe = inputGate.getNextBufferOrEvent()) != null) { - if (boe.isBuffer()) { - boe.getBuffer().recycleBuffer(); + Optional<BufferOrEvent> boe; + while ((boe = inputGate.getNextBufferOrEvent()).isPresent()) { + if (boe.get().isBuffer()) { + boe.get().getBuffer().recycleBuffer(); // Check that we don't receive too many buffers - if (++numberOfBuffersPerChannel[boe.getChannelIndex()] + if (++numberOfBuffersPerChannel[boe.get().getChannelIndex()] > numberOfExpectedBuffersPerChannel) { throw new IllegalStateException("Received more buffers than expected " + - "on channel " + boe.getChannelIndex() + "."); + "on channel " + boe.get().getChannelIndex() + "."); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 2a2b364..17425f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -52,12 +52,12 @@ import org.junit.Test; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -115,7 +115,7 @@ public class SingleInputGateTest { // Return null when the input gate has received all end-of-partition events assertTrue(inputGate.isFinished()); - assertNull(inputGate.getNextBufferOrEvent()); + assertFalse(inputGate.getNextBufferOrEvent().isPresent()); } @Test @@ -448,8 +448,9 @@ public class SingleInputGateTest { boolean isBuffer, int channelIndex) throws IOException, InterruptedException { - final BufferOrEvent boe = inputGate.getNextBufferOrEvent(); - assertEquals(isBuffer, boe.isBuffer()); - assertEquals(channelIndex, boe.getChannelIndex()); + final Optional<BufferOrEvent> bufferOrEvent = inputGate.getNextBufferOrEvent(); + assertTrue(bufferOrEvent.isPresent()); + assertEquals(isBuffer, bufferOrEvent.get().isBuffer()); + assertEquals(channelIndex, bufferOrEvent.get().getChannelIndex()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index 9884855..9b16471 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -23,10 +23,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.TaskActions; + import org.junit.Test; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -111,6 +112,6 @@ public class UnionInputGateTest { // Return null when the input gate has received all end-of-partition events assertTrue(union.isFinished()); - assertNull(union.getNextBufferOrEvent()); + assertFalse(union.getNextBufferOrEvent().isPresent()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index ecfd732..7ef9fef 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayDeque; +import java.util.Optional; import static org.apache.flink.util.Preconditions.checkArgument; @@ -158,52 +159,55 @@ public class BarrierBuffer implements CheckpointBarrierHandler { public BufferOrEvent getNextNonBlocked() throws Exception { while (true) { // process buffered BufferOrEvents before grabbing new ones - BufferOrEvent next; + Optional<BufferOrEvent> next; if (currentBuffered == null) { next = inputGate.getNextBufferOrEvent(); } else { - next = currentBuffered.getNext(); - if (next == null) { + next = Optional.ofNullable(currentBuffered.getNext()); + if (!next.isPresent()) { completeBufferedSequence(); return getNextNonBlocked(); } } - if (next != null) { - if (isBlocked(next.getChannelIndex())) { - // if the channel is blocked we, we just store the BufferOrEvent - bufferSpiller.add(next); - checkSizeLimit(); - } - else if (next.isBuffer()) { - return next; - } - else if (next.getEvent().getClass() == CheckpointBarrier.class) { - if (!endOfStream) { - // process barriers only if there is a chance of the checkpoint completing - processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex()); - } - } - else if (next.getEvent().getClass() == CancelCheckpointMarker.class) { - processCancellationBarrier((CancelCheckpointMarker) next.getEvent()); + if (!next.isPresent()) { + if (!endOfStream) { + // end of input stream. stream continues with the buffered data + endOfStream = true; + releaseBlocksAndResetBarriers(); + return getNextNonBlocked(); } else { - if (next.getEvent().getClass() == EndOfPartitionEvent.class) { - processEndOfPartition(); - } - return next; + // final end of both input and buffered data + return null; } } - else if (!endOfStream) { - // end of input stream. stream continues with the buffered data - endOfStream = true; - releaseBlocksAndResetBarriers(); - return getNextNonBlocked(); + + BufferOrEvent bufferOrEvent = next.get(); + + if (isBlocked(bufferOrEvent.getChannelIndex())) { + // if the channel is blocked we, we just store the BufferOrEvent + bufferSpiller.add(bufferOrEvent); + checkSizeLimit(); + } + else if (bufferOrEvent.isBuffer()) { + return bufferOrEvent; + } + else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) { + if (!endOfStream) { + // process barriers only if there is a chance of the checkpoint completing + processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex()); + } + } + else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) { + processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent()); } else { - // final end of both input and buffered data - return null; + if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) { + processEndOfPartition(); + } + return bufferOrEvent; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java index 8178fbc..f929226 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; +import java.util.Optional; /** * The BarrierTracker keeps track of what checkpoint barriers have been received from @@ -90,20 +91,25 @@ public class BarrierTracker implements CheckpointBarrierHandler { @Override public BufferOrEvent getNextNonBlocked() throws Exception { while (true) { - BufferOrEvent next = inputGate.getNextBufferOrEvent(); - if (next == null || next.isBuffer()) { + Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent(); + if (!next.isPresent()) { // buffer or input exhausted - return next; + return null; } - else if (next.getEvent().getClass() == CheckpointBarrier.class) { - processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex()); + + BufferOrEvent bufferOrEvent = next.get(); + if (bufferOrEvent.isBuffer()) { + return bufferOrEvent; + } + else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) { + processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex()); } - else if (next.getEvent().getClass() == CancelCheckpointMarker.class) { - processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent(), next.getChannelIndex()); + else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) { + processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex()); } else { // some other event - return next; + return bufferOrEvent; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java index 96d79bb..39c41ef 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener; import org.junit.Test; import java.io.IOException; +import java.util.Optional; import java.util.Random; import static org.junit.Assert.fail; @@ -159,21 +160,30 @@ public class BarrierBufferMassiveRandomTest { public void requestPartitions() {} @Override - public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException { + public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException { currentChannel = (currentChannel + 1) % numChannels; if (barrierGens[currentChannel].isNextBarrier()) { - return new BufferOrEvent( - new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis(), CheckpointOptions.forCheckpointWithDefaultLocation()), - currentChannel); + return Optional.of( + new BufferOrEvent( + new CheckpointBarrier( + ++currentBarriers[currentChannel], + System.currentTimeMillis(), + CheckpointOptions.forCheckpointWithDefaultLocation()), + currentChannel)); } else { Buffer buffer = bufferPools[currentChannel].requestBuffer(); buffer.getMemorySegment().putLong(0, c++); - return new BufferOrEvent(buffer, currentChannel); + return Optional.of(new BufferOrEvent(buffer, currentChannel)); } } @Override + public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException { + return getNextBufferOrEvent(); + } + + @Override public void sendTaskEvent(TaskEvent event) {} @Override http://git-wip-us.apache.org/repos/asf/flink/blob/98bd689a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index 77c938a..e62b709 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener; import java.util.ArrayDeque; import java.util.List; +import java.util.Optional; import java.util.Queue; /** @@ -37,16 +38,16 @@ public class MockInputGate implements InputGate { private final int numChannels; - private final Queue<BufferOrEvent> boes; + private final Queue<BufferOrEvent> bufferOrEvents; private final boolean[] closed; private int closedChannels; - public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> boes) { + public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents) { this.pageSize = pageSize; this.numChannels = numChannels; - this.boes = new ArrayDeque<BufferOrEvent>(boes); + this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents); this.closed = new boolean[numChannels]; } @@ -62,14 +63,14 @@ public class MockInputGate implements InputGate { @Override public boolean isFinished() { - return boes.isEmpty(); + return bufferOrEvents.isEmpty(); } @Override - public BufferOrEvent getNextBufferOrEvent() { - BufferOrEvent next = boes.poll(); + public Optional<BufferOrEvent> getNextBufferOrEvent() { + BufferOrEvent next = bufferOrEvents.poll(); if (next == null) { - return null; + return Optional.empty(); } int channelIdx = next.getChannelIndex(); @@ -81,7 +82,12 @@ public class MockInputGate implements InputGate { closed[channelIdx] = true; closedChannels++; } - return next; + return Optional.of(next); + } + + @Override + public Optional<BufferOrEvent> pollNextBufferOrEvent() { + return getNextBufferOrEvent(); } @Override