This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8d277d42269fbc8bc02b69c7b5dfe1c51916ea89
Author: Piotr Nowojski <[email protected]>
AuthorDate: Mon Jul 1 12:14:58 2019 +0200

    [hotfix][network] Make InputGate#requestPartitions a private method
---
 .../io/network/partition/consumer/InputGate.java   |  2 -
 .../partition/consumer/SingleInputGate.java        | 50 +++++++--------
 .../network/partition/consumer/UnionInputGate.java | 14 -----
 .../runtime/taskmanager/InputGateWithMetrics.java  |  5 --
 .../partition/consumer/SingleInputGateTest.java    | 72 +++++++++++-----------
 .../CheckpointBarrierAlignerMassiveRandomTest.java |  3 -
 .../flink/streaming/runtime/io/MockInputGate.java  |  4 --
 7 files changed, 62 insertions(+), 88 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index e9f2399..7d6ea81 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -77,8 +77,6 @@ public abstract class InputGate implements 
AsyncDataInput<BufferOrEvent>, AutoCl
 
        public abstract boolean isFinished();
 
-       public abstract void requestPartitions() throws IOException, 
InterruptedException;
-
        /**
         * Blocking call waiting for next {@link BufferOrEvent}.
         *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 696dbe8..bd75262 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -215,6 +215,32 @@ public class SingleInputGate extends InputGate {
                requestPartitions();
        }
 
+       private void requestPartitions() throws IOException, 
InterruptedException {
+               synchronized (requestLock) {
+                       if (!requestedPartitionsFlag) {
+                               if (closeFuture.isDone()) {
+                                       throw new 
IllegalStateException("Already released.");
+                               }
+
+                               // Sanity checks
+                               if (numberOfInputChannels != 
inputChannels.size()) {
+                                       throw new 
IllegalStateException(String.format(
+                                               "Bug in input gate setup logic: 
mismatch between " +
+                                               "number of total input channels 
[%s] and the currently set number of input " +
+                                               "channels [%s].",
+                                               inputChannels.size(),
+                                               numberOfInputChannels));
+                               }
+
+                               for (InputChannel inputChannel : 
inputChannels.values()) {
+                                       
inputChannel.requestSubpartition(consumedSubpartitionIndex);
+                               }
+                       }
+
+                       requestedPartitionsFlag = true;
+               }
+       }
+
        // 
------------------------------------------------------------------------
        // Properties
        // 
------------------------------------------------------------------------
@@ -436,30 +462,6 @@ public class SingleInputGate extends InputGate {
                return hasReceivedAllEndOfPartitionEvents;
        }
 
-       @Override
-       public void requestPartitions() throws IOException, 
InterruptedException {
-               synchronized (requestLock) {
-                       if (!requestedPartitionsFlag) {
-                               if (closeFuture.isDone()) {
-                                       throw new 
IllegalStateException("Already released.");
-                               }
-
-                               // Sanity checks
-                               if (numberOfInputChannels != 
inputChannels.size()) {
-                                       throw new IllegalStateException("Bug in 
input gate setup logic: mismatch between " +
-                                                       "number of total input 
channels and the currently set number of input " +
-                                                       "channels.");
-                               }
-
-                               for (InputChannel inputChannel : 
inputChannels.values()) {
-                                       
inputChannel.requestSubpartition(consumedSubpartitionIndex);
-                               }
-                       }
-
-                       requestedPartitionsFlag = true;
-               }
-       }
-
        // 
------------------------------------------------------------------------
        // Consume
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 2b5b5c1..c612044 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -86,9 +86,6 @@ public class UnionInputGate extends InputGate {
         */
        private final Map<InputGate, Integer> inputGateToIndexOffsetMap;
 
-       /** Flag indicating whether partitions have been requested. */
-       private boolean requestedPartitionsFlag;
-
        public UnionInputGate(InputGate... inputGates) {
                this.inputGates = checkNotNull(inputGates);
                checkArgument(inputGates.length > 1, "Union input gate should 
union at least two input gates.");
@@ -142,17 +139,6 @@ public class UnionInputGate extends InputGate {
        }
 
        @Override
-       public void requestPartitions() throws IOException, 
InterruptedException {
-               if (!requestedPartitionsFlag) {
-                       for (InputGate inputGate : inputGates) {
-                               inputGate.requestPartitions();
-                       }
-
-                       requestedPartitionsFlag = true;
-               }
-       }
-
-       @Override
        public Optional<BufferOrEvent> getNext() throws IOException, 
InterruptedException {
                return getNextBufferOrEvent(true);
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index 27d01d5..5d2cfd9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -66,11 +66,6 @@ public class InputGateWithMetrics extends InputGate {
        }
 
        @Override
-       public void requestPartitions() throws IOException, 
InterruptedException {
-               inputGate.requestPartitions();
-       }
-
-       @Override
        public Optional<BufferOrEvent> getNext() throws IOException, 
InterruptedException {
                return updateMetrics(inputGate.getNext());
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index cad957f..20eae98 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
@@ -192,51 +191,52 @@ public class SingleInputGateTest extends 
InputGateTestBase {
 
                // Setup reader with one local and one unknown input channel
 
-               final SingleInputGate inputGate = createInputGate();
-               final BufferPool bufferPool = mock(BufferPool.class);
-               
when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
-
-               inputGate.setBufferPool(bufferPool);
-
-               // Local
-               ResultPartitionID localPartitionId = new ResultPartitionID();
+               NettyShuffleEnvironment environment = 
createNettyShuffleEnvironment();
+               final SingleInputGate inputGate = createInputGate(environment, 
2, ResultPartitionType.PIPELINED);
+               try {
+                       // Local
+                       ResultPartitionID localPartitionId = new 
ResultPartitionID();
 
-               InputChannelBuilder.newBuilder()
-                       .setPartitionId(localPartitionId)
-                       .setPartitionManager(partitionManager)
-                       .setTaskEventPublisher(taskEventDispatcher)
-                       .buildLocalAndSetToGate(inputGate);
+                       InputChannelBuilder.newBuilder()
+                               .setPartitionId(localPartitionId)
+                               .setPartitionManager(partitionManager)
+                               .setTaskEventPublisher(taskEventDispatcher)
+                               .buildLocalAndSetToGate(inputGate);
 
-               // Unknown
-               ResultPartitionID unknownPartitionId = new ResultPartitionID();
+                       // Unknown
+                       ResultPartitionID unknownPartitionId = new 
ResultPartitionID();
 
-               InputChannelBuilder.newBuilder()
-                       .setChannelIndex(1)
-                       .setPartitionId(unknownPartitionId)
-                       .setPartitionManager(partitionManager)
-                       .setTaskEventPublisher(taskEventDispatcher)
-                       .buildUnknownAndSetToGate(inputGate);
+                       InputChannelBuilder.newBuilder()
+                               .setChannelIndex(1)
+                               .setPartitionId(unknownPartitionId)
+                               .setPartitionManager(partitionManager)
+                               .setTaskEventPublisher(taskEventDispatcher)
+                               .buildUnknownAndSetToGate(inputGate);
 
-               // Request partitions
-               inputGate.requestPartitions();
+                       inputGate.setup();
 
-               // Only the local channel can request
-               verify(partitionManager, 
times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferAvailabilityListener.class));
+                       // Only the local channel can request
+                       verify(partitionManager, 
times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferAvailabilityListener.class));
 
-               // Send event backwards and initialize unknown channel 
afterwards
-               final TaskEvent event = new TestTaskEvent();
-               inputGate.sendTaskEvent(event);
+                       // Send event backwards and initialize unknown channel 
afterwards
+                       final TaskEvent event = new TestTaskEvent();
+                       inputGate.sendTaskEvent(event);
 
-               // Only the local channel can send out the event
-               verify(taskEventDispatcher, 
times(1)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
+                       // Only the local channel can send out the event
+                       verify(taskEventDispatcher, 
times(1)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
 
-               // After the update, the pending event should be send to local 
channel
+                       // After the update, the pending event should be send 
to local channel
 
-               ResourceID location = ResourceID.generate();
-               inputGate.updateInputChannel(location, 
createRemoteWithIdAndLocation(unknownPartitionId.getPartitionId(), location));
+                       ResourceID location = ResourceID.generate();
+                       inputGate.updateInputChannel(location, 
createRemoteWithIdAndLocation(unknownPartitionId.getPartitionId(), location));
 
-               verify(partitionManager, 
times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferAvailabilityListener.class));
-               verify(taskEventDispatcher, 
times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
+                       verify(partitionManager, 
times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferAvailabilityListener.class));
+                       verify(taskEventDispatcher, 
times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
+               }
+               finally {
+                       inputGate.close();
+                       environment.close();
+               }
        }
 
        /**
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
index 552818d..182afef 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
@@ -152,9 +152,6 @@ public class CheckpointBarrierAlignerMassiveRandomTest {
                }
 
                @Override
-               public void requestPartitions() {}
-
-               @Override
                public Optional<BufferOrEvent> getNext() throws IOException, 
InterruptedException {
                        currentChannel = (currentChannel + 1) % 
numberOfChannels;
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index 8cb6848..7cc7194 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -98,10 +98,6 @@ public class MockInputGate extends InputGate {
        }
 
        @Override
-       public void requestPartitions() {
-       }
-
-       @Override
        public void sendTaskEvent(TaskEvent event) {
        }
 

Reply via email to