This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 261efa8f6a321a9303705c35681c0d67880213bb Author: Piotr Nowojski <[email protected]> AuthorDate: Thu May 9 10:42:33 2019 +0200 [hotfix][network] Refactor InputGates code --- .../io/network/partition/consumer/InputGate.java | 17 ++++++ .../partition/consumer/SingleInputGate.java | 51 +++++++++++------ .../network/partition/consumer/UnionInputGate.java | 64 +++++++++++----------- 3 files changed, 83 insertions(+), 49 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index 83e18e9..03ac822 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * An input gate consumes one or more partitions of a single produced intermediate result. * @@ -112,4 +114,19 @@ public abstract class InputGate implements AutoCloseable { isAvailable = new CompletableFuture<>(); } } + + /** + * Simple pojo for INPUT, DATA and moreAvailable. + */ + protected static class InputWithData<INPUT, DATA> { + protected final INPUT input; + protected final DATA data; + protected final boolean moreAvailable; + + InputWithData(INPUT input, DATA data, boolean moreAvailable) { + this.input = checkNotNull(input); + this.data = checkNotNull(data); + this.moreAvailable = moreAvailable; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index c0830fa..19912b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -529,12 +529,21 @@ public class SingleInputGate extends InputGate { } requestPartitions(); + Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking); + if (!next.isPresent()) { + return Optional.empty(); + } - InputChannel currentChannel; - boolean moreAvailable; - Optional<BufferAndAvailability> result = Optional.empty(); + InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get(); + return Optional.of(transformToBufferOrEvent( + inputWithData.data.buffer(), + inputWithData.moreAvailable, + inputWithData.input)); + } - do { + private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking) + throws IOException, InterruptedException { + while (true) { synchronized (inputChannelsWithData) { while (inputChannelsWithData.size() == 0) { if (isReleased) { @@ -550,29 +559,38 @@ public class SingleInputGate extends InputGate { } } - currentChannel = inputChannelsWithData.remove(); + InputChannel inputChannel = inputChannelsWithData.remove(); - result = currentChannel.getNextBuffer(); + Optional<BufferAndAvailability> result = inputChannel.getNextBuffer(); if (result.isPresent() && result.get().moreAvailable()) { - // enqueue the currentChannel at the end to avoid starvation - inputChannelsWithData.add(currentChannel); + // enqueue the inputChannel at the end to avoid starvation + inputChannelsWithData.add(inputChannel); } else { - enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex()); + enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex()); } - moreAvailable = !inputChannelsWithData.isEmpty(); - - if (!moreAvailable) { + if (inputChannelsWithData.isEmpty()) { resetIsAvailable(); } + + if (result.isPresent()) { + return Optional.of(new InputWithData<>( + inputChannel, + result.get(), + !inputChannelsWithData.isEmpty())); + } } - } while (!result.isPresent()); + } + } - final Buffer buffer = result.get().buffer(); + private BufferOrEvent transformToBufferOrEvent( + Buffer buffer, + boolean moreAvailable, + InputChannel currentChannel) throws IOException, InterruptedException { numBytesIn.inc(buffer.getSizeUnsafe()); if (buffer.isBuffer()) { - return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable)); + return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable); } else { final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); @@ -591,11 +609,10 @@ public class SingleInputGate extends InputGate { } currentChannel.notifySubpartitionConsumed(); - currentChannel.releaseAllResources(); } - return Optional.of(new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable)); + return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 986d1bb..5019cfc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -182,36 +182,22 @@ public class UnionInputGate extends InputGate { // Make sure to request the partitions, if they have not been requested before. requestPartitions(); - Optional<InputGateWithData> next = waitAndGetNextInputGate(blocking); + Optional<InputWithData<InputGate, BufferOrEvent>> next = waitAndGetNextData(blocking); if (!next.isPresent()) { return Optional.empty(); } - InputGateWithData inputGateWithData = next.get(); - InputGate inputGate = inputGateWithData.inputGate; - BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent; + InputWithData<InputGate, BufferOrEvent> inputWithData = next.get(); - if (bufferOrEvent.isEvent() - && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class - && inputGate.isFinished()) { - - checkState(!bufferOrEvent.moreAvailable()); - if (!inputGatesWithRemainingData.remove(inputGate)) { - throw new IllegalStateException("Couldn't find input gate in set of remaining " + - "input gates."); - } - } - - // Set the channel index to identify the input channel (across all unioned input gates) - final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate); - - bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); - bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || inputGateWithData.moreInputGatesAvailable); - - return Optional.of(bufferOrEvent); + handleEndOfPartitionEvent(inputWithData.data, inputWithData.input); + return Optional.of(adjustForUnionInputGate( + inputWithData.data, + inputWithData.input, + inputWithData.moreAvailable)); } - private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) throws IOException, InterruptedException { + private Optional<InputWithData<InputGate, BufferOrEvent>> waitAndGetNextData(boolean blocking) + throws IOException, InterruptedException { while (true) { synchronized (inputGatesWithData) { while (inputGatesWithData.size() == 0) { @@ -242,7 +228,7 @@ public class UnionInputGate extends InputGate { } if (bufferOrEvent.isPresent()) { - return Optional.of(new InputGateWithData( + return Optional.of(new InputWithData<>( inputGate, bufferOrEvent.get(), !inputGatesWithData.isEmpty())); @@ -251,15 +237,29 @@ public class UnionInputGate extends InputGate { } } - private static class InputGateWithData { - private final InputGate inputGate; - private final BufferOrEvent bufferOrEvent; - private final boolean moreInputGatesAvailable; + private BufferOrEvent adjustForUnionInputGate( + BufferOrEvent bufferOrEvent, + InputGate inputGate, + boolean moreInputGatesAvailable) { + // Set the channel index to identify the input channel (across all unioned input gates) + final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate); + + bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); + bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || moreInputGatesAvailable); - InputGateWithData(InputGate inputGate, BufferOrEvent bufferOrEvent, boolean moreInputGatesAvailable) { - this.inputGate = checkNotNull(inputGate); - this.bufferOrEvent = checkNotNull(bufferOrEvent); - this.moreInputGatesAvailable = moreInputGatesAvailable; + return bufferOrEvent; + } + + private void handleEndOfPartitionEvent(BufferOrEvent bufferOrEvent, InputGate inputGate) { + if (bufferOrEvent.isEvent() + && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class + && inputGate.isFinished()) { + + checkState(!bufferOrEvent.moreAvailable()); + if (!inputGatesWithRemainingData.remove(inputGate)) { + throw new IllegalStateException("Couldn't find input gate in set of remaining " + + "input gates."); + } } }
