This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8d277d42269fbc8bc02b69c7b5dfe1c51916ea89 Author: Piotr Nowojski <[email protected]> AuthorDate: Mon Jul 1 12:14:58 2019 +0200 [hotfix][network] Make InputGate#requestPartitions a private method --- .../io/network/partition/consumer/InputGate.java | 2 - .../partition/consumer/SingleInputGate.java | 50 +++++++-------- .../network/partition/consumer/UnionInputGate.java | 14 ----- .../runtime/taskmanager/InputGateWithMetrics.java | 5 -- .../partition/consumer/SingleInputGateTest.java | 72 +++++++++++----------- .../CheckpointBarrierAlignerMassiveRandomTest.java | 3 - .../flink/streaming/runtime/io/MockInputGate.java | 4 -- 7 files changed, 62 insertions(+), 88 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index e9f2399..7d6ea81 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -77,8 +77,6 @@ public abstract class InputGate implements AsyncDataInput<BufferOrEvent>, AutoCl public abstract boolean isFinished(); - public abstract void requestPartitions() throws IOException, InterruptedException; - /** * Blocking call waiting for next {@link BufferOrEvent}. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 696dbe8..bd75262 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -215,6 +215,32 @@ public class SingleInputGate extends InputGate { requestPartitions(); } + private void requestPartitions() throws IOException, InterruptedException { + synchronized (requestLock) { + if (!requestedPartitionsFlag) { + if (closeFuture.isDone()) { + throw new IllegalStateException("Already released."); + } + + // Sanity checks + if (numberOfInputChannels != inputChannels.size()) { + throw new IllegalStateException(String.format( + "Bug in input gate setup logic: mismatch between " + + "number of total input channels [%s] and the currently set number of input " + + "channels [%s].", + inputChannels.size(), + numberOfInputChannels)); + } + + for (InputChannel inputChannel : inputChannels.values()) { + inputChannel.requestSubpartition(consumedSubpartitionIndex); + } + } + + requestedPartitionsFlag = true; + } + } + // ------------------------------------------------------------------------ // Properties // ------------------------------------------------------------------------ @@ -436,30 +462,6 @@ public class SingleInputGate extends InputGate { return hasReceivedAllEndOfPartitionEvents; } - @Override - public void requestPartitions() throws IOException, InterruptedException { - synchronized (requestLock) { - if (!requestedPartitionsFlag) { - if (closeFuture.isDone()) { - throw new IllegalStateException("Already released."); - } - - // Sanity checks - if (numberOfInputChannels != inputChannels.size()) { - throw new IllegalStateException("Bug in input gate setup logic: mismatch between " + - "number of total input channels and the currently set number of input " + - "channels."); - } - - for (InputChannel inputChannel : inputChannels.values()) { - inputChannel.requestSubpartition(consumedSubpartitionIndex); - } - } - - requestedPartitionsFlag = true; - } - } - // ------------------------------------------------------------------------ // Consume // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 2b5b5c1..c612044 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -86,9 +86,6 @@ public class UnionInputGate extends InputGate { */ private final Map<InputGate, Integer> inputGateToIndexOffsetMap; - /** Flag indicating whether partitions have been requested. */ - private boolean requestedPartitionsFlag; - public UnionInputGate(InputGate... inputGates) { this.inputGates = checkNotNull(inputGates); checkArgument(inputGates.length > 1, "Union input gate should union at least two input gates."); @@ -142,17 +139,6 @@ public class UnionInputGate extends InputGate { } @Override - public void requestPartitions() throws IOException, InterruptedException { - if (!requestedPartitionsFlag) { - for (InputGate inputGate : inputGates) { - inputGate.requestPartitions(); - } - - requestedPartitionsFlag = true; - } - } - - @Override public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException { return getNextBufferOrEvent(true); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java index 27d01d5..5d2cfd9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java @@ -66,11 +66,6 @@ public class InputGateWithMetrics extends InputGate { } @Override - public void requestPartitions() throws IOException, InterruptedException { - inputGate.requestPartitions(); - } - - @Override public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException { return updateMetrics(inputGate.getNext()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index cad957f..20eae98 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.TestingConnectionManager; -import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; @@ -192,51 +191,52 @@ public class SingleInputGateTest extends InputGateTestBase { // Setup reader with one local and one unknown input channel - final SingleInputGate inputGate = createInputGate(); - final BufferPool bufferPool = mock(BufferPool.class); - when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2); - - inputGate.setBufferPool(bufferPool); - - // Local - ResultPartitionID localPartitionId = new ResultPartitionID(); + NettyShuffleEnvironment environment = createNettyShuffleEnvironment(); + final SingleInputGate inputGate = createInputGate(environment, 2, ResultPartitionType.PIPELINED); + try { + // Local + ResultPartitionID localPartitionId = new ResultPartitionID(); - InputChannelBuilder.newBuilder() - .setPartitionId(localPartitionId) - .setPartitionManager(partitionManager) - .setTaskEventPublisher(taskEventDispatcher) - .buildLocalAndSetToGate(inputGate); + InputChannelBuilder.newBuilder() + .setPartitionId(localPartitionId) + .setPartitionManager(partitionManager) + .setTaskEventPublisher(taskEventDispatcher) + .buildLocalAndSetToGate(inputGate); - // Unknown - ResultPartitionID unknownPartitionId = new ResultPartitionID(); + // Unknown + ResultPartitionID unknownPartitionId = new ResultPartitionID(); - InputChannelBuilder.newBuilder() - .setChannelIndex(1) - .setPartitionId(unknownPartitionId) - .setPartitionManager(partitionManager) - .setTaskEventPublisher(taskEventDispatcher) - .buildUnknownAndSetToGate(inputGate); + InputChannelBuilder.newBuilder() + .setChannelIndex(1) + .setPartitionId(unknownPartitionId) + .setPartitionManager(partitionManager) + .setTaskEventPublisher(taskEventDispatcher) + .buildUnknownAndSetToGate(inputGate); - // Request partitions - inputGate.requestPartitions(); + inputGate.setup(); - // Only the local channel can request - verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class)); + // Only the local channel can request + verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class)); - // Send event backwards and initialize unknown channel afterwards - final TaskEvent event = new TestTaskEvent(); - inputGate.sendTaskEvent(event); + // Send event backwards and initialize unknown channel afterwards + final TaskEvent event = new TestTaskEvent(); + inputGate.sendTaskEvent(event); - // Only the local channel can send out the event - verify(taskEventDispatcher, times(1)).publish(any(ResultPartitionID.class), any(TaskEvent.class)); + // Only the local channel can send out the event + verify(taskEventDispatcher, times(1)).publish(any(ResultPartitionID.class), any(TaskEvent.class)); - // After the update, the pending event should be send to local channel + // After the update, the pending event should be send to local channel - ResourceID location = ResourceID.generate(); - inputGate.updateInputChannel(location, createRemoteWithIdAndLocation(unknownPartitionId.getPartitionId(), location)); + ResourceID location = ResourceID.generate(); + inputGate.updateInputChannel(location, createRemoteWithIdAndLocation(unknownPartitionId.getPartitionId(), location)); - verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class)); - verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class)); + verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class)); + verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class)); + } + finally { + inputGate.close(); + environment.close(); + } } /** diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java index 552818d..182afef 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java @@ -152,9 +152,6 @@ public class CheckpointBarrierAlignerMassiveRandomTest { } @Override - public void requestPartitions() {} - - @Override public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException { currentChannel = (currentChannel + 1) % numberOfChannels; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index 8cb6848..7cc7194 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -98,10 +98,6 @@ public class MockInputGate extends InputGate { } @Override - public void requestPartitions() { - } - - @Override public void sendTaskEvent(TaskEvent event) { }
