This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c27d0d68dc76ef4818f12296b28d78d6a78d4965
Author: Piotr Nowojski <[email protected]>
AuthorDate: Mon Jul 1 11:57:33 2019 +0200

    [FLINK-13013][network] Request partitions during InputGate#setup
    
    Before partitions were being requested on first polling/getting next buffer
    which was causing a couple of issues:
    - it was a little bit confusing
    - after first requestPartitions call, this was causing unnecessary 
synchronisation overhead
    - this was preventing data notifications to come through and isAvailable() 
future was always not
      completed before the first attempt to read the data from the input gate
    
    This commit moves requesting partitions to InputGate#setup solving those 
issues.
---
 .../io/network/partition/consumer/InputGate.java   |  2 +-
 .../consumer/RemoteChannelStateChecker.java        |  9 +-
 .../partition/consumer/SingleInputGate.java        |  5 +-
 .../network/partition/consumer/UnionInputGate.java |  3 -
 .../runtime/taskmanager/InputGateWithMetrics.java  |  2 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |  4 +-
 .../io/network/NettyShuffleEnvironmentTest.java    | 30 +++----
 .../runtime/io/network/buffer/NoOpBufferPool.java  | 95 ++++++++++++++++++++++
 .../network/partition/InputChannelTestUtils.java   | 21 +++++
 .../network/partition/InputGateFairnessTest.java   | 29 ++++++-
 .../io/network/partition/PartitionTestUtils.java   |  2 +
 .../consumer/InputBuffersMetricsTest.java          | 36 ++++++--
 .../partition/consumer/LocalInputChannelTest.java  |  8 +-
 .../partition/consumer/SingleInputGateBuilder.java |  5 ++
 .../partition/consumer/SingleInputGateTest.java    | 23 +++++-
 .../apache/flink/runtime/taskmanager/TaskTest.java | 26 ++++--
 .../StreamNetworkBenchmarkEnvironment.java         |  3 +-
 .../StreamNetworkPointToPointBenchmark.java        |  2 +-
 .../StreamNetworkThroughputBenchmark.java          |  2 +-
 19 files changed, 252 insertions(+), 55 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 0ce446b..e9f2399 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -130,5 +130,5 @@ public abstract class InputGate implements 
AsyncDataInput<BufferOrEvent>, AutoCl
        /**
         * Setup gate, potentially heavy-weight, blocking operation comparing 
to just creation.
         */
-       public abstract void setup() throws IOException;
+       public abstract void setup() throws IOException, InterruptedException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
index 69ee3fd..4bcfb4c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
@@ -46,7 +46,8 @@ public class RemoteChannelStateChecker {
 
        public boolean isProducerReadyOrAbortConsumption(ResponseHandle 
responseHandle) {
                Either<ExecutionState, Throwable> result = 
responseHandle.getProducerExecutionState();
-               if (responseHandle.getConsumerExecutionState() != 
ExecutionState.RUNNING) {
+               ExecutionState consumerExecutionState = 
responseHandle.getConsumerExecutionState();
+               if 
(!isConsumerStateValidForConsumption(consumerExecutionState)) {
                        LOG.debug(
                                "Ignore a partition producer state notification 
for task {}, because it's not running.",
                                taskNameWithSubtask);
@@ -64,6 +65,12 @@ public class RemoteChannelStateChecker {
                return false;
        }
 
+       private static boolean isConsumerStateValidForConsumption(
+                       ExecutionState consumerExecutionState) {
+               return consumerExecutionState == ExecutionState.RUNNING ||
+                       consumerExecutionState == ExecutionState.DEPLOYING;
+       }
+
        private boolean isProducerConsumerReady(ResponseHandle responseHandle) {
                ExecutionState producerState = getProducerState(responseHandle);
                return producerState == ExecutionState.SCHEDULED ||
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index b23572d..696dbe8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -202,7 +202,7 @@ public class SingleInputGate extends InputGate {
        }
 
        @Override
-       public void setup() throws IOException {
+       public void setup() throws IOException, InterruptedException {
                checkState(this.bufferPool == null, "Bug in input gate setup 
logic: Already registered buffer pool.");
                if (isCreditBased) {
                        // assign exclusive buffers to input channels directly 
and use the rest for floating buffers
@@ -211,6 +211,8 @@ public class SingleInputGate extends InputGate {
 
                BufferPool bufferPool = bufferPoolFactory.get();
                setBufferPool(bufferPool);
+
+               requestPartitions();
        }
 
        // 
------------------------------------------------------------------------
@@ -481,7 +483,6 @@ public class SingleInputGate extends InputGate {
                        throw new IllegalStateException("Released");
                }
 
-               requestPartitions();
                Optional<InputWithData<InputChannel, BufferAndAvailability>> 
next = waitAndGetNextData(blocking);
                if (!next.isPresent()) {
                        return Optional.empty();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 65a15ff..2b5b5c1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -167,9 +167,6 @@ public class UnionInputGate extends InputGate {
                        return Optional.empty();
                }
 
-               // Make sure to request the partitions, if they have not been 
requested before.
-               requestPartitions();
-
                Optional<InputWithData<InputGate, BufferOrEvent>> next = 
waitAndGetNextData(blocking);
                if (!next.isPresent()) {
                        return Optional.empty();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index 669c02e..27d01d5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -61,7 +61,7 @@ public class InputGateWithMetrics extends InputGate {
        }
 
        @Override
-       public void setup() throws IOException {
+       public void setup() throws IOException, InterruptedException {
                inputGate.setup();
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 62886a5..4355821 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -834,12 +834,14 @@ public class Task implements Runnable, TaskActions, 
PartitionProducerStateProvid
 
        @VisibleForTesting
        public static void setupPartitionsAndGates(
-               ResultPartitionWriter[] producedPartitions, InputGate[] 
inputGates) throws IOException {
+               ResultPartitionWriter[] producedPartitions, InputGate[] 
inputGates) throws IOException, InterruptedException {
 
                for (ResultPartitionWriter partition : producedPartitions) {
                        partition.setup();
                }
 
+               // InputGates must be initialized after the partitions, since 
during InputGate#setup
+               // we are requesting partitions
                for (InputGate gate : inputGates) {
                        gate.setup();
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index ab847ad..ba70a47 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -155,23 +155,19 @@ public class NettyShuffleEnvironmentTest extends 
TestLogger {
                SingleInputGate ig4 = createSingleInputGate(network, 
ResultPartitionType.PIPELINED_BOUNDED, 4);
                final SingleInputGate[] inputGates = new SingleInputGate[] 
{ig1, ig2, ig3, ig4};
 
-               // set up remote input channels for the exclusive buffers of 
the credit-based flow control
-               // (note that this does not obey the partition types which is 
ok for the scope of the test)
-               if (enableCreditBasedFlowControl) {
-                       createRemoteInputChannel(ig4, 0, rp1, connManager, 
network.getNetworkBufferPool());
-                       createRemoteInputChannel(ig4, 0, rp2, connManager, 
network.getNetworkBufferPool());
-                       createRemoteInputChannel(ig4, 0, rp3, connManager, 
network.getNetworkBufferPool());
-                       createRemoteInputChannel(ig4, 0, rp4, connManager, 
network.getNetworkBufferPool());
-
-                       createRemoteInputChannel(ig1, 1, rp1, connManager, 
network.getNetworkBufferPool());
-                       createRemoteInputChannel(ig1, 1, rp4, connManager, 
network.getNetworkBufferPool());
-
-                       createRemoteInputChannel(ig2, 1, rp2, connManager, 
network.getNetworkBufferPool());
-                       createRemoteInputChannel(ig2, 2, rp4, connManager, 
network.getNetworkBufferPool());
-
-                       createRemoteInputChannel(ig3, 1, rp3, connManager, 
network.getNetworkBufferPool());
-                       createRemoteInputChannel(ig3, 3, rp4, connManager, 
network.getNetworkBufferPool());
-               }
+               createRemoteInputChannel(ig4, 0, rp1, connManager, 
network.getNetworkBufferPool());
+               createRemoteInputChannel(ig4, 0, rp2, connManager, 
network.getNetworkBufferPool());
+               createRemoteInputChannel(ig4, 0, rp3, connManager, 
network.getNetworkBufferPool());
+               createRemoteInputChannel(ig4, 0, rp4, connManager, 
network.getNetworkBufferPool());
+
+               createRemoteInputChannel(ig1, 1, rp1, connManager, 
network.getNetworkBufferPool());
+               createRemoteInputChannel(ig1, 1, rp4, connManager, 
network.getNetworkBufferPool());
+
+               createRemoteInputChannel(ig2, 1, rp2, connManager, 
network.getNetworkBufferPool());
+               createRemoteInputChannel(ig2, 2, rp4, connManager, 
network.getNetworkBufferPool());
+
+               createRemoteInputChannel(ig3, 1, rp3, connManager, 
network.getNetworkBufferPool());
+               createRemoteInputChannel(ig3, 3, rp4, connManager, 
network.getNetworkBufferPool());
 
                Task.setupPartitionsAndGates(resultPartitions, inputGates);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
new file mode 100644
index 0000000..04c9c04
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+// We have it in this package because we could not mock the methods otherwise
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+
+/**
+ * No-op implementation of {@link BufferPool}.
+ */
+public class NoOpBufferPool implements BufferPool {
+
+       @Override
+       public void lazyDestroy() {
+       }
+
+       @Override
+       public Buffer requestBuffer() throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Buffer requestBufferBlocking() throws IOException, 
InterruptedException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public BufferBuilder requestBufferBuilderBlocking() throws IOException, 
InterruptedException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean addBufferListener(BufferListener listener) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean isDestroyed() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int getNumberOfRequiredMemorySegments() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int getMaxNumberOfMemorySegments() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int getNumBuffers() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void setNumBuffers(int numBuffers) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int getNumberOfAvailableMemorySegments() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int bestEffortGetNumOfUsedBuffers() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void recycle(MemorySegment memorySegment) {
+               throw new UnsupportedOperationException();
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 16d6cab..7805ef8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -195,4 +196,24 @@ public class InputChannelTestUtils {
                public void recycleMemorySegments(Collection<MemorySegment> 
segments) {
                }
        }
+
+       /**
+        * {@link MemorySegmentProvider} that provides unpooled {@link 
MemorySegment}s.
+        */
+       public static class UnpooledMemorySegmentProvider implements 
MemorySegmentProvider {
+               private final int pageSize;
+
+               public UnpooledMemorySegmentProvider(int pageSize) {
+                       this.pageSize = pageSize;
+               }
+
+               @Override
+               public Collection<MemorySegment> requestMemorySegments() {
+                       return 
Collections.singletonList(MemorySegmentFactory.allocateUnpooledSegment(pageSize));
+               }
+
+               @Override
+               public void recycleMemorySegments(Collection<MemorySegment> 
segments) {
+               }
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 2bf5a09..da05f83 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -24,8 +24,11 @@ import 
org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NoOpBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.UnpooledMemorySegmentProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
@@ -47,7 +50,6 @@ import java.util.Optional;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
-import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultPartitionManager;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -93,6 +95,8 @@ public class InputGateFairnessTest {
                        createLocalInputChannel(gate, i, 
resultPartitionManager);
                }
 
+               gate.setup();
+
                // read all the buffers and the EOF event
                for (int i = numberOfChannels * (buffersPerChannel + 1); i > 0; 
--i) {
                        assertNotNull(gate.getNext());
@@ -141,6 +145,8 @@ public class InputGateFairnessTest {
                        // seed one initial buffer
                        sources[12].add(bufferConsumer.copy());
 
+                       gate.setup();
+
                        // read all the buffers and the EOF event
                        for (int i = 0; i < numberOfChannels * 
buffersPerChannel; i++) {
                                assertNotNull(gate.getNext());
@@ -190,6 +196,8 @@ public class InputGateFairnessTest {
                        
channel.onBuffer(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), 
buffersPerChannel, -1);
                }
 
+               gate.setup();
+
                // read all the buffers and the EOF event
                for (int i = numberOfChannels * (buffersPerChannel + 1); i > 0; 
--i) {
                        assertNotNull(gate.getNext());
@@ -233,6 +241,8 @@ public class InputGateFairnessTest {
                channels[11].onBuffer(mockBuffer, 0, -1);
                channelSequenceNums[11]++;
 
+               gate.setup();
+
                // read all the buffers and the EOF event
                for (int i = 0; i < numberOfChannels * buffersPerChannel; i++) {
                        assertNotNull(gate.getNext());
@@ -308,9 +318,8 @@ public class InputGateFairnessTest {
        // 
------------------------------------------------------------------------
 
        private static class FairnessVerifyingInputGate extends SingleInputGate 
{
-               private static final SupplierWithException<BufferPool, 
IOException> STUB_BUFFER_POOL_FACTORY = () -> {
-                       throw new UnsupportedOperationException();
-               };
+               private static final SupplierWithException<BufferPool, 
IOException> STUB_BUFFER_POOL_FACTORY =
+                       NoOpBufferPool::new;
 
                private final ArrayDeque<InputChannel> channelsWithData;
 
@@ -368,4 +377,16 @@ public class InputGateFairnessTest {
                        uniquenessChecker.clear();
                }
        }
+
+       public static RemoteInputChannel createRemoteInputChannel(
+               SingleInputGate inputGate,
+               int channelIndex,
+               ConnectionManager connectionManager) {
+
+               return InputChannelBuilder.newBuilder()
+                       .setChannelIndex(channelIndex)
+                       .setConnectionManager(connectionManager)
+                       .setMemorySegmentProvider(new 
UnpooledMemorySegmentProvider(32 * 1024))
+                       .buildRemoteAndSetToGate(inputGate);
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index e559659..5e39a43 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -63,6 +63,7 @@ public class PartitionTestUtils {
                        ResultPartitionType partitionType,
                        int numChannels) {
                return new ResultPartitionBuilder()
+                       
.setResultPartitionManager(environment.getResultPartitionManager())
                        
.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
                        .setResultPartitionType(partitionType)
                        .setNumberOfSubpartitions(numChannels)
@@ -75,6 +76,7 @@ public class PartitionTestUtils {
                        ResultPartitionType partitionType,
                        int numChannels) {
                return new ResultPartitionBuilder()
+                       
.setResultPartitionManager(environment.getResultPartitionManager())
                        
.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
                        .setFileChannelManager(channelManager)
                        .setResultPartitionType(partitionType)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
index 8d868cc..9b17a2e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
 import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
 import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.util.TestLogger;
 
@@ -58,7 +60,7 @@ public class InputBuffersMetricsTest extends TestLogger {
        }
 
        @Test
-       public void testCalculateTotalBuffersSize() throws IOException {
+       public void testCalculateTotalBuffersSize() throws Exception {
                int numberOfRemoteChannels = 2;
                int numberOfLocalChannels = 0;
 
@@ -76,6 +78,7 @@ public class InputBuffersMetricsTest extends TestLogger {
                        numberOfRemoteChannels,
                        numberOfLocalChannels).f0;
                closeableRegistry.registerCloseable(inputGate1::close);
+               inputGate1.setup();
 
                SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
                FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
@@ -94,7 +97,7 @@ public class InputBuffersMetricsTest extends TestLogger {
        }
 
        @Test
-       public void testExclusiveBuffersUsage() throws IOException {
+       public void testExclusiveBuffersUsage() throws Exception {
                int numberOfRemoteChannelsGate1 = 2;
                int numberOfLocalChannelsGate1 = 0;
                int numberOfRemoteChannelsGate2 = 1;
@@ -124,6 +127,8 @@ public class InputBuffersMetricsTest extends TestLogger {
                SingleInputGate inputGate2 = tuple2.f0;
                closeableRegistry.registerCloseable(inputGate1::close);
                closeableRegistry.registerCloseable(inputGate2::close);
+               inputGate1.setup();
+               inputGate2.setup();
 
                List<RemoteInputChannel> remoteInputChannels = tuple1.f1;
 
@@ -155,7 +160,7 @@ public class InputBuffersMetricsTest extends TestLogger {
        }
 
        @Test
-       public void testFloatingBuffersUsage() throws IOException, 
InterruptedException {
+       public void testFloatingBuffersUsage() throws Exception {
 
                int numberOfRemoteChannelsGate1 = 2;
                int numberOfLocalChannelsGate1 = 0;
@@ -185,6 +190,8 @@ public class InputBuffersMetricsTest extends TestLogger {
                SingleInputGate inputGate1 = tuple1.f0;
                closeableRegistry.registerCloseable(inputGate1::close);
                closeableRegistry.registerCloseable(inputGate2::close);
+               inputGate1.setup();
+               inputGate2.setup();
 
                RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
 
@@ -250,7 +257,7 @@ public class InputBuffersMetricsTest extends TestLogger {
        private Tuple2<SingleInputGate, List<RemoteInputChannel>> 
buildInputGate(
                NettyShuffleEnvironment network,
                int numberOfRemoteChannels,
-               int numberOfLocalChannels) throws IOException {
+               int numberOfLocalChannels) throws Exception {
 
                SingleInputGate inputGate = new SingleInputGateBuilder()
                        .setNumberOfChannels(numberOfRemoteChannels + 
numberOfLocalChannels)
@@ -262,22 +269,31 @@ public class InputBuffersMetricsTest extends TestLogger {
 
                int channelIdx = 0;
                for (int i = 0; i < numberOfRemoteChannels; i++) {
-                       res.f1.add(buildRemoteChannel(channelIdx, inputGate, 
network));
+                       ResultPartition partition = 
PartitionTestUtils.createPartition(network, 
ResultPartitionType.PIPELINED_BOUNDED, 1);
+                       closeableRegistry.registerCloseable(partition::close);
+                       partition.setup();
+
+                       res.f1.add(buildRemoteChannel(channelIdx, inputGate, 
network, partition));
                        channelIdx++;
                }
 
                for (int i = 0; i < numberOfLocalChannels; i++) {
-                       buildLocalChannel(channelIdx, inputGate, network);
+                       ResultPartition partition = 
PartitionTestUtils.createPartition(network, 
ResultPartitionType.PIPELINED_BOUNDED, 1);
+                       closeableRegistry.registerCloseable(partition::close);
+                       partition.setup();
+
+                       buildLocalChannel(channelIdx, inputGate, network, 
partition);
                }
-               inputGate.setup();
                return res;
        }
 
        private RemoteInputChannel buildRemoteChannel(
                int channelIndex,
                SingleInputGate inputGate,
-               NettyShuffleEnvironment network) {
+               NettyShuffleEnvironment network,
+               ResultPartition partition) {
                return new InputChannelBuilder()
+                       .setPartitionId(partition.getPartitionId())
                        .setChannelIndex(channelIndex)
                        .setupFromNettyShuffleEnvironment(network)
                        .setConnectionManager(new TestingConnectionManager())
@@ -287,8 +303,10 @@ public class InputBuffersMetricsTest extends TestLogger {
        private void buildLocalChannel(
                int channelIndex,
                SingleInputGate inputGate,
-               NettyShuffleEnvironment network) {
+               NettyShuffleEnvironment network,
+               ResultPartition partition) {
                new InputChannelBuilder()
+                       .setPartitionId(partition.getPartitionId())
                        .setChannelIndex(channelIndex)
                        .setupFromNettyShuffleEnvironment(network)
                        .setConnectionManager(new TestingConnectionManager())
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 72404ce..fd7cdd1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -493,7 +493,7 @@ public class LocalInputChannelTest {
                                BufferPool bufferPool,
                                ResultPartitionManager partitionManager,
                                TaskEventDispatcher taskEventDispatcher,
-                               ResultPartitionID[] consumedPartitionIds) {
+                               ResultPartitionID[] consumedPartitionIds) 
throws IOException, InterruptedException {
 
                        checkArgument(numberOfInputChannels >= 1);
                        checkArgument(numberOfExpectedBuffersPerChannel >= 1);
@@ -501,11 +501,9 @@ public class LocalInputChannelTest {
                        this.inputGate = new SingleInputGateBuilder()
                                .setConsumedSubpartitionIndex(subpartitionIndex)
                                .setNumberOfChannels(numberOfInputChannels)
+                               .setBufferPoolFactory(bufferPool)
                                .build();
 
-                       // Set buffer pool
-                       inputGate.setBufferPool(bufferPool);
-
                        // Setup input channels
                        for (int i = 0; i < numberOfInputChannels; i++) {
                                InputChannelBuilder.newBuilder()
@@ -516,6 +514,8 @@ public class LocalInputChannelTest {
                                        .buildLocalAndSetToGate(inputGate);
                        }
 
+                       inputGate.setup();
+
                        this.numberOfInputChannels = numberOfInputChannels;
                        this.numberOfExpectedBuffersPerChannel = 
numberOfExpectedBuffersPerChannel;
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index b23af06..956bad9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -87,6 +87,11 @@ public class SingleInputGateBuilder {
                return this;
        }
 
+       public SingleInputGateBuilder setBufferPoolFactory(BufferPool 
bufferPool) {
+               this.bufferPoolFactory = () -> bufferPool;
+               return this;
+       }
+
        public SingleInputGate build() {
                return new SingleInputGate(
                        "Single Input Gate",
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index d737949..cad957f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
@@ -34,6 +35,8 @@ import 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -419,6 +422,7 @@ public class SingleInputGateTest extends InputGateTestBase {
                        RemoteInputChannel remote =
                                InputChannelBuilder.newBuilder()
                                        
.setupFromNettyShuffleEnvironment(network)
+                                       .setConnectionManager(new 
TestingConnectionManager())
                                        .buildRemoteAndSetToGate(inputGate);
                        inputGate.setup();
 
@@ -498,13 +502,27 @@ public class SingleInputGateTest extends 
InputGateTestBase {
        @Test
        public void testUpdateUnknownInputChannel() throws Exception {
                final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+               final ResultPartition localResultPartition = new 
ResultPartitionBuilder()
+                       
.setResultPartitionManager(network.getResultPartitionManager())
+                       
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+                       .build();
+
+               final ResultPartition remoteResultPartition = new 
ResultPartitionBuilder()
+                       
.setResultPartitionManager(network.getResultPartitionManager())
+                       
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+                       .build();
+
+               localResultPartition.setup();
+               remoteResultPartition.setup();
+
                final SingleInputGate inputGate = createInputGate(network, 2, 
ResultPartitionType.PIPELINED);
 
                try {
-                       final ResultPartitionID localResultPartitionId = new 
ResultPartitionID();
+                       final ResultPartitionID localResultPartitionId = 
localResultPartition.getPartitionId();
                        addUnknownInputChannel(network, inputGate, 
localResultPartitionId, 0);
 
-                       final ResultPartitionID remoteResultPartitionId = new 
ResultPartitionID();
+                       final ResultPartitionID remoteResultPartitionId = 
remoteResultPartition.getPartitionId();
                        addUnknownInputChannel(network, inputGate, 
remoteResultPartitionId, 1);
 
                        inputGate.setup();
@@ -628,6 +646,7 @@ public class SingleInputGateTest extends InputGateTestBase {
                        .setChannelIndex(channelIndex)
                        .setPartitionId(partitionId)
                        .setupFromNettyShuffleEnvironment(network)
+                       .setConnectionManager(new TestingConnectionManager())
                        .buildUnknownAndSetToGate(inputGate);
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 46af3e5..ee78963 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -565,7 +565,21 @@ public class TaskTest extends TestLogger {
        }
 
        @Test
-       public void testOnPartitionStateUpdate() throws Exception {
+       public void testOnPartitionStateUpdateWhileRunning() throws Exception {
+               testOnPartitionStateUpdate(ExecutionState.RUNNING);
+       }
+
+       /**
+        * Partition state updates can also happen when {@link Task} is in
+        * {@link ExecutionState#DEPLOYING} state, because we are requesting 
for partitions during
+        * setting up input gates.
+        */
+       @Test
+       public void testOnPartitionStateUpdateWhileDeploying() throws Exception 
{
+               testOnPartitionStateUpdate(ExecutionState.DEPLOYING);
+       }
+
+       public void testOnPartitionStateUpdate(ExecutionState initialTaskState) 
throws Exception {
                final ResultPartitionID partitionId = new ResultPartitionID();
 
                final Task task = createTaskBuilder()
@@ -583,10 +597,10 @@ public class TaskTest extends TestLogger {
                        expected.put(state, ExecutionState.FAILED);
                }
 
-               expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING);
-               expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING);
-               expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
-               expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING);
+               expected.put(ExecutionState.RUNNING, initialTaskState);
+               expected.put(ExecutionState.SCHEDULED, initialTaskState);
+               expected.put(ExecutionState.DEPLOYING, initialTaskState);
+               expected.put(ExecutionState.FINISHED, initialTaskState);
 
                expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
                expected.put(ExecutionState.CANCELING, 
ExecutionState.CANCELING);
@@ -594,7 +608,7 @@ public class TaskTest extends TestLogger {
 
                int producingStateCounter = 0;
                for (ExecutionState state : ExecutionState.values()) {
-                       setState(task, ExecutionState.RUNNING);
+                       setState(task, initialTaskState);
 
                        if (checker.isProducerReadyOrAbortConsumption(task.new 
PartitionProducerStateResponseHandle(state, null))) {
                                producingStateCounter++;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 0cdc658..8351b3c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -50,7 +50,6 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
@@ -231,7 +230,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
                return consumableNotifyingPartitionWriter;
        }
 
-       private InputGate createInputGate(TaskManagerLocation senderLocation) 
throws IOException {
+       private InputGate createInputGate(TaskManagerLocation senderLocation) 
throws Exception {
                InputGate[] gates = new InputGate[channels];
                for (int channel = 0; channel < channels; ++channel) {
                        final InputGateDeploymentDescriptor gateDescriptor = 
createInputGateDeploymentDescriptor(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 7488688..bb6b9e2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -77,8 +77,8 @@ public class StreamNetworkPointToPointBenchmark {
                environment = new StreamNetworkBenchmarkEnvironment<>();
                environment.setUp(1, 1, false, false, -1, -1, config);
 
-               receiver = environment.createReceiver();
                recordWriter = environment.createRecordWriter(0, flushTimeout);
+               receiver = environment.createReceiver();
        }
 
        /**
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index 0586f54..b5d1c07 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -110,7 +110,6 @@ public class StreamNetworkThroughputBenchmark {
                        senderBufferPoolSize,
                        receiverBufferPoolSize,
                        config);
-               receiver = environment.createReceiver();
                writerThreads = new LongRecordWriterThread[recordWriters];
                for (int writer = 0; writer < recordWriters; writer++) {
                        writerThreads[writer] = new LongRecordWriterThread(
@@ -118,6 +117,7 @@ public class StreamNetworkThroughputBenchmark {
                                broadcastMode);
                        writerThreads[writer].start();
                }
+               receiver = environment.createReceiver();
        }
 
        /**

Reply via email to