This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit dd1eb082fdd291523ec931a01b295e407a2df1f5 Author: Piotr Nowojski <[email protected]> AuthorDate: Fri May 10 15:22:23 2019 +0200 Revert "Merge pull request #8361 from pnowojski/f12434" This reverts commit d3fd7a6794a8ffd16e00bb1867b6e62c3909a2d2. --- .../io/network/partition/consumer/InputGate.java | 55 +----- .../partition/consumer/InputGateListener.java | 35 ++++ .../partition/consumer/SingleInputGate.java | 87 ++++----- .../network/partition/consumer/UnionInputGate.java | 206 ++++++++++----------- .../partition/consumer/InputGateTestBase.java | 91 --------- .../partition/consumer/SingleInputGateTest.java | 54 ++++-- .../partition/consumer/TestSingleInputGate.java | 44 ++++- .../partition/consumer/UnionInputGateTest.java | 22 +-- .../consumer/StreamTestSingleInputGate.java | 84 +-------- .../runtime/io/BarrierBufferMassiveRandomTest.java | 7 +- .../flink/streaming/runtime/io/MockInputGate.java | 9 +- 11 files changed, 281 insertions(+), 413 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index 03ac822..c270d37 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -22,9 +22,6 @@ import org.apache.flink.runtime.event.TaskEvent; import java.io.IOException; import java.util.Optional; -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.Preconditions.checkNotNull; /** * An input gate consumes one or more partitions of a single produced intermediate result. @@ -68,65 +65,33 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * will have an input gate attached to it. This will provide its input, which will consist of one * subpartition from each partition of the intermediate result. */ -public abstract class InputGate implements AutoCloseable { - - public static final CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null); - - protected CompletableFuture<?> isAvailable = new CompletableFuture<>(); +public interface InputGate extends AutoCloseable { - public abstract int getNumberOfInputChannels(); + int getNumberOfInputChannels(); - public abstract String getOwningTaskName(); + String getOwningTaskName(); - public abstract boolean isFinished(); + boolean isFinished(); - public abstract void requestPartitions() throws IOException, InterruptedException; + void requestPartitions() throws IOException, InterruptedException; /** * Blocking call waiting for next {@link BufferOrEvent}. * * @return {@code Optional.empty()} if {@link #isFinished()} returns true. */ - public abstract Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException; + Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException; /** * Poll the {@link BufferOrEvent}. * * @return {@code Optional.empty()} if there is no data to return or if {@link #isFinished()} returns true. */ - public abstract Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException; - - public abstract void sendTaskEvent(TaskEvent event) throws IOException; - - public abstract int getPageSize(); - - /** - * @return a future that is completed if there are more records available. If there more records - * available immediately, {@link #AVAILABLE} should be returned. - */ - public CompletableFuture<?> isAvailable() { - return isAvailable; - } + Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException; - protected void resetIsAvailable() { - // try to avoid volatile access in isDone()} - if (isAvailable == AVAILABLE || isAvailable.isDone()) { - isAvailable = new CompletableFuture<>(); - } - } + void sendTaskEvent(TaskEvent event) throws IOException; - /** - * Simple pojo for INPUT, DATA and moreAvailable. - */ - protected static class InputWithData<INPUT, DATA> { - protected final INPUT input; - protected final DATA data; - protected final boolean moreAvailable; + void registerListener(InputGateListener listener); - InputWithData(INPUT input, DATA data, boolean moreAvailable) { - this.input = checkNotNull(input); - this.data = checkNotNull(data); - this.moreAvailable = moreAvailable; - } - } + int getPageSize(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java new file mode 100644 index 0000000..00fa782 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +/** + * Listener interface implemented by consumers of {@link InputGate} instances + * that want to be notified of availability of buffer or event instances. + */ +public interface InputGateListener { + + /** + * Notification callback if the input gate moves from zero to non-zero + * available input channels with data. + * + * @param inputGate Input Gate that became available. + */ + void notifyInputGateNonEmpty(InputGate inputGate); + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 19912b2..d40af83 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -56,7 +56,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Timer; -import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -102,7 +101,7 @@ import static org.apache.flink.util.Preconditions.checkState; * in two partitions (Partition 1 and 2). Each of these partitions is further partitioned into two * subpartitions -- one for each parallel reduce subtask. */ -public class SingleInputGate extends InputGate { +public class SingleInputGate implements InputGate { private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class); @@ -173,6 +172,9 @@ public class SingleInputGate extends InputGate { /** Flag indicating whether all resources have been released. */ private volatile boolean isReleased; + /** Registered listener to forward buffer notifications to. */ + private volatile InputGateListener inputGateListener; + private final List<TaskEvent> pendingEvents = new ArrayList<>(); private int numberOfUninitializedChannels; @@ -529,21 +531,12 @@ public class SingleInputGate extends InputGate { } requestPartitions(); - Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking); - if (!next.isPresent()) { - return Optional.empty(); - } - InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get(); - return Optional.of(transformToBufferOrEvent( - inputWithData.data.buffer(), - inputWithData.moreAvailable, - inputWithData.input)); - } + InputChannel currentChannel; + boolean moreAvailable; + Optional<BufferAndAvailability> result = Optional.empty(); - private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking) - throws IOException, InterruptedException { - while (true) { + do { synchronized (inputChannelsWithData) { while (inputChannelsWithData.size() == 0) { if (isReleased) { @@ -554,43 +547,30 @@ public class SingleInputGate extends InputGate { inputChannelsWithData.wait(); } else { - resetIsAvailable(); return Optional.empty(); } } - InputChannel inputChannel = inputChannelsWithData.remove(); - - Optional<BufferAndAvailability> result = inputChannel.getNextBuffer(); + currentChannel = inputChannelsWithData.remove(); + enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex()); + moreAvailable = !inputChannelsWithData.isEmpty(); + } - if (result.isPresent() && result.get().moreAvailable()) { - // enqueue the inputChannel at the end to avoid starvation - inputChannelsWithData.add(inputChannel); - } else { - enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex()); - } + result = currentChannel.getNextBuffer(); + } while (!result.isPresent()); - if (inputChannelsWithData.isEmpty()) { - resetIsAvailable(); - } - - if (result.isPresent()) { - return Optional.of(new InputWithData<>( - inputChannel, - result.get(), - !inputChannelsWithData.isEmpty())); - } - } + // this channel was now removed from the non-empty channels queue + // we re-add it in case it has more data, because in that case no "non-empty" notification + // will come for that channel + if (result.get().moreAvailable()) { + queueChannel(currentChannel); + moreAvailable = true; } - } - private BufferOrEvent transformToBufferOrEvent( - Buffer buffer, - boolean moreAvailable, - InputChannel currentChannel) throws IOException, InterruptedException { + final Buffer buffer = result.get().buffer(); numBytesIn.inc(buffer.getSizeUnsafe()); if (buffer.isBuffer()) { - return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable); + return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable)); } else { final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); @@ -609,10 +589,11 @@ public class SingleInputGate extends InputGate { } currentChannel.notifySubpartitionConsumed(); + currentChannel.releaseAllResources(); } - return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable); + return Optional.of(new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable)); } } @@ -633,6 +614,15 @@ public class SingleInputGate extends InputGate { // Channel notifications // ------------------------------------------------------------------------ + @Override + public void registerListener(InputGateListener inputGateListener) { + if (this.inputGateListener == null) { + this.inputGateListener = inputGateListener; + } else { + throw new IllegalStateException("Multiple listeners"); + } + } + void notifyChannelNonEmpty(InputChannel channel) { queueChannel(checkNotNull(channel)); } @@ -644,8 +634,6 @@ public class SingleInputGate extends InputGate { private void queueChannel(InputChannel channel) { int availableChannels; - CompletableFuture<?> toNotify = null; - synchronized (inputChannelsWithData) { if (enqueuedInputChannelsWithData.get(channel.getChannelIndex())) { return; @@ -657,13 +645,14 @@ public class SingleInputGate extends InputGate { if (availableChannels == 0) { inputChannelsWithData.notifyAll(); - toNotify = isAvailable; - isAvailable = AVAILABLE; } } - if (toNotify != null) { - toNotify.complete(null); + if (availableChannels == 0) { + InputGateListener listener = inputGateListener; + if (listener != null) { + listener.notifyInputGateNonEmpty(this); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 5019cfc..ea83004 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -25,12 +25,11 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; import java.io.IOException; -import java.util.Iterator; -import java.util.LinkedHashSet; +import java.util.ArrayDeque; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -64,22 +63,27 @@ import static org.apache.flink.util.Preconditions.checkState; * * <strong>It is NOT possible to recursively union union input gates.</strong> */ -public class UnionInputGate extends InputGate { +public class UnionInputGate implements InputGate, InputGateListener { /** The input gates to union. */ private final InputGate[] inputGates; private final Set<InputGate> inputGatesWithRemainingData; + /** Gates, which notified this input gate about available data. */ + private final ArrayDeque<InputGate> inputGatesWithData = new ArrayDeque<>(); + /** - * Gates, which notified this input gate about available data. We are using it as a FIFO - * queue of {@link InputGate}s to avoid starvation and provide some basic fairness. + * Guardian against enqueuing an {@link InputGate} multiple times on {@code inputGatesWithData}. */ - private final LinkedHashSet<InputGate> inputGatesWithData = new LinkedHashSet<>(); + private final Set<InputGate> enqueuedInputGatesWithData = new HashSet<>(); /** The total number of input channels across all unioned input gates. */ private final int totalNumberOfInputChannels; + /** Registered listener to forward input gate notifications to. */ + private volatile InputGateListener inputGateListener; + /** * A mapping from input gate to (logical) channel index offset. Valid channel indexes go from 0 * (inclusive) to the total number of input channels (exclusive). @@ -98,31 +102,20 @@ public class UnionInputGate extends InputGate { int currentNumberOfInputChannels = 0; - synchronized (inputGatesWithData) { - for (InputGate inputGate : inputGates) { - if (inputGate instanceof UnionInputGate) { - // if we want to add support for this, we need to implement pollNextBufferOrEvent() - throw new UnsupportedOperationException("Cannot union a union of input gates."); - } - - // The offset to use for buffer or event instances received from this input gate. - inputGateToIndexOffsetMap.put(checkNotNull(inputGate), currentNumberOfInputChannels); - inputGatesWithRemainingData.add(inputGate); - - currentNumberOfInputChannels += inputGate.getNumberOfInputChannels(); + for (InputGate inputGate : inputGates) { + if (inputGate instanceof UnionInputGate) { + // if we want to add support for this, we need to implement pollNextBufferOrEvent() + throw new UnsupportedOperationException("Cannot union a union of input gates."); + } - CompletableFuture<?> available = inputGate.isAvailable(); + // The offset to use for buffer or event instances received from this input gate. + inputGateToIndexOffsetMap.put(checkNotNull(inputGate), currentNumberOfInputChannels); + inputGatesWithRemainingData.add(inputGate); - if (available.isDone()) { - inputGatesWithData.add(inputGate); - } else { - available.thenRun(() -> queueInputGate(inputGate)); - } - } + currentNumberOfInputChannels += inputGate.getNumberOfInputChannels(); - if (!inputGatesWithData.isEmpty()) { - isAvailable = AVAILABLE; - } + // Register the union gate as a listener for all input gates + inputGate.registerListener(this); } this.totalNumberOfInputChannels = currentNumberOfInputChannels; @@ -166,15 +159,6 @@ public class UnionInputGate extends InputGate { @Override public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException { - return getNextBufferOrEvent(true); - } - - @Override - public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException { - return getNextBufferOrEvent(false); - } - - private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException { if (inputGatesWithRemainingData.isEmpty()) { return Optional.empty(); } @@ -182,87 +166,75 @@ public class UnionInputGate extends InputGate { // Make sure to request the partitions, if they have not been requested before. requestPartitions(); - Optional<InputWithData<InputGate, BufferOrEvent>> next = waitAndGetNextData(blocking); - if (!next.isPresent()) { - return Optional.empty(); - } - - InputWithData<InputGate, BufferOrEvent> inputWithData = next.get(); - - handleEndOfPartitionEvent(inputWithData.data, inputWithData.input); - return Optional.of(adjustForUnionInputGate( - inputWithData.data, - inputWithData.input, - inputWithData.moreAvailable)); - } - - private Optional<InputWithData<InputGate, BufferOrEvent>> waitAndGetNextData(boolean blocking) - throws IOException, InterruptedException { - while (true) { - synchronized (inputGatesWithData) { - while (inputGatesWithData.size() == 0) { - if (blocking) { - inputGatesWithData.wait(); - } else { - resetIsAvailable(); - return Optional.empty(); - } - } + InputGateWithData inputGateWithData = waitAndGetNextInputGate(); + InputGate inputGate = inputGateWithData.inputGate; + BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent; - Iterator<InputGate> inputGateIterator = inputGatesWithData.iterator(); - final InputGate inputGate = inputGateIterator.next(); - inputGateIterator.remove(); - - // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. - Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent(); - - if (bufferOrEvent.isPresent() && bufferOrEvent.get().moreAvailable()) { - // enqueue the inputGate at the end to avoid starvation - inputGatesWithData.add(inputGate); - } else { - inputGate.isAvailable().thenRun(() -> queueInputGate(inputGate)); - } + if (bufferOrEvent.moreAvailable()) { + // this buffer or event was now removed from the non-empty gates queue + // we re-add it in case it has more data, because in that case no "non-empty" notification + // will come for that gate + queueInputGate(inputGate); + } - if (inputGatesWithData.isEmpty()) { - resetIsAvailable(); - } + if (bufferOrEvent.isEvent() + && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class + && inputGate.isFinished()) { - if (bufferOrEvent.isPresent()) { - return Optional.of(new InputWithData<>( - inputGate, - bufferOrEvent.get(), - !inputGatesWithData.isEmpty())); - } + checkState(!bufferOrEvent.moreAvailable()); + if (!inputGatesWithRemainingData.remove(inputGate)) { + throw new IllegalStateException("Couldn't find input gate in set of remaining " + + "input gates."); } } - } - private BufferOrEvent adjustForUnionInputGate( - BufferOrEvent bufferOrEvent, - InputGate inputGate, - boolean moreInputGatesAvailable) { // Set the channel index to identify the input channel (across all unioned input gates) final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate); bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); - bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || moreInputGatesAvailable); + bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || inputGateWithData.moreInputGatesAvailable); - return bufferOrEvent; + return Optional.of(bufferOrEvent); } - private void handleEndOfPartitionEvent(BufferOrEvent bufferOrEvent, InputGate inputGate) { - if (bufferOrEvent.isEvent() - && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class - && inputGate.isFinished()) { + @Override + public Optional<BufferOrEvent> pollNextBufferOrEvent() throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } - checkState(!bufferOrEvent.moreAvailable()); - if (!inputGatesWithRemainingData.remove(inputGate)) { - throw new IllegalStateException("Couldn't find input gate in set of remaining " + - "input gates."); + private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException { + while (true) { + InputGate inputGate; + boolean moreInputGatesAvailable; + synchronized (inputGatesWithData) { + while (inputGatesWithData.size() == 0) { + inputGatesWithData.wait(); + } + inputGate = inputGatesWithData.remove(); + enqueuedInputGatesWithData.remove(inputGate); + moreInputGatesAvailable = enqueuedInputGatesWithData.size() > 0; + } + + // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. + Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent(); + if (bufferOrEvent.isPresent()) { + return new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable); } } } + private static class InputGateWithData { + private final InputGate inputGate; + private final BufferOrEvent bufferOrEvent; + private final boolean moreInputGatesAvailable; + + InputGateWithData(InputGate inputGate, BufferOrEvent bufferOrEvent, boolean moreInputGatesAvailable) { + this.inputGate = checkNotNull(inputGate); + this.bufferOrEvent = checkNotNull(bufferOrEvent); + this.moreInputGatesAvailable = moreInputGatesAvailable; + } + } + @Override public void sendTaskEvent(TaskEvent event) throws IOException { for (InputGate inputGate : inputGates) { @@ -271,6 +243,15 @@ public class UnionInputGate extends InputGate { } @Override + public void registerListener(InputGateListener listener) { + if (this.inputGateListener == null) { + this.inputGateListener = listener; + } else { + throw new IllegalStateException("Multiple listeners"); + } + } + + @Override public int getPageSize() { int pageSize = -1; for (InputGate gate : inputGates) { @@ -287,29 +268,34 @@ public class UnionInputGate extends InputGate { public void close() throws IOException { } - private void queueInputGate(InputGate inputGate) { - checkNotNull(inputGate); + @Override + public void notifyInputGateNonEmpty(InputGate inputGate) { + queueInputGate(checkNotNull(inputGate)); + } - CompletableFuture<?> toNotify = null; + private void queueInputGate(InputGate inputGate) { + int availableInputGates; synchronized (inputGatesWithData) { - if (inputGatesWithData.contains(inputGate)) { + if (enqueuedInputGatesWithData.contains(inputGate)) { return; } - int availableInputGates = inputGatesWithData.size(); + availableInputGates = inputGatesWithData.size(); inputGatesWithData.add(inputGate); + enqueuedInputGatesWithData.add(inputGate); if (availableInputGates == 0) { inputGatesWithData.notifyAll(); - toNotify = isAvailable; - isAvailable = AVAILABLE; } } - if (toNotify != null) { - toNotify.complete(null); + if (availableInputGates == 0) { + InputGateListener listener = inputGateListener; + if (listener != null) { + listener.notifyInputGateNonEmpty(this); + } } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java deleted file mode 100644 index ddf027c..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.partition.consumer; - -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * Test base for {@link InputGate}. - */ -@RunWith(Parameterized.class) -public abstract class InputGateTestBase { - - @Parameter - public boolean enableCreditBasedFlowControl; - - @Parameters(name = "Credit-based = {0}") - public static List<Boolean> parameters() { - return Arrays.asList(Boolean.TRUE, Boolean.FALSE); - } - - protected void testIsAvailable( - InputGate inputGateToTest, - SingleInputGate inputGateToNotify, - TestInputChannel inputChannelWithNewData) throws Exception { - - assertFalse(inputGateToTest.isAvailable().isDone()); - assertFalse(inputGateToTest.pollNextBufferOrEvent().isPresent()); - - CompletableFuture<?> isAvailable = inputGateToTest.isAvailable(); - - assertFalse(inputGateToTest.isAvailable().isDone()); - assertFalse(inputGateToTest.pollNextBufferOrEvent().isPresent()); - - assertEquals(isAvailable, inputGateToTest.isAvailable()); - - inputChannelWithNewData.readBuffer(); - inputGateToNotify.notifyChannelNonEmpty(inputChannelWithNewData); - - assertTrue(isAvailable.isDone()); - assertTrue(inputGateToTest.isAvailable().isDone()); - } - - protected SingleInputGate createInputGate() { - return createInputGate(2); - } - - protected SingleInputGate createInputGate(int numberOfInputChannels) { - return createInputGate(numberOfInputChannels, ResultPartitionType.PIPELINED); - } - - protected SingleInputGate createInputGate( - int numberOfInputChannels, ResultPartitionType partitionType) { - SingleInputGate inputGate = createSingleInputGate( - numberOfInputChannels, - partitionType, - enableCreditBasedFlowControl); - - assertEquals(partitionType, inputGate.getConsumedPartitionType()); - return inputGate; - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index a6f824d..71e4f5a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -50,13 +50,18 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.taskmanager.NoOpTaskActions; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -75,7 +80,17 @@ import static org.mockito.Mockito.when; /** * Tests for {@link SingleInputGate}. */ -public class SingleInputGateTest extends InputGateTestBase { +@RunWith(Parameterized.class) +public class SingleInputGateTest { + + @Parameterized.Parameter + public boolean enableCreditBasedFlowControl; + + @Parameterized.Parameters(name = "Credit-based = {0}") + public static List<Boolean> parameters() { + return Arrays.asList(Boolean.TRUE, Boolean.FALSE); + } + /** * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return * value after receiving all end-of-partition events. @@ -116,15 +131,6 @@ public class SingleInputGateTest extends InputGateTestBase { assertTrue(inputGate.isFinished()); } - @Test - public void testIsAvailable() throws Exception { - final SingleInputGate inputGate = createInputGate(1); - TestInputChannel inputChannel = new TestInputChannel(inputGate, 0); - inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel); - - testIsAvailable(inputGate, inputGate, inputChannel); - } - @Test(timeout = 120 * 1000) public void testIsMoreAvailableReadingFromSingleInputChannel() throws Exception { // Setup @@ -540,6 +546,22 @@ public class SingleInputGateTest extends InputGateTestBase { // --------------------------------------------------------------------------------------------- + private SingleInputGate createInputGate() { + return createInputGate(2); + } + + private SingleInputGate createInputGate(int numberOfInputChannels) { + return createInputGate(numberOfInputChannels, ResultPartitionType.PIPELINED); + } + + private SingleInputGate createInputGate(int numberOfInputChannels, ResultPartitionType partitionType) { + SingleInputGate inputGate = createSingleInputGate(numberOfInputChannels, partitionType, enableCreditBasedFlowControl); + + assertEquals(partitionType, inputGate.getConsumedPartitionType()); + + return inputGate; + } + private void addUnknownInputChannel( NetworkEnvironment network, SingleInputGate inputGate, @@ -597,7 +619,17 @@ public class SingleInputGateTest extends InputGateTestBase { assertEquals(expectedChannelIndex, bufferOrEvent.get().getChannelIndex()); assertEquals(expectedMoreAvailable, bufferOrEvent.get().moreAvailable()); if (!expectedMoreAvailable) { - assertFalse(inputGate.pollNextBufferOrEvent().isPresent()); + try { + assertFalse(inputGate.pollNextBufferOrEvent().isPresent()); + } + catch (UnsupportedOperationException ex) { + /** + * {@link UnionInputGate#pollNextBufferOrEvent()} is unsupported at the moment. + */ + if (!(inputGate instanceof UnionInputGate)) { + throw ex; + } + } } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java index f60cdb9..464ab7c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java @@ -20,8 +20,17 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.lang.reflect.Field; +import java.util.ArrayDeque; + import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; /** * A test input gate to mock reading data. @@ -35,8 +44,39 @@ public class TestSingleInputGate { public TestSingleInputGate(int numberOfInputChannels, boolean initialize) { checkArgument(numberOfInputChannels >= 1); - inputGate = createSingleInputGate(numberOfInputChannels); - inputChannels = new TestInputChannel[numberOfInputChannels]; + SingleInputGate realGate = createSingleInputGate(numberOfInputChannels); + + this.inputGate = spy(realGate); + + // Notify about late registrations (added for DataSinkTaskTest#testUnionDataSinkTask). + // After merging registerInputOutput and invoke, we have to make sure that the test + // notifications happen at the expected time. In real programs, this is guaranteed by + // the instantiation and request partition life cycle. + try { + Field f = realGate.getClass().getDeclaredField("inputChannelsWithData"); + f.setAccessible(true); + final ArrayDeque<InputChannel> notifications = (ArrayDeque<InputChannel>) f.get(realGate); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + invocation.callRealMethod(); + + synchronized (notifications) { + if (!notifications.isEmpty()) { + InputGateListener listener = (InputGateListener) invocation.getArguments()[0]; + listener.notifyInputGateNonEmpty(inputGate); + } + } + + return null; + } + }).when(inputGate).registerListener(any(InputGateListener.class)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + this.inputChannels = new TestInputChannel[numberOfInputChannels]; if (initialize) { for (int i = 0; i < numberOfInputChannels; i++) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index 9dca613..082ccec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -18,10 +18,9 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; - import org.junit.Test; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.verifyBufferOrEvent; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -30,7 +29,7 @@ import static org.junit.Assert.assertTrue; /** * Tests for {@link UnionInputGate}. */ -public class UnionInputGateTest extends InputGateTestBase { +public class UnionInputGateTest { /** * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return @@ -42,8 +41,8 @@ public class UnionInputGateTest extends InputGateTestBase { @Test(timeout = 120 * 1000) public void testBasicGetNextLogic() throws Exception { // Setup - final SingleInputGate ig1 = createInputGate(3); - final SingleInputGate ig2 = createInputGate(5); + final SingleInputGate ig1 = createSingleInputGate(3); + final SingleInputGate ig2 = createSingleInputGate(5); final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2}); @@ -102,17 +101,4 @@ public class UnionInputGateTest extends InputGateTestBase { assertTrue(union.isFinished()); assertFalse(union.getNextBufferOrEvent().isPresent()); } - - @Test - public void testIsAvailable() throws Exception { - final SingleInputGate inputGate1 = createInputGate(1); - TestInputChannel inputChannel1 = new TestInputChannel(inputGate1, 0); - inputGate1.setInputChannel(new IntermediateResultPartitionID(), inputChannel1); - - final SingleInputGate inputGate2 = createInputGate(1); - TestInputChannel inputChannel2 = new TestInputChannel(inputGate2, 0); - inputGate2.setInputChannel(new IntermediateResultPartitionID(), inputChannel2); - - testIsAvailable(new UnionInputGate(inputGate1, inputGate2), inputGate1, inputChannel1); - } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index b9fe84f..dbb81ab 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -21,16 +21,12 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.buffer.BufferListener; -import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; @@ -44,6 +40,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; +import static org.mockito.Mockito.doReturn; /** * Test {@link InputGate} that allows setting multiple channels. Use @@ -79,7 +76,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { inputQueues = new ConcurrentLinkedQueue[numInputChannels]; setupInputChannels(); - inputGate.setBufferPool(new NoOpBufferPool(bufferSize)); + doReturn(bufferSize).when(inputGate).getPageSize(); } @SuppressWarnings("unchecked") @@ -222,81 +219,4 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { return isEvent; } } - - private static class NoOpBufferPool implements BufferPool { - private int bufferSize; - - public NoOpBufferPool(int bufferSize) { - this.bufferSize = bufferSize; - } - - @Override - public void lazyDestroy() { - } - - @Override - public int getMemorySegmentSize() { - return bufferSize; - } - - @Override - public Buffer requestBuffer() throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public Buffer requestBufferBlocking() throws IOException, InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean addBufferListener(BufferListener listener) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isDestroyed() { - throw new UnsupportedOperationException(); - } - - @Override - public int getNumberOfRequiredMemorySegments() { - throw new UnsupportedOperationException(); - } - - @Override - public int getMaxNumberOfMemorySegments() { - throw new UnsupportedOperationException(); - } - - @Override - public int getNumBuffers() { - throw new UnsupportedOperationException(); - } - - @Override - public void setNumBuffers(int numBuffers) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public int getNumberOfAvailableMemorySegments() { - throw new UnsupportedOperationException(); - } - - @Override - public int bestEffortGetNumOfUsedBuffers() { - throw new UnsupportedOperationException(); - } - - @Override - public void recycle(MemorySegment memorySegment) { - throw new UnsupportedOperationException(); - } - } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java index b1a3ad5..ded52ff 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener; import org.junit.Test; @@ -129,7 +130,7 @@ public class BarrierBufferMassiveRandomTest { } } - private static class RandomGeneratingInputGate extends InputGate { + private static class RandomGeneratingInputGate implements InputGate { private final int numberOfChannels; private final BufferPool[] bufferPools; @@ -150,7 +151,6 @@ public class BarrierBufferMassiveRandomTest { this.bufferPools = bufferPools; this.barrierGens = barrierGens; this.owningTaskName = owningTaskName; - this.isAvailable = AVAILABLE; } @Override @@ -199,6 +199,9 @@ public class BarrierBufferMassiveRandomTest { public void sendTaskEvent(TaskEvent event) {} @Override + public void registerListener(InputGateListener listener) {} + + @Override public int getPageSize() { return PAGE_SIZE; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index 30941ab..a29cbf5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener; import java.util.ArrayDeque; import java.util.List; @@ -31,7 +32,7 @@ import java.util.Queue; /** * Mock {@link InputGate}. */ -public class MockInputGate extends InputGate { +public class MockInputGate implements InputGate { private final int pageSize; @@ -55,8 +56,6 @@ public class MockInputGate extends InputGate { this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents); this.closed = new boolean[numberOfChannels]; this.owningTaskName = owningTaskName; - - isAvailable = AVAILABLE; } @Override @@ -112,6 +111,10 @@ public class MockInputGate extends InputGate { } @Override + public void registerListener(InputGateListener listener) { + } + + @Override public void close() { } }
