This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6349db7df4576eaa637d449ee45d76d7232c48ca
Author: Andrey Zagrebin <azagre...@gmail.com>
AuthorDate: Thu May 30 15:16:16 2019 +0200

    [FLINK-11392][network] Introduce ShuffleEnvironment interface
---
 .../runtime/io/network/NetworkEnvironment.java     | 107 ++++++-------
 .../network/api/writer/ResultPartitionWriter.java  |  12 +-
 .../flink/runtime/shuffle/ShuffleEnvironment.java  | 166 +++++++++++++++++++++
 .../runtime/shuffle/ShuffleEnvironmentContext.java |  94 ++++++++++++
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  15 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    |  13 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  74 ++++-----
 .../TaskManagerServicesConfiguration.java          |  59 +++++++-
 .../org/apache/flink/runtime/taskmanager/Task.java |  12 +-
 .../runtime/util/ConfigurationParserUtils.java     |   8 +-
 .../io/network/NetworkEnvironmentBuilder.java      |   2 +-
 .../runtime/io/network/NetworkEnvironmentTest.java |   4 +-
 .../io/network/partition/ResultPartitionTest.java  |   2 +-
 .../partition/consumer/SingleInputGateTest.java    |  12 +-
 .../TaskExecutorLocalStateStoresManagerTest.java   |  18 +--
 .../taskexecutor/TaskExecutorSubmissionTest.java   |  18 +--
 .../runtime/taskexecutor/TaskExecutorTest.java     |  10 +-
 .../taskexecutor/TaskManagerServicesBuilder.java   |  12 +-
 .../TaskSubmissionTestEnvironment.java             |  45 +++---
 .../runtime/taskmanager/TaskAsyncCallTest.java     |  14 +-
 .../apache/flink/runtime/taskmanager/TaskTest.java |  27 ++--
 .../runtime/util/JvmExitOnFatalErrorTest.java      |   6 +-
 .../StreamNetworkBenchmarkEnvironment.java         |   4 +-
 .../tasks/InterruptSensitiveRestoreTest.java       |   6 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   6 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |   6 +-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   6 +-
 .../tasks/TaskCheckpointingBehaviourTest.java      |   6 +-
 28 files changed, 539 insertions(+), 225 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 01c62ba..8483f84 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -50,7 +49,8 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFac
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.Preconditions;
 
@@ -58,7 +58,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.InetAddress;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
@@ -68,10 +68,11 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Network I/O components of each {@link TaskExecutor} instance. The network 
environment contains
- * the data structures that keep track of all intermediate results and all 
data exchanges.
+ * The implementation of {@link ShuffleEnvironment} based on netty network 
communication, local memory and disk files.
+ * The network environment contains the data structures that keep track of all 
intermediate results
+ * and shuffle data exchanges.
  */
-public class NetworkEnvironment {
+public class NetworkEnvironment implements ShuffleEnvironment<ResultPartition, 
SingleInputGate> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(NetworkEnvironment.class);
 
@@ -102,7 +103,7 @@ public class NetworkEnvironment {
 
        private final SingleInputGateFactory singleInputGateFactory;
 
-       private boolean isShutdown;
+       private boolean isClosed;
 
        private NetworkEnvironment(
                        ResourceID taskExecutorLocation,
@@ -120,12 +121,12 @@ public class NetworkEnvironment {
                this.inputGatesById = new ConcurrentHashMap<>();
                this.resultPartitionFactory = resultPartitionFactory;
                this.singleInputGateFactory = singleInputGateFactory;
-               this.isShutdown = false;
+               this.isClosed = false;
        }
 
        public static NetworkEnvironment create(
-                       ResourceID taskExecutorLocation,
                        NetworkEnvironmentConfiguration config,
+                       ResourceID taskExecutorLocation,
                        TaskEventPublisher taskEventPublisher,
                        MetricGroup metricGroup,
                        IOManager ioManager) {
@@ -213,11 +214,7 @@ public class NetworkEnvironment {
                return Optional.ofNullable(inputGatesById.get(id));
        }
 
-       /**
-        * Batch release intermediate result partitions.
-        *
-        * @param partitionIds partition ids to release
-        */
+       @Override
        public void releasePartitions(Collection<ResultPartitionID> 
partitionIds) {
                for (ResultPartitionID partitionId : partitionIds) {
                        resultPartitionManager.releasePartition(partitionId, 
null);
@@ -230,7 +227,8 @@ public class NetworkEnvironment {
         * @return collection of partitions which still occupy some resources 
locally on this task executor
         * and have been not released yet.
         */
-       public Collection<ResultPartitionID> getUnreleasedPartitions() {
+       @Override
+       public Collection<ResultPartitionID> 
getPartitionsOccupyingLocalResources() {
                return resultPartitionManager.getUnreleasedPartitions();
        }
 
@@ -238,54 +236,56 @@ public class NetworkEnvironment {
        //  Create Output Writers and Input Readers
        // 
--------------------------------------------------------------------------------------------
 
-       public ResultPartition[] createResultPartitionWriters(
-                       String taskName,
-                       ExecutionAttemptID executionId,
+       @Override
+       public Collection<ResultPartition> createResultPartitionWriters(
+                       String ownerName,
+                       ExecutionAttemptID executionAttemptID,
                        Collection<ResultPartitionDeploymentDescriptor> 
resultPartitionDeploymentDescriptors,
                        MetricGroup outputGroup,
                        MetricGroup buffersGroup) {
                synchronized (lock) {
-                       Preconditions.checkState(!isShutdown, "The 
NetworkEnvironment has already been shut down.");
+                       Preconditions.checkState(!isClosed, "The 
NetworkEnvironment has already been shut down.");
 
                        ResultPartition[] resultPartitions = new 
ResultPartition[resultPartitionDeploymentDescriptors.size()];
                        int counter = 0;
                        for (ResultPartitionDeploymentDescriptor rpdd : 
resultPartitionDeploymentDescriptors) {
-                               resultPartitions[counter++] = 
resultPartitionFactory.create(taskName, executionId, rpdd);
+                               resultPartitions[counter++] = 
resultPartitionFactory.create(ownerName, executionAttemptID, rpdd);
                        }
 
                        registerOutputMetrics(outputGroup, buffersGroup, 
resultPartitions);
-                       return resultPartitions;
+                       return Arrays.asList(resultPartitions);
                }
        }
 
-       public SingleInputGate[] createInputGates(
-                       String taskName,
-                       ExecutionAttemptID executionId,
+       @Override
+       public Collection<SingleInputGate> createInputGates(
+                       String ownerName,
+                       ExecutionAttemptID executionAttemptID,
                        PartitionProducerStateProvider 
partitionProducerStateProvider,
                        Collection<InputGateDeploymentDescriptor> 
inputGateDeploymentDescriptors,
                        MetricGroup parentGroup,
                        MetricGroup inputGroup,
                        MetricGroup buffersGroup) {
                synchronized (lock) {
-                       Preconditions.checkState(!isShutdown, "The 
NetworkEnvironment has already been shut down.");
+                       Preconditions.checkState(!isClosed, "The 
NetworkEnvironment has already been shut down.");
 
                        InputChannelMetrics inputChannelMetrics = new 
InputChannelMetrics(parentGroup);
                        SingleInputGate[] inputGates = new 
SingleInputGate[inputGateDeploymentDescriptors.size()];
                        int counter = 0;
                        for (InputGateDeploymentDescriptor igdd : 
inputGateDeploymentDescriptors) {
                                SingleInputGate inputGate = 
singleInputGateFactory.create(
-                                       taskName,
+                                       ownerName,
                                        igdd,
                                        partitionProducerStateProvider,
                                        inputChannelMetrics);
-                               InputGateID id = new 
InputGateID(igdd.getConsumedResultId(), executionId);
+                               InputGateID id = new 
InputGateID(igdd.getConsumedResultId(), executionAttemptID);
                                inputGatesById.put(id, inputGate);
                                inputGate.getCloseFuture().thenRun(() -> 
inputGatesById.remove(id));
                                inputGates[counter++] = inputGate;
                        }
 
                        registerInputMetrics(inputGroup, buffersGroup, 
inputGates);
-                       return inputGates;
+                       return Arrays.asList(inputGates);
                }
        }
 
@@ -305,16 +305,7 @@ public class NetworkEnvironment {
                buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new 
InputBufferPoolUsageGauge(inputGates));
        }
 
-       /**
-        * Update consuming gate with newly available partition.
-        *
-        * @param consumerID execution id of consumer to identify belonging to 
it gate.
-        * @param partitionInfo telling where the partition can be retrieved 
from
-        * @return {@code true} if the partition has been updated or {@code 
false} if the partition is not available anymore.
-        * @throws IOException IO problem by the update
-        * @throws InterruptedException potentially blocking operation was 
interrupted
-        * @throws IllegalStateException the input gate with the id from the 
partitionInfo is not found
-        */
+       @Override
        public boolean updatePartitionInfo(
                        ExecutionAttemptID consumerID,
                        PartitionInfo partitionInfo) throws IOException, 
InterruptedException {
@@ -337,9 +328,10 @@ public class NetworkEnvironment {
         *
         * @return a port to connect to the task executor for shuffle data 
exchange, -1 if only local connection is possible.
         */
+       @Override
        public int start() throws IOException {
                synchronized (lock) {
-                       Preconditions.checkState(!isShutdown, "The 
NetworkEnvironment has already been shut down.");
+                       Preconditions.checkState(!isClosed, "The 
NetworkEnvironment has already been shut down.");
 
                        LOG.info("Starting the network environment and its 
components.");
 
@@ -355,9 +347,10 @@ public class NetworkEnvironment {
        /**
         * Tries to shut down all network I/O components.
         */
-       public void shutdown() {
+       @Override
+       public void close() {
                synchronized (lock) {
-                       if (isShutdown) {
+                       if (isClosed) {
                                return;
                        }
 
@@ -392,29 +385,27 @@ public class NetworkEnvironment {
                                LOG.warn("Network buffer pool did not shut down 
properly.", t);
                        }
 
-                       isShutdown = true;
+                       isClosed = true;
                }
        }
 
-       public boolean isShutdown() {
+       public boolean isClosed() {
                synchronized (lock) {
-                       return isShutdown;
+                       return isClosed;
                }
        }
 
-       public static NetworkEnvironment fromConfiguration(
-                       Configuration configuration,
-                       TaskEventPublisher taskEventPublisher,
-                       MetricGroup metricGroup,
-                       IOManager ioManager,
-                       long maxJvmHeapMemory,
-                       boolean localTaskManagerCommunication,
-                       InetAddress taskManagerAddress) {
-               final NetworkEnvironmentConfiguration networkConfig = 
NetworkEnvironmentConfiguration.fromConfiguration(
-                       configuration,
-                       maxJvmHeapMemory,
-                       localTaskManagerCommunication,
-                       taskManagerAddress);
-               return NetworkEnvironment.create(networkConfig, 
taskEventPublisher, metricGroup, ioManager);
+       public static NetworkEnvironment 
fromShuffleContext(ShuffleEnvironmentContext shuffleEnvironmentContext) {
+               NetworkEnvironmentConfiguration networkConfig = 
NetworkEnvironmentConfiguration.fromConfiguration(
+                       shuffleEnvironmentContext.getConfiguration(),
+                       shuffleEnvironmentContext.getMaxJvmHeapMemory(),
+                       shuffleEnvironmentContext.isLocalCommunicationOnly(),
+                       shuffleEnvironmentContext.getHostAddress());
+               return create(
+                       networkConfig,
+                       shuffleEnvironmentContext.getLocation(),
+                       shuffleEnvironmentContext.getEventPublisher(),
+                       shuffleEnvironmentContext.getParentMetricGroup(),
+                       shuffleEnvironmentContext.getIOManager());
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 6c869e9..1945bb0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -28,6 +28,11 @@ import java.io.IOException;
 
 /**
  * A buffer-oriented runtime result writer API for producing results.
+ *
+ * <p>If {@link ResultPartitionWriter#close()} is called before {@link 
ResultPartitionWriter#fail(Throwable)} or
+ * {@link ResultPartitionWriter#finish()}, it abruptly triggers failure and 
cancellation of production.
+ * In this case {@link ResultPartitionWriter#fail(Throwable)} still needs to 
be called afterwards to fully release
+ * all resources associated with the partition and propagate the failure cause to 
the consumer if possible.
  */
 public interface ResultPartitionWriter extends AutoCloseable {
 
@@ -73,8 +78,9 @@ public interface ResultPartitionWriter extends AutoCloseable {
        /**
         * Fail the production of the partition.
         *
-        * <p>This method propagates non-{@code null} failure causes to 
consumers on a best-effort basis.
-        * Closing of partition is still needed.
+        * <p>This method propagates non-{@code null} failure causes to 
consumers on a best-effort basis. This call also
+        * leads to the release of all resources associated with the partition. 
Closing of the partition is still needed
+        * afterwards if it has not been done before.
         *
         * @param throwable failure cause
         */
@@ -83,7 +89,7 @@ public interface ResultPartitionWriter extends AutoCloseable {
        /**
         * Successfully finish the production of the partition.
         *
-        * <p>Closing of partition is still needed.
+        * <p>Closing of the partition is still needed afterwards.
         */
        void finish() throws IOException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
new file mode 100644
index 0000000..882e52c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for the implementation of shuffle service local environment.
+ *
+ * <p>Input/Output interface of local shuffle service environment is based on 
memory {@link Buffer Buffers}. A producer
+ * can write shuffle data into the buffers, obtained from the created {@link 
ResultPartitionWriter ResultPartitionWriters}
+ * and a consumer reads the buffers from the created {@link InputGate 
InputGates}.
+ *
+ * <h1>Lifecycle management.</h1>
+ *
+ * <p>The interface contains methods to manage the lifecycle of the local 
shuffle service environment:
+ * <ol>
+ *     <li>{@link ShuffleEnvironment#start} must be called before using the 
shuffle service environment.</li>
+ *     <li>{@link ShuffleEnvironment#close} is called to release the shuffle 
service environment.</li>
+ * </ol>
+ *
+ * <h1>Shuffle Input/Output management.</h1>
+ *
+ * <h2>Result partition management.</h2>
+ *
+ * <p>The interface implements a factory of result partition writers to 
produce shuffle data:
+ * {@link ShuffleEnvironment#createResultPartitionWriters}. The created 
writers are grouped per owner.
+ * The owner is responsible for the writers' lifecycle from the moment of 
creation.
+ *
+ * <p>Partitions are released in the following cases:
+ * <ol>
+ *     <li>{@link ResultPartitionWriter#fail(Throwable)} and {@link 
ResultPartitionWriter#close()} are called
+ *     if the production has failed.
+ *     </li>
+ *     <li>{@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called
+ *     if the production is done. The actual release can take some time 
depending on implementation details,
+ *     e.g. if the `end of consumption' confirmation from the consumer is 
being awaited implicitly
+ *     or the partition is later released by {@link 
ShuffleEnvironment#releasePartitions(Collection)}.</li>
+ *     <li>{@link ShuffleEnvironment#releasePartitions(Collection)} is called 
outside of the producer thread,
+ *     e.g. to manage the lifecycle of BLOCKING result partitions which can 
outlive their producers.</li>
+ * </ol>
+ * The partitions, which currently still occupy local resources, can be 
queried with
+ * {@link ShuffleEnvironment#getPartitionsOccupyingLocalResources}.
+ *
+ * <h2>Input gate management.</h2>
+ *
+ * <p>The interface implements a factory for the input gates: {@link 
ShuffleEnvironment#createInputGates}.
+ * The created gates are grouped per owner. The owner is responsible for the 
gates' lifecycle from the moment of creation.
+ *
+ * <p>When the input gates are created, it can happen that not all consumed 
partitions are known at that moment
+ * e.g. because their producers have not been started yet. Therefore, the 
{@link ShuffleEnvironment} provides
+ * a method {@link ShuffleEnvironment#updatePartitionInfo} to update them 
externally, when the producer becomes known.
+ * The update mechanism has to be thread-safe because the updated gate can be 
read concurrently from a different thread.
+ *
+ * @param <P> type of provided result partition writers
+ * @param <G> type of provided input gates
+ */
+public interface ShuffleEnvironment<P extends ResultPartitionWriter, G extends 
InputGate> extends AutoCloseable {
+
+       /**
+        * Start the internal related services before using the shuffle service 
environment.
+        *
+        * @return a port to connect for the shuffle data exchange, -1 if only 
local connection is possible.
+        */
+       int start() throws IOException;
+
+       /**
+        * Factory method for the {@link ResultPartitionWriter 
ResultPartitionWriters} to produce result partitions.
+        *
+        * <p>The order of the {@link ResultPartitionWriter 
ResultPartitionWriters} in the returned collection
+        * should be the same as the iteration order of the passed {@code 
resultPartitionDeploymentDescriptors}.
+        *
+        * @param ownerName the owner name, used for logs
+        * @param executionAttemptID execution attempt id of the producer
+        * @param resultPartitionDeploymentDescriptors descriptors of the 
partition, produced by the owner
+        * @param outputGroup shuffle specific group for output metrics
+        * @param buffersGroup shuffle specific group for buffer metrics
+        * @return collection of the {@link ResultPartitionWriter 
ResultPartitionWriters}
+        */
+       Collection<P> createResultPartitionWriters(
+               String ownerName,
+               ExecutionAttemptID executionAttemptID,
+               Collection<ResultPartitionDeploymentDescriptor> 
resultPartitionDeploymentDescriptors,
+               MetricGroup outputGroup,
+               MetricGroup buffersGroup);
+
+       /**
+        * Release local resources occupied by the given partitions.
+        *
+        * @param partitionIds identifying the partitions to be released
+        */
+       void releasePartitions(Collection<ResultPartitionID> partitionIds);
+
+       /**
+        * Report partitions which still occupy some resources locally.
+        *
+        * @return collection of partitions which still occupy some resources 
locally
+        * and have not been released yet.
+        */
+       Collection<ResultPartitionID> getPartitionsOccupyingLocalResources();
+
+       /**
+        * Factory method for the {@link InputGate InputGates} to consume 
result partitions.
+        *
+        * <p>The order of the {@link InputGate InputGates} in the returned 
collection should be the same as the iteration order
+        * of the passed {@code inputGateDeploymentDescriptors}.
+        *
+        * @param ownerName the owner name, used for logs
+        * @param executionAttemptID execution attempt id of the consumer
+        * @param partitionProducerStateProvider producer state provider to 
query whether the producer is ready for consumption
+        * @param inputGateDeploymentDescriptors descriptors of the input gates 
to consume
+        * @param parentGroup parent of shuffle specific metric group
+        * @param inputGroup shuffle specific group for input metrics
+        * @param buffersGroup shuffle specific group for buffer metrics
+        * @return collection of the {@link InputGate InputGates}
+        */
+       Collection<G> createInputGates(
+               String ownerName,
+               ExecutionAttemptID executionAttemptID,
+               PartitionProducerStateProvider partitionProducerStateProvider,
+               Collection<InputGateDeploymentDescriptor> 
inputGateDeploymentDescriptors,
+               MetricGroup parentGroup,
+               MetricGroup inputGroup,
+               MetricGroup buffersGroup);
+
+       /**
+        * Update a gate with the newly available partition information, 
previously unknown.
+        *
+        * @param consumerID execution id to distinguish gates with the same id 
from the different consumer executions
+        * @param partitionInfo information needed to consume the updated 
partition, e.g. network location
+        * @return {@code true} if the partition has been updated or {@code 
false} if the partition is not available anymore.
+        * @throws IOException if an I/O problem occurs during the update
+        * @throws InterruptedException if a potentially blocking operation is 
interrupted
+        */
+       boolean updatePartitionInfo(
+               ExecutionAttemptID consumerID,
+               PartitionInfo partitionInfo) throws IOException, 
InterruptedException;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
new file mode 100644
index 0000000..f7108b6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+
+import java.net.InetAddress;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Local context used to create {@link ShuffleEnvironment}.
+ */
+public class ShuffleEnvironmentContext {
+       private final Configuration configuration;
+       private final ResourceID location;
+       private final long maxJvmHeapMemory;
+       private final boolean localCommunicationOnly;
+       private final InetAddress hostAddress;
+       private final TaskEventPublisher eventPublisher;
+       private final MetricGroup parentMetricGroup;
+       private final IOManager ioManager;
+
+       public ShuffleEnvironmentContext(
+                       Configuration configuration,
+                       ResourceID location,
+                       long maxJvmHeapMemory,
+                       boolean localCommunicationOnly,
+                       InetAddress hostAddress,
+                       TaskEventPublisher eventPublisher,
+                       MetricGroup parentMetricGroup,
+                       IOManager ioManager) {
+               this.configuration = checkNotNull(configuration);
+               this.location = checkNotNull(location);
+               this.maxJvmHeapMemory = maxJvmHeapMemory;
+               this.localCommunicationOnly = localCommunicationOnly;
+               this.hostAddress = checkNotNull(hostAddress);
+               this.eventPublisher = checkNotNull(eventPublisher);
+               this.parentMetricGroup = checkNotNull(parentMetricGroup);
+               this.ioManager = checkNotNull(ioManager);
+       }
+
+       public Configuration getConfiguration() {
+               return configuration;
+       }
+
+       public ResourceID getLocation() {
+               return location;
+       }
+
+       public long getMaxJvmHeapMemory() {
+               return maxJvmHeapMemory;
+       }
+
+       public boolean isLocalCommunicationOnly() {
+               return localCommunicationOnly;
+       }
+
+       public InetAddress getHostAddress() {
+               return hostAddress;
+       }
+
+       public TaskEventPublisher getEventPublisher() {
+               return eventPublisher;
+       }
+
+       public MetricGroup getParentMetricGroup() {
+               return parentMetricGroup;
+       }
+
+       public IOManager getIOManager() {
+               return ioManager;
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index e192824..a31521e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -48,7 +48,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -169,7 +169,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        private final TaskExecutorLocalStateStoresManager 
localStateStoresManager;
 
        /** The network component in the task manager. */
-       private final NetworkEnvironment networkEnvironment;
+       private final ShuffleEnvironment shuffleEnvironment;
 
        /** The kvState registration service in the task manager. */
        private final KvStateService kvStateService;
@@ -238,7 +238,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                this.jobLeaderService = 
taskExecutorServices.getJobLeaderService();
                this.taskManagerLocation = 
taskExecutorServices.getTaskManagerLocation();
                this.localStateStoresManager = 
taskExecutorServices.getTaskManagerStateStore();
-               this.networkEnvironment = 
taskExecutorServices.getNetworkEnvironment();
+               this.shuffleEnvironment = 
taskExecutorServices.getShuffleEnvironment();
                this.kvStateService = taskExecutorServices.getKvStateService();
                this.resourceManagerLeaderRetriever = 
haServices.getResourceManagerLeaderRetriever();
 
@@ -270,8 +270,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
        @Override
        public CompletableFuture<Boolean> canBeReleased() {
-               return CompletableFuture.completedFuture(
-                       
taskExecutorServices.getNetworkEnvironment().getUnreleasedPartitions().isEmpty());
+               return 
CompletableFuture.completedFuture(shuffleEnvironment.getPartitionsOccupyingLocalResources().isEmpty());
        }
 
        // 
------------------------------------------------------------------------
@@ -533,7 +532,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                tdd.getTargetSlotNumber(),
                                taskExecutorServices.getMemoryManager(),
                                taskExecutorServices.getIOManager(),
-                               taskExecutorServices.getNetworkEnvironment(),
+                               taskExecutorServices.getShuffleEnvironment(),
                                taskExecutorServices.getKvStateService(),
                                
taskExecutorServices.getBroadcastVariableManager(),
                                taskExecutorServices.getTaskEventDispatcher(),
@@ -615,7 +614,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                        CompletableFuture.runAsync(
                                                () -> {
                                                        try {
-                                                               if 
(!networkEnvironment.updatePartitionInfo(executionAttemptID, partitionInfo)) {
+                                                               if 
(!shuffleEnvironment.updatePartitionInfo(executionAttemptID, partitionInfo)) {
                                                                        
log.debug(
                                                                                
"Discard update for input gate partition {} of result {} in task {}. " +
                                                                                
        "The partition is no longer available.",
@@ -643,7 +642,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        @Override
        public void releasePartitions(Collection<ResultPartitionID> 
partitionIds) {
                try {
-                       networkEnvironment.releasePartitions(partitionIds);
+                       shuffleEnvironment.releasePartitions(partitionIds);
                } catch (Throwable t) {
                        // TODO: Do we still need this catch branch?
                        onFatalError(t);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 57a7f2b..d5985b1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -356,7 +356,11 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
                        TaskManagerServicesConfiguration.fromConfiguration(
                                configuration,
-                               remoteAddress);
+                               resourceID,
+                               remoteAddress,
+                               
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
+                               EnvironmentInformation.getMaxJvmHeapMemory(),
+                               localCommunicationOnly);
 
                Tuple2<TaskManagerMetricGroup, MetricGroup> 
taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
                        metricRegistry,
@@ -365,14 +369,9 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                        
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
                TaskManagerServices taskManagerServices = 
TaskManagerServices.fromConfiguration(
-                       configuration,
                        taskManagerServicesConfiguration,
                        taskManagerMetricGroup.f1,
-                       resourceID,
-                       rpcService.getExecutor(), // TODO replace this later 
with some dedicated executor for io.
-                       
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
-                       EnvironmentInformation.getMaxJvmHeapMemory(),
-                       localCommunicationOnly);
+                       rpcService.getExecutor()); // TODO replace this later 
with some dedicated executor for io.
 
                TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 266a38c..a8e909a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -27,13 +27,14 @@ import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -58,7 +59,7 @@ import static 
org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
 
 /**
  * Container for {@link TaskExecutor} services such as the {@link 
MemoryManager}, {@link IOManager},
- * {@link NetworkEnvironment}. All services are exclusive to a single {@link 
TaskExecutor}.
+ * {@link ShuffleEnvironment}. All services are exclusive to a single {@link 
TaskExecutor}.
  * Consequently, the respective {@link TaskExecutor} is responsible for 
closing them.
  */
 public class TaskManagerServices {
@@ -71,7 +72,7 @@ public class TaskManagerServices {
        private final TaskManagerLocation taskManagerLocation;
        private final MemoryManager memoryManager;
        private final IOManager ioManager;
-       private final NetworkEnvironment networkEnvironment;
+       private final ShuffleEnvironment shuffleEnvironment;
        private final KvStateService kvStateService;
        private final BroadcastVariableManager broadcastVariableManager;
        private final TaskSlotTable taskSlotTable;
@@ -84,7 +85,7 @@ public class TaskManagerServices {
                TaskManagerLocation taskManagerLocation,
                MemoryManager memoryManager,
                IOManager ioManager,
-               NetworkEnvironment networkEnvironment,
+               ShuffleEnvironment shuffleEnvironment,
                KvStateService kvStateService,
                BroadcastVariableManager broadcastVariableManager,
                TaskSlotTable taskSlotTable,
@@ -96,7 +97,7 @@ public class TaskManagerServices {
                this.taskManagerLocation = 
Preconditions.checkNotNull(taskManagerLocation);
                this.memoryManager = Preconditions.checkNotNull(memoryManager);
                this.ioManager = Preconditions.checkNotNull(ioManager);
-               this.networkEnvironment = 
Preconditions.checkNotNull(networkEnvironment);
+               this.shuffleEnvironment = 
Preconditions.checkNotNull(shuffleEnvironment);
                this.kvStateService = 
Preconditions.checkNotNull(kvStateService);
                this.broadcastVariableManager = 
Preconditions.checkNotNull(broadcastVariableManager);
                this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
@@ -118,8 +119,8 @@ public class TaskManagerServices {
                return ioManager;
        }
 
-       public NetworkEnvironment getNetworkEnvironment() {
-               return networkEnvironment;
+       public ShuffleEnvironment getShuffleEnvironment() {
+               return shuffleEnvironment;
        }
 
        public KvStateService getKvStateService() {
@@ -184,7 +185,7 @@ public class TaskManagerServices {
                }
 
                try {
-                       networkEnvironment.shutdown();
+                       shuffleEnvironment.close();
                } catch (Exception e) {
                        exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }
@@ -221,27 +222,16 @@ public class TaskManagerServices {
        /**
         * Creates and returns the task manager services.
         *
-        * @param configuration Flink configuration.
         * @param taskManagerServicesConfiguration task manager configuration
         * @param taskManagerMetricGroup metric group of the task manager
-        * @param resourceID resource ID of the task manager
         * @param taskIOExecutor executor for async IO operations
-        * @param freeHeapMemoryWithDefrag an estimate of the size of the free 
heap memory
-        * @param maxJvmHeapMemory the maximum JVM heap size
-        * @param localCommunicationOnly True, to skip initializing the network 
stack.
-        *                               Use only in cases where only one task 
manager runs.
         * @return task manager components
         * @throws Exception
         */
        public static TaskManagerServices fromConfiguration(
-                       Configuration configuration,
                        TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
                        MetricGroup taskManagerMetricGroup,
-                       ResourceID resourceID,
-                       Executor taskIOExecutor,
-                       long freeHeapMemoryWithDefrag,
-                       long maxJvmHeapMemory,
-                       boolean localCommunicationOnly) throws Exception {
+                       Executor taskIOExecutor) throws Exception {
 
                // pre-start checks
                
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
@@ -251,26 +241,23 @@ public class TaskManagerServices {
                // start the I/O manager, it will create some temp directories.
                final IOManager ioManager = new 
IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
-               final NetworkEnvironment network = 
NetworkEnvironment.fromConfiguration(
-                       configuration,
+               final ShuffleEnvironment shuffleEnvironment = 
createShuffleEnvironment(
+                       taskManagerServicesConfiguration,
                        taskEventDispatcher,
                        taskManagerMetricGroup,
-                       ioManager,
-                       maxJvmHeapMemory,
-                       localCommunicationOnly,
-                       
taskManagerServicesConfiguration.getTaskManagerAddress());
-               int dataPort = network.start();
+                       ioManager);
+               final int dataPort = shuffleEnvironment.start();
 
                final KvStateService kvStateService = 
KvStateService.fromConfiguration(taskManagerServicesConfiguration);
                kvStateService.start();
 
                final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(
-                       resourceID,
+                       taskManagerServicesConfiguration.getResourceID(),
                        
taskManagerServicesConfiguration.getTaskManagerAddress(),
                        dataPort);
 
                // this call has to happen strictly after the network stack has 
been initialized
-               final MemoryManager memoryManager = 
createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, 
maxJvmHeapMemory);
+               final MemoryManager memoryManager = 
createMemoryManager(taskManagerServicesConfiguration);
 
                final BroadcastVariableManager broadcastVariableManager = new 
BroadcastVariableManager();
 
@@ -307,7 +294,7 @@ public class TaskManagerServices {
                        taskManagerLocation,
                        memoryManager,
                        ioManager,
-                       network,
+                       shuffleEnvironment,
                        kvStateService,
                        broadcastVariableManager,
                        taskSlotTable,
@@ -317,19 +304,34 @@ public class TaskManagerServices {
                        taskEventDispatcher);
        }
 
+       private static ShuffleEnvironment createShuffleEnvironment(
+                       TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
+                       TaskEventDispatcher taskEventDispatcher,
+                       MetricGroup taskManagerMetricGroup,
+                       IOManager ioManager) {
+
+               final ShuffleEnvironmentContext shuffleEnvironmentContext = new 
ShuffleEnvironmentContext(
+                       taskManagerServicesConfiguration.getConfiguration(),
+                       taskManagerServicesConfiguration.getResourceID(),
+                       taskManagerServicesConfiguration.getMaxJvmHeapMemory(),
+                       
taskManagerServicesConfiguration.isLocalCommunicationOnly(),
+                       
taskManagerServicesConfiguration.getTaskManagerAddress(),
+                       taskEventDispatcher,
+                       taskManagerMetricGroup,
+                       ioManager);
+
+               return 
NetworkEnvironment.fromShuffleContext(shuffleEnvironmentContext);
+       }
+
        /**
         * Creates a {@link MemoryManager} from the given {@link 
TaskManagerServicesConfiguration}.
         *
         * @param taskManagerServicesConfiguration to create the memory manager 
from
-        * @param freeHeapMemoryWithDefrag an estimate of the size of the free 
heap memory
-        * @param maxJvmHeapMemory the maximum JVM heap size
         * @return Memory manager
         * @throws Exception
         */
        private static MemoryManager createMemoryManager(
-                       TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
-                       long freeHeapMemoryWithDefrag,
-                       long maxJvmHeapMemory) throws Exception {
+                       TaskManagerServicesConfiguration 
taskManagerServicesConfiguration) throws Exception {
                // computing the amount of memory to use depends on how much 
memory is available
                // it strictly needs to happen AFTER the network stack has been 
initialized
 
@@ -354,6 +356,7 @@ public class TaskManagerServices {
                        float memoryFraction = 
taskManagerServicesConfiguration.getMemoryFraction();
 
                        if (memType == MemoryType.HEAP) {
+                               long freeHeapMemoryWithDefrag = 
taskManagerServicesConfiguration.getFreeHeapMemoryWithDefrag();
                                // network buffers allocated off-heap -> use 
memoryFraction of the available heap:
                                long relativeMemSize = (long) 
(freeHeapMemoryWithDefrag * memoryFraction);
                                if (preAllocateMemory) {
@@ -365,6 +368,7 @@ public class TaskManagerServices {
                                }
                                memorySize = relativeMemSize;
                        } else if (memType == MemoryType.OFF_HEAP) {
+                               long maxJvmHeapMemory = 
taskManagerServicesConfiguration.getMaxJvmHeapMemory();
                                // The maximum heap memory has been adjusted 
according to the fraction (see
                                // calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration config)), i.e.
                                // maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * 
memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index f88444e..64f5a30 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
@@ -42,8 +43,14 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class TaskManagerServicesConfiguration {
 
+       private final Configuration configuration;
+
+       private final ResourceID resourceID;
+
        private final InetAddress taskManagerAddress;
 
+       private final boolean localCommunicationOnly;
+
        private final String[] tmpDirPaths;
 
        private final String[] localRecoveryStateRootDirectories;
@@ -53,6 +60,10 @@ public class TaskManagerServicesConfiguration {
        @Nullable
        private final QueryableStateConfiguration queryableStateConfig;
 
+       private final long freeHeapMemoryWithDefrag;
+
+       private final long maxJvmHeapMemory;
+
        /**
         * Managed memory (in megabytes).
         *
@@ -77,9 +88,14 @@ public class TaskManagerServicesConfiguration {
        private Optional<Time> systemResourceMetricsProbingInterval;
 
        public TaskManagerServicesConfiguration(
+                       Configuration configuration,
+                       ResourceID resourceID,
                        InetAddress taskManagerAddress,
+                       boolean localCommunicationOnly,
                        String[] tmpDirPaths,
                        String[] localRecoveryStateRootDirectories,
+                       long freeHeapMemoryWithDefrag,
+                       long maxJvmHeapMemory,
                        boolean localRecoveryEnabled,
                        @Nullable QueryableStateConfiguration 
queryableStateConfig,
                        int numberOfSlots,
@@ -91,10 +107,15 @@ public class TaskManagerServicesConfiguration {
                        long timerServiceShutdownTimeout,
                        RetryingRegistrationConfiguration 
retryingRegistrationConfiguration,
                        Optional<Time> systemResourceMetricsProbingInterval) {
+               this.configuration = checkNotNull(configuration);
+               this.resourceID = checkNotNull(resourceID);
 
                this.taskManagerAddress = checkNotNull(taskManagerAddress);
+               this.localCommunicationOnly = localCommunicationOnly;
                this.tmpDirPaths = checkNotNull(tmpDirPaths);
                this.localRecoveryStateRootDirectories = 
checkNotNull(localRecoveryStateRootDirectories);
+               this.freeHeapMemoryWithDefrag = freeHeapMemoryWithDefrag;
+               this.maxJvmHeapMemory = maxJvmHeapMemory;
                this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled);
                this.queryableStateConfig = queryableStateConfig;
                this.numberOfSlots = checkNotNull(numberOfSlots);
@@ -117,10 +138,22 @@ public class TaskManagerServicesConfiguration {
        //  Getter/Setter
        // 
--------------------------------------------------------------------------------------------
 
+       public Configuration getConfiguration() {
+               return configuration;
+       }
+
+       public ResourceID getResourceID() {
+               return resourceID;
+       }
+
        InetAddress getTaskManagerAddress() {
                return taskManagerAddress;
        }
 
+       boolean isLocalCommunicationOnly() {
+               return localCommunicationOnly;
+       }
+
        public String[] getTmpDirPaths() {
                return tmpDirPaths;
        }
@@ -155,6 +188,14 @@ public class TaskManagerServicesConfiguration {
                return memoryType;
        }
 
+       long getFreeHeapMemoryWithDefrag() {
+               return freeHeapMemoryWithDefrag;
+       }
+
+       long getMaxJvmHeapMemory() {
+               return maxJvmHeapMemory;
+       }
+
        /**
         * Returns the size of the managed memory (in megabytes), if configured.
         *
@@ -195,13 +236,22 @@ public class TaskManagerServicesConfiguration {
         * sanity check them.
         *
         * @param configuration The configuration.
+        * @param resourceID resource ID of the task manager
         * @param remoteAddress identifying the IP address under which the 
TaskManager will be accessible
+        * @param freeHeapMemoryWithDefrag an estimate of the size of the free 
heap memory
+        * @param maxJvmHeapMemory the maximum JVM heap size
+        * @param localCommunicationOnly True if only local communication is 
possible.
+        *                               Use only in cases where only one task 
manager runs.
         *
-        * @return TaskExecutorConfiguration that wrappers 
InstanceConnectionInfo, etc.
+        * @return configuration of task manager services used to create them
         */
        public static TaskManagerServicesConfiguration fromConfiguration(
                        Configuration configuration,
-                       InetAddress remoteAddress) {
+                       ResourceID resourceID,
+                       InetAddress remoteAddress,
+                       long freeHeapMemoryWithDefrag,
+                       long maxJvmHeapMemory,
+                       boolean localCommunicationOnly) {
                final String[] tmpDirs = 
ConfigurationUtils.parseTempDirectories(configuration);
                String[] localStateRootDir = 
ConfigurationUtils.parseLocalStateDirectories(configuration);
                if (localStateRootDir.length == 0) {
@@ -220,9 +270,14 @@ public class TaskManagerServicesConfiguration {
                final RetryingRegistrationConfiguration 
retryingRegistrationConfiguration = 
RetryingRegistrationConfiguration.fromConfiguration(configuration);
 
                return new TaskManagerServicesConfiguration(
+                       configuration,
+                       resourceID,
                        remoteAddress,
+                       localCommunicationOnly,
                        tmpDirs,
                        localStateRootDir,
+                       freeHeapMemoryWithDefrag,
+                       maxJvmHeapMemory,
                        localRecoveryMode,
                        queryableStateConfig,
                        ConfigurationParserUtils.getSlot(configuration),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 27d9cdc..5512c54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -49,7 +49,6 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
@@ -63,6 +62,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
@@ -283,7 +283,7 @@ public class Task implements Runnable, TaskActions, 
PartitionProducerStateProvid
                int targetSlotNumber,
                MemoryManager memManager,
                IOManager ioManager,
-               NetworkEnvironment networkEnvironment,
+               ShuffleEnvironment<?, ?> shuffleEnvironment,
                KvStateService kvStateService,
                BroadcastVariableManager bcVarManager,
                TaskEventDispatcher taskEventDispatcher,
@@ -368,12 +368,12 @@ public class Task implements Runnable, TaskActions, 
PartitionProducerStateProvid
                final MetricGroup inputGroup = networkGroup.addGroup("Input");
 
                // produced intermediate result partitions
-               final ResultPartitionWriter[] resultPartitionWriters = 
networkEnvironment.createResultPartitionWriters(
+               final ResultPartitionWriter[] resultPartitionWriters = 
shuffleEnvironment.createResultPartitionWriters(
                        taskNameWithSubtaskAndId,
                        executionId,
                        resultPartitionDeploymentDescriptors,
                        outputGroup,
-                       buffersGroup);
+                       buffersGroup).toArray(new ResultPartitionWriter[] {});
 
                this.consumableNotifyingPartitionWriters = 
ConsumableNotifyingResultPartitionWriterDecorator.decorate(
                        resultPartitionDeploymentDescriptors,
@@ -383,14 +383,14 @@ public class Task implements Runnable, TaskActions, 
PartitionProducerStateProvid
                        resultPartitionConsumableNotifier);
 
                // consumed intermediate result partitions
-               InputGate[] gates = networkEnvironment.createInputGates(
+               final InputGate[] gates = shuffleEnvironment.createInputGates(
                        taskNameWithSubtaskAndId,
                        executionId,
                        this,
                        inputGateDeploymentDescriptors,
                        metrics.getIOMetricGroup(),
                        inputGroup,
-                       buffersGroup);
+                       buffersGroup).toArray(new InputGate[] {});
 
                this.inputGates = new InputGate[gates.length];
                int counter = 0;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
index aad87494..948d4c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
@@ -146,11 +146,15 @@ public class ConfigurationParserUtils {
                        
configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());
 
                // check page size of for minimum size
-               checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, 
pageSize,
+               checkConfigParameter(
+                       pageSize >= MemoryManager.MIN_PAGE_SIZE,
+                       pageSize,
                        TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
                        "Minimum memory segment size is " + 
MemoryManager.MIN_PAGE_SIZE);
                // check page size for power of two
-               checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
+               checkConfigParameter(
+                       MathUtils.isPowerOf2(pageSize),
+                       pageSize,
                        TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
                        "Memory segment size must be a power of 2.");
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java
index 1950638..03ec3a4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java
@@ -119,7 +119,6 @@ public class NetworkEnvironmentBuilder {
 
        public NetworkEnvironment build() {
                return NetworkEnvironment.create(
-                       taskManagerLocation,
                        new NetworkEnvironmentConfiguration(
                                numNetworkBuffers,
                                networkBufferSize,
@@ -130,6 +129,7 @@ public class NetworkEnvironmentBuilder {
                                isCreditBased,
                                isNetworkDetailedMetrics,
                                nettyConfig),
+                       taskManagerLocation,
                        taskEventDispatcher,
                        metricGroup,
                        ioManager);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 933f385..4606318 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -124,7 +124,7 @@ public class NetworkEnvironmentTest {
                for (SingleInputGate ig : inputGates) {
                        ig.close();
                }
-               network.shutdown();
+               network.close();
        }
 
        /**
@@ -245,7 +245,7 @@ public class NetworkEnvironmentTest {
                for (SingleInputGate ig : inputGates) {
                        ig.close();
                }
-               network.shutdown();
+               network.close();
        }
 
        /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 36a98ff..de5eaf1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -258,7 +258,7 @@ public class ResultPartitionTest {
                        }
                } finally {
                        resultPartition.release();
-                       network.shutdown();
+                       network.close();
                }
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 11c6a51..03fd3b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -401,7 +401,7 @@ public class SingleInputGateTest extends InputGateTestBase {
                        }
                } finally {
                        gate.close();
-                       netEnv.shutdown();
+                       netEnv.close();
                }
        }
 
@@ -436,7 +436,7 @@ public class SingleInputGateTest extends InputGateTestBase {
                        }
                } finally {
                        inputGate.close();
-                       network.shutdown();
+                       network.close();
                }
        }
 
@@ -487,7 +487,7 @@ public class SingleInputGateTest extends InputGateTestBase {
                        }
                } finally {
                        inputGate.close();
-                       network.shutdown();
+                       network.close();
                }
        }
 
@@ -537,7 +537,7 @@ public class SingleInputGateTest extends InputGateTestBase {
                                is(instanceOf((LocalInputChannel.class))));
                } finally {
                        inputGate.close();
-                       network.shutdown();
+                       network.close();
                }
        }
 
@@ -580,7 +580,7 @@ public class SingleInputGateTest extends InputGateTestBase {
                                 assertThat(network.getInputGate(id).isPresent(), is(false));
                        }
                } finally {
-                       network.shutdown();
+                       network.close();
                }
        }
 
@@ -614,7 +614,7 @@ public class SingleInputGateTest extends InputGateTestBase {
                        Arrays.asList(gateDescs),
                        new UnregisteredMetricsGroup(),
                        new UnregisteredMetricsGroup(),
-                       new UnregisteredMetricsGroup());
+                       new UnregisteredMetricsGroup()).toArray(new SingleInputGate[] {});
                 Map<InputGateID, SingleInputGate> inputGatesById = new HashMap<>();
                 for (int i = 0; i < numberOfGates; i++) {
                         inputGatesById.put(new InputGateID(ids[i], consumerID), gates[i]);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index e80287f..d9eac20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -66,7 +66,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
                // test configuration of the local state mode
                config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
 
-               TaskManagerServices taskManagerServices = createTaskManagerServices(config, createTaskManagerServiceConfiguration(config));
+               TaskManagerServices taskManagerServices = createTaskManagerServices(createTaskManagerServiceConfiguration(config));
 
                 TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
 
@@ -99,7 +99,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 
                 TaskManagerServicesConfiguration taskManagerServicesConfiguration = createTaskManagerServiceConfiguration(config);
 
-               TaskManagerServices taskManagerServices = createTaskManagerServices(config, taskManagerServicesConfiguration);
+               TaskManagerServices taskManagerServices = createTaskManagerServices(taskManagerServicesConfiguration);
 
                 TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
 
@@ -206,20 +206,18 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
                        Configuration config) throws IOException {
                return TaskManagerServicesConfiguration.fromConfiguration(
                        config,
-                       InetAddress.getLocalHost());
+                       ResourceID.generate(),
+                       InetAddress.getLocalHost(),
+                       MEM_SIZE_PARAM,
+                       MEM_SIZE_PARAM,
+                       true);
        }
 
        private TaskManagerServices createTaskManagerServices(
-                       Configuration flinkConfig,
                         TaskManagerServicesConfiguration config) throws Exception {
                return TaskManagerServices.fromConfiguration(
-                       flinkConfig,
                         config,
                         UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       ResourceID.generate(),
-                       Executors.directExecutor(),
-                       MEM_SIZE_PARAM,
-                       MEM_SIZE_PARAM,
-                       true);
+                       Executors.directExecutor());
        }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
index 8a21da9..9673e13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -276,7 +276,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
                                .addTaskManagerActionListener(eid2, 
ExecutionState.FINISHED, task2FinishedFuture)
                                .setJobMasterId(jobMasterId)
                                .setJobMasterGateway(testingJobMasterGateway)
-                               .useRealNonMockNetworkEnvironment()
+                               .useRealNonMockShuffleEnvironment()
                                .build()) {
                        TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
                        TaskSlotTable taskSlotTable = env.getTaskSlotTable();
@@ -342,7 +342,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
                                .addTaskManagerActionListener(eid2, 
ExecutionState.CANCELED, task2CanceledFuture)
                                .setJobMasterId(jobMasterId)
                                .setJobMasterGateway(testingJobMasterGateway)
-                               .useRealNonMockNetworkEnvironment()
+                               .useRealNonMockShuffleEnvironment()
                                .build()) {
                        TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
                        TaskSlotTable taskSlotTable = env.getTaskSlotTable();
@@ -392,7 +392,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
                                .addTaskManagerActionListener(eid, 
ExecutionState.FAILED, taskFailedFuture)
                                .setConfiguration(config)
                                .setLocalCommunication(false)
-                               .useRealNonMockNetworkEnvironment()
+                               .useRealNonMockShuffleEnvironment()
                                .build()) {
                        TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
                        TaskSlotTable taskSlotTable = env.getTaskSlotTable();
@@ -417,11 +417,11 @@ public class TaskExecutorSubmissionTest extends TestLogger {
 
                final CompletableFuture<Void> taskRunningFuture = new 
CompletableFuture<>();
                final CompletableFuture<Void> taskFailedFuture = new 
CompletableFuture<>();
-               final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
+               final ShuffleEnvironment<?, ?> shuffleEnvironment = mock(ShuffleEnvironment.class, Mockito.RETURNS_MOCKS);
 
                try (TaskSubmissionTestEnvironment env =
                        new TaskSubmissionTestEnvironment.Builder(jobId)
-                               .setNetworkEnvironment(networkEnvironment)
+                               .setShuffleEnvironment(shuffleEnvironment)
                                .setSlotSize(1)
                                .addTaskManagerActionListener(eid, 
ExecutionState.RUNNING, taskRunningFuture)
                                .addTaskManagerActionListener(eid, 
ExecutionState.FAILED, taskFailedFuture)
@@ -437,7 +437,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
                        NettyShuffleDescriptor shuffleDescriptor =
                                createRemoteWithIdAndLocation(new 
IntermediateResultPartitionID(), producerLocation);
                        final PartitionInfo partitionUpdate = new 
PartitionInfo(new IntermediateDataSetID(), shuffleDescriptor);
-                       doThrow(new IOException()).when(networkEnvironment).updatePartitionInfo(eid, partitionUpdate);
+                       doThrow(new IOException()).when(shuffleEnvironment).updatePartitionInfo(eid, partitionUpdate);
 
                        final CompletableFuture<Acknowledge> updateFuture = 
tmGateway.updatePartitions(
                                eid,
@@ -477,7 +477,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
                                .addTaskManagerActionListener(eid, 
ExecutionState.RUNNING, taskRunningFuture)
                                .addTaskManagerActionListener(eid, 
ExecutionState.FAILED, taskFailedFuture)
                                .setConfiguration(config)
-                               .useRealNonMockNetworkEnvironment()
+                               .useRealNonMockShuffleEnvironment()
                                .build()) {
                        TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
                        TaskSlotTable taskSlotTable = env.getTaskSlotTable();
@@ -536,7 +536,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
                                .addTaskManagerActionListener(eid, 
ExecutionState.RUNNING, taskRunningFuture)
                                .setJobMasterId(jobMasterId)
                                .setJobMasterGateway(testingJobMasterGateway)
-                               .useRealNonMockNetworkEnvironment()
+                               .useRealNonMockShuffleEnvironment()
                                .build()) {
                        TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
                        TaskSlotTable taskSlotTable = env.getTaskSlotTable();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 4d0b4d5..f1d607a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -244,7 +244,7 @@ public class TaskExecutorTest extends TestLogger {
                }
 
                if (networkEnvironment != null) {
-                       networkEnvironment.shutdown();
+                       networkEnvironment.close();
                }
 
                testingFatalErrorHandler.rethrowError();
@@ -279,7 +279,7 @@ public class TaskExecutorTest extends TestLogger {
                        .setTaskManagerLocation(taskManagerLocation)
                        .setMemoryManager(memoryManager)
                        .setIoManager(ioManager)
-                       .setNetworkEnvironment(networkEnvironment)
+                       .setShuffleEnvironment(networkEnvironment)
                        .setKvStateService(kvStateService)
                        .setTaskSlotTable(taskSlotTable)
                        .setJobLeaderService(jobLeaderService)
@@ -295,7 +295,7 @@ public class TaskExecutorTest extends TestLogger {
                }
 
                assertThat(memoryManager.isShutdown(), is(true));
-               assertThat(networkEnvironment.isShutdown(), is(true));
+               assertThat(networkEnvironment.isClosed(), is(true));
                assertThat(ioManager.isProperlyShutDown(), is(true));
                assertThat(kvStateService.isShutdown(), is(true));
        }
@@ -709,7 +709,7 @@ public class TaskExecutorTest extends TestLogger {
                final TaskExecutorLocalStateStoresManager 
localStateStoresManager = createTaskExecutorLocalStateStoresManager();
 
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
-                       .setNetworkEnvironment(networkEnvironment)
+                       .setShuffleEnvironment(networkEnvironment)
                        .setTaskSlotTable(taskSlotTable)
                        .setJobManagerTable(jobManagerTable)
                        .setTaskStateManager(localStateStoresManager)
@@ -967,7 +967,7 @@ public class TaskExecutorTest extends TestLogger {
 
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
-                       .setNetworkEnvironment(networkEnvironment)
+                       .setShuffleEnvironment(networkEnvironment)
                        .setTaskSlotTable(taskSlotTable)
                        .setJobLeaderService(jobLeaderService)
                        .setJobManagerTable(jobManagerTable)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
index 0238edf..c763db6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.query.KvStateRegistry;
@@ -42,7 +42,7 @@ public class TaskManagerServicesBuilder {
        private TaskManagerLocation taskManagerLocation;
        private MemoryManager memoryManager;
        private IOManager ioManager;
-       private NetworkEnvironment networkEnvironment;
+       private ShuffleEnvironment<?, ?> shuffleEnvironment;
        private KvStateService kvStateService;
        private BroadcastVariableManager broadcastVariableManager;
        private TaskSlotTable taskSlotTable;
@@ -60,7 +60,7 @@ public class TaskManagerServicesBuilder {
                        MemoryType.HEAP,
                        false);
                ioManager = mock(IOManager.class);
-               networkEnvironment = mock(NetworkEnvironment.class);
+               shuffleEnvironment = mock(ShuffleEnvironment.class);
                kvStateService = new KvStateService(new KvStateRegistry(), 
null, null);
                broadcastVariableManager = new BroadcastVariableManager();
                taskEventDispatcher = new TaskEventDispatcher();
@@ -85,8 +85,8 @@ public class TaskManagerServicesBuilder {
                return this;
        }
 
-       public TaskManagerServicesBuilder setNetworkEnvironment(NetworkEnvironment networkEnvironment) {
-               this.networkEnvironment = networkEnvironment;
+       public TaskManagerServicesBuilder setShuffleEnvironment(ShuffleEnvironment<?, ?> shuffleEnvironment) {
+               this.shuffleEnvironment = shuffleEnvironment;
                return this;
        }
 
@@ -125,7 +125,7 @@ public class TaskManagerServicesBuilder {
                        taskManagerLocation,
                        memoryManager,
                        ioManager,
-                       networkEnvironment,
+                       shuffleEnvironment,
                        kvStateService,
                        broadcastVariableManager,
                        taskSlotTable,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 6b185c6..958696f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -34,8 +34,8 @@ import 
org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
@@ -104,7 +104,7 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
                        Configuration configuration,
                        List<Tuple3<ExecutionAttemptID, ExecutionState, 
CompletableFuture<Void>>> taskManagerActionListeners,
                        TestingRpcService testingRpcService,
-                       NetworkEnvironment networkEnvironment) throws Exception 
{
+                       ShuffleEnvironment<?, ?> shuffleEnvironment) throws 
Exception {
 
                this.haServices = new TestingHighAvailabilityServices();
                this.haServices.setResourceManagerLeaderRetriever(new 
SettableLeaderRetrievalService());
@@ -154,7 +154,7 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
                        Executors.directExecutor());
 
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
-                       .setNetworkEnvironment(networkEnvironment)
+                       .setShuffleEnvironment(shuffleEnvironment)
                        .setTaskSlotTable(taskSlotTable)
                        .setJobManagerTable(jobManagerTable)
                        .setTaskStateManager(localStateStoresManager)
@@ -229,15 +229,15 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
                        partitionProducerStateChecker);
        }
 
-       private static NetworkEnvironment createNetworkEnvironment(
+       private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
                        ResourceID taskManagerLocation,
                        boolean localCommunication,
                        Configuration configuration,
                        RpcService testingRpcService,
-                       boolean mockNetworkEnvironment) throws Exception {
-               final NetworkEnvironment networkEnvironment;
-               if (mockNetworkEnvironment) {
-                       networkEnvironment = mock(NetworkEnvironment.class, 
Mockito.RETURNS_MOCKS);
+                       boolean mockShuffleEnvironment) throws Exception {
+               final ShuffleEnvironment<?, ?> shuffleEnvironment;
+               if (mockShuffleEnvironment) {
+                       shuffleEnvironment = mock(ShuffleEnvironment.class, 
Mockito.RETURNS_MOCKS);
                } else {
                        final InetSocketAddress socketAddress = new 
InetSocketAddress(
                                
InetAddress.getByName(testingRpcService.getAddress()), 
configuration.getInteger(NetworkEnvironmentOptions.DATA_PORT));
@@ -245,17 +245,17 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
                        final NettyConfig nettyConfig = new 
NettyConfig(socketAddress.getAddress(), socketAddress.getPort(),
                                
ConfigurationParserUtils.getPageSize(configuration), 
ConfigurationParserUtils.getSlot(configuration), configuration);
 
-                       networkEnvironment =  new NetworkEnvironmentBuilder()
+                       shuffleEnvironment =  new NetworkEnvironmentBuilder()
                                .setTaskManagerLocation(taskManagerLocation)
                                
.setPartitionRequestInitialBackoff(configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL))
                                
.setPartitionRequestMaxBackoff(configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX))
                                .setNettyConfig(localCommunication ? null : 
nettyConfig)
                                .build();
 
-                       networkEnvironment.start();
+                       shuffleEnvironment.start();
                }
 
-               return networkEnvironment;
+               return shuffleEnvironment;
        }
 
        @Override
@@ -274,14 +274,14 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
        public static final class Builder {
 
                private JobID jobId;
-               private boolean mockNetworkEnvironment = true;
+               private boolean mockShuffleEnvironment = true;
                private int slotSize;
                private JobMasterId jobMasterId = JobMasterId.generate();
                private TestingJobMasterGateway jobMasterGateway;
                private boolean localCommunication = true;
                private Configuration configuration = new Configuration();
                @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-               private Optional<NetworkEnvironment> optionalNetworkEnvironment 
= Optional.empty();
+               private Optional<ShuffleEnvironment<?, ?>> 
optionalShuffleEnvironment = Optional.empty();
                private ResourceID resourceID = ResourceID.generate();
 
                private List<Tuple3<ExecutionAttemptID, ExecutionState, 
CompletableFuture<Void>>> taskManagerActionListeners = new ArrayList<>();
@@ -290,15 +290,15 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
                        this.jobId = jobId;
                }
 
-               public Builder useRealNonMockNetworkEnvironment() {
-                       this.optionalNetworkEnvironment = Optional.empty();
-                       this.mockNetworkEnvironment = false;
+               public Builder useRealNonMockShuffleEnvironment() {
+                       this.optionalShuffleEnvironment = Optional.empty();
+                       this.mockShuffleEnvironment = false;
                        return this;
                }
 
-               public Builder setNetworkEnvironment(NetworkEnvironment 
optionalNetworkEnvironment) {
-                       this.mockNetworkEnvironment = false;
-                       this.optionalNetworkEnvironment = 
Optional.of(optionalNetworkEnvironment);
+               public Builder setShuffleEnvironment(ShuffleEnvironment<?, ?> 
optionalShuffleEnvironment) {
+                       this.mockShuffleEnvironment = false;
+                       this.optionalShuffleEnvironment = 
Optional.of(optionalShuffleEnvironment);
                        return this;
                }
 
@@ -339,14 +339,13 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
 
                public TaskSubmissionTestEnvironment build() throws Exception {
                        final TestingRpcService testingRpcService = new 
TestingRpcService();
-                       final NetworkEnvironment network = 
optionalNetworkEnvironment.orElseGet(() -> {
+                       final ShuffleEnvironment<?, ?> network = 
optionalShuffleEnvironment.orElseGet(() -> {
                                try {
-                                       return createNetworkEnvironment(
-                                               resourceID,
+                                       return createShuffleEnvironment(resourceID,
                                                localCommunication,
                                                configuration,
                                                testingRpcService,
-                                               mockNetworkEnvironment);
+                                               mockShuffleEnvironment);
                                } catch (Exception e) {
                                         throw new FlinkRuntimeException("Failed to build TaskSubmissionTestEnvironment", e);
                                }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 01cd184..bfcbd13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -40,8 +40,8 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -106,7 +106,7 @@ public class TaskAsyncCallTest extends TestLogger {
 
        private static final List<ClassLoader> classLoaders = 
Collections.synchronizedList(new ArrayList<>());
 
-       private NetworkEnvironment networkEnvironment;
+       private ShuffleEnvironment shuffleEnvironment;
 
        @Before
        public void createQueuesAndActors() {
@@ -117,15 +117,15 @@ public class TaskAsyncCallTest extends TestLogger {
                notifyCheckpointCompleteLatch = new OneShotLatch();
                stopLatch = new OneShotLatch();
 
-               networkEnvironment = new NetworkEnvironmentBuilder().build();
+               shuffleEnvironment = new NetworkEnvironmentBuilder().build();
 
                classLoaders.clear();
        }
 
        @After
-       public void teardown() {
-               if (networkEnvironment != null) {
-                       networkEnvironment.shutdown();
+       public void teardown() throws Exception {
+               if (shuffleEnvironment != null) {
+                       shuffleEnvironment.close();
                }
        }
 
@@ -255,7 +255,7 @@ public class TaskAsyncCallTest extends TestLogger {
                        0,
                        mock(MemoryManager.class),
                        mock(IOManager.class),
-                       networkEnvironment,
+                       shuffleEnvironment,
                        new KvStateService(new KvStateRegistry(), null, null),
                        mock(BroadcastVariableManager.class),
                        new TaskEventDispatcher(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index f23fc9d..a1b73df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -47,8 +46,8 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -125,7 +124,7 @@ public class TaskTest extends TestLogger {
        private static OneShotLatch awaitLatch;
        private static OneShotLatch triggerLatch;
 
-       private NetworkEnvironment networkEnvironment;
+       private ShuffleEnvironment<?, ?> shuffleEnvironment;
 
        @ClassRule
        public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
@@ -135,13 +134,13 @@ public class TaskTest extends TestLogger {
                awaitLatch = new OneShotLatch();
                triggerLatch = new OneShotLatch();
 
-               networkEnvironment = new NetworkEnvironmentBuilder().build();
+               shuffleEnvironment = new NetworkEnvironmentBuilder().build();
        }
 
        @After
-       public void teardown() {
-               if (networkEnvironment != null) {
-                       networkEnvironment.shutdown();
+       public void teardown() throws Exception {
+               if (shuffleEnvironment != null) {
+                       shuffleEnvironment.close();
                }
        }
 
@@ -308,7 +307,7 @@ public class TaskTest extends TestLogger {
                final PartitionProducerStateChecker 
partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 
                final QueuedNoOpTaskManagerActions taskManagerActions = new 
QueuedNoOpTaskManagerActions();
-               final Task task = new TaskBuilder(networkEnvironment)
+               final Task task = new TaskBuilder(shuffleEnvironment)
                        .setTaskManagerActions(taskManagerActions)
                        .setConsumableNotifier(consumableNotifier)
                        
.setPartitionProducerStateChecker(partitionProducerStateChecker)
@@ -317,7 +316,7 @@ public class TaskTest extends TestLogger {
                        .build();
 
                // shut down the network to make the following task 
registration failure
-               networkEnvironment.shutdown();
+               shuffleEnvironment.close();
 
                // should fail
                task.run();
@@ -974,7 +973,7 @@ public class TaskTest extends TestLogger {
        }
 
        private TaskBuilder createTaskBuilder() {
-               return new TaskBuilder(networkEnvironment);
+               return new TaskBuilder(shuffleEnvironment);
        }
 
        private static final class TaskBuilder {
@@ -983,7 +982,7 @@ public class TaskTest extends TestLogger {
                private LibraryCacheManager libraryCacheManager;
                private ResultPartitionConsumableNotifier consumableNotifier;
                private PartitionProducerStateChecker 
partitionProducerStateChecker;
-               private final NetworkEnvironment networkEnvironment;
+               private final ShuffleEnvironment<?, ?> shuffleEnvironment;
                private KvStateService kvStateService;
                private Executor executor;
                private Configuration taskManagerConfig;
@@ -1012,8 +1011,8 @@ public class TaskTest extends TestLogger {
                        requiredJarFileBlobKeys = Collections.emptyList();
                }
 
-               private TaskBuilder(NetworkEnvironment networkEnvironment) {
-                       this.networkEnvironment = 
Preconditions.checkNotNull(networkEnvironment);
+               private TaskBuilder(ShuffleEnvironment<?, ?> 
shuffleEnvironment) {
+                       this.shuffleEnvironment = 
Preconditions.checkNotNull(shuffleEnvironment);
                }
 
                TaskBuilder setInvokable(Class<? extends AbstractInvokable> 
invokable) {
@@ -1117,7 +1116,7 @@ public class TaskTest extends TestLogger {
                                0,
                                mock(MemoryManager.class),
                                mock(IOManager.class),
-                               networkEnvironment,
+                               shuffleEnvironment,
                                kvStateService,
                                mock(BroadcastVariableManager.class),
                                new TaskEventDispatcher(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 51930a0..99db686 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -43,8 +43,8 @@ import 
org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -168,7 +168,7 @@ public class JvmExitOnFatalErrorTest {
                                final MemoryManager memoryManager = new 
MemoryManager(1024 * 1024, 1);
                                final IOManager ioManager = new 
IOManagerAsync();
 
-                               final NetworkEnvironment networkEnvironment = 
new NetworkEnvironmentBuilder().build();
+                               final ShuffleEnvironment shuffleEnvironment = 
new NetworkEnvironmentBuilder().build();
 
                                final TaskManagerRuntimeInfo tmInfo = 
TaskManagerConfiguration.fromConfiguration(taskManagerConfig);
 
@@ -206,7 +206,7 @@ public class JvmExitOnFatalErrorTest {
                                                0,       // targetSlotNumber
                                                memoryManager,
                                                ioManager,
-                                               networkEnvironment,
+                                               shuffleEnvironment,
                                                new KvStateService(new 
KvStateRegistry(), null, null),
                                                new BroadcastVariableManager(),
                                                new TaskEventDispatcher(),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 8a0ad2f..8f50198 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -166,8 +166,8 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
        }
 
        public void tearDown() {
-               suppressExceptions(senderEnv::shutdown);
-               suppressExceptions(receiverEnv::shutdown);
+               suppressExceptions(senderEnv::close);
+               suppressExceptions(receiverEnv::close);
                suppressExceptions(ioManager::shutdown);
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index ce5a7e1..e0f3b52 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -53,6 +52,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -181,7 +181,7 @@ public class InterruptSensitiveRestoreTest {
                        StreamStateHandle state,
                        int mode) throws IOException {
 
-               NetworkEnvironment networkEnvironment = new 
NetworkEnvironmentBuilder().build();
+               ShuffleEnvironment shuffleEnvironment = new 
NetworkEnvironmentBuilder().build();
 
                Collection<KeyedStateHandle> keyedStateFromBackend = 
Collections.emptyList();
                Collection<KeyedStateHandle> keyedStateFromStream = 
Collections.emptyList();
@@ -270,7 +270,7 @@ public class InterruptSensitiveRestoreTest {
                        0,
                        mock(MemoryManager.class),
                        mock(IOManager.class),
-                       networkEnvironment,
+                       shuffleEnvironment,
                        new KvStateService(new KvStateRegistry(), null, null),
                        mock(BroadcastVariableManager.class),
                        new TaskEventDispatcher(),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 4cffdbd..64b55fc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -54,6 +53,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -150,7 +150,7 @@ public class StreamTaskTerminationTest extends TestLogger {
 
                final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new 
TestingTaskManagerRuntimeInfo();
 
-               final NetworkEnvironment networkEnv = new 
NetworkEnvironmentBuilder().build();
+               final ShuffleEnvironment shuffleEnvironment = new 
NetworkEnvironmentBuilder().build();
 
                BlobCacheService blobService =
                        new BlobCacheService(mock(PermanentBlobCache.class), 
mock(TransientBlobCache.class));
@@ -167,7 +167,7 @@ public class StreamTaskTerminationTest extends TestLogger {
                        0,
                        new MemoryManager(32L * 1024L, 1),
                        new IOManagerAsync(),
-                       networkEnv,
+                       shuffleEnvironment,
                        new KvStateService(new KvStateRegistry(), null, null),
                        mock(BroadcastVariableManager.class),
                        new TaskEventDispatcher(),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 3439574..c4d5965 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -49,7 +49,6 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -65,6 +64,7 @@ import 
org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStorage;
@@ -902,7 +902,7 @@ public class StreamTaskTest extends TestLogger {
                PartitionProducerStateChecker partitionProducerStateChecker = 
mock(PartitionProducerStateChecker.class);
                Executor executor = mock(Executor.class);
 
-               NetworkEnvironment network = new 
NetworkEnvironmentBuilder().build();
+               ShuffleEnvironment shuffleEnvironment = new 
NetworkEnvironmentBuilder().build();
 
                JobInformation jobInformation = new JobInformation(
                        new JobID(),
@@ -932,7 +932,7 @@ public class StreamTaskTest extends TestLogger {
                        0,
                        mock(MemoryManager.class),
                        mock(IOManager.class),
-                       network,
+                       shuffleEnvironment,
                        new KvStateService(new KvStateRegistry(), null, null),
                        mock(BroadcastVariableManager.class),
                        new TaskEventDispatcher(),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 0416e5c..5dbd5ce 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -54,6 +53,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.taskexecutor.KvStateService;
@@ -263,7 +263,7 @@ public class SynchronousCheckpointITCase {
                ResultPartitionConsumableNotifier consumableNotifier = new 
NoOpResultPartitionConsumableNotifier();
                PartitionProducerStateChecker partitionProducerStateChecker = 
mock(PartitionProducerStateChecker.class);
                Executor executor = mock(Executor.class);
-               NetworkEnvironment networkEnvironment = new 
NetworkEnvironmentBuilder().build();
+               ShuffleEnvironment shuffleEnvironment = new 
NetworkEnvironmentBuilder().build();
 
                TaskMetricGroup taskMetricGroup = 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 
@@ -295,7 +295,7 @@ public class SynchronousCheckpointITCase {
                                0,
                                mock(MemoryManager.class),
                                mock(IOManager.class),
-                               networkEnvironment,
+                               shuffleEnvironment,
                                new KvStateService(new KvStateRegistry(), null, 
null),
                                mock(BroadcastVariableManager.class),
                                new TaskEventDispatcher(),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index a6268ce..9f91a00 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -55,6 +54,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -224,7 +224,7 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                                TestStreamTask.class.getName(),
                                taskConfig);
 
-               NetworkEnvironment network = new 
NetworkEnvironmentBuilder().build();
+               ShuffleEnvironment<?, ?> shuffleEnvironment = new 
NetworkEnvironmentBuilder().build();
 
                BlobCacheService blobService =
                        new BlobCacheService(mock(PermanentBlobCache.class), 
mock(TransientBlobCache.class));
@@ -241,7 +241,7 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                                0,
                                mock(MemoryManager.class),
                                mock(IOManager.class),
-                               network,
+                               shuffleEnvironment,
                                new KvStateService(new KvStateRegistry(), null, 
null),
                                mock(BroadcastVariableManager.class),
                                new TaskEventDispatcher(),

Reply via email to