This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80003d62f386fc95a3dbbc414f2cd4de7a26e1bd
Author: Zhijiang <[email protected]>
AuthorDate: Thu Jun 27 00:29:16 2019 +0800

    [FLINK-12735][network] Make shuffle environment implementation independent 
with IOManager
    
    The current creation of NettyShuffleEnvironment relies on IOManager from 
TaskManagerServices. Actually the shuffle only needs the
    file channel during creating partition, so it could internally create a 
light-weight FileChannelManager with its own prefix folder
    name instead of the heavy-weight IOManagerAsync.
---
 .../io/network/NettyShuffleEnvironment.java        | 13 ++++++
 .../io/network/NettyShuffleServiceFactory.java     | 17 +++++---
 .../network/partition/ResultPartitionFactory.java  | 16 +++----
 .../runtime/shuffle/ShuffleEnvironmentContext.java | 10 +----
 .../runtime/taskexecutor/TaskManagerServices.java  |  9 ++--
 .../NettyShuffleEnvironmentConfiguration.java      | 23 ++++++++--
 .../runtime/io/disk/NoOpFileChannelManager.java    | 51 ++++++++++++++++++++++
 .../io/network/NettyShuffleEnvironmentBuilder.java | 17 ++++----
 .../io/network/NettyShuffleEnvironmentTest.java    | 23 +++++++++-
 .../partition/BoundedBlockingSubpartitionTest.java | 24 +++++++++-
 .../BoundedBlockingSubpartitionWriteReadTest.java  | 21 ++++++++-
 .../io/network/partition/PartitionTestUtils.java   | 21 +++++++++
 .../network/partition/ResultPartitionBuilder.java  | 12 ++---
 .../partition/ResultPartitionFactoryTest.java      | 22 +++++++++-
 .../io/network/partition/ResultPartitionTest.java  | 27 +++++++++++-
 .../StreamNetworkBenchmarkEnvironment.java         |  7 ---
 16 files changed, 250 insertions(+), 63 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 5171d75..17fb2cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
@@ -85,6 +86,8 @@ public class NettyShuffleEnvironment implements 
ShuffleEnvironment<ResultPartiti
 
        private final ResultPartitionManager resultPartitionManager;
 
+       private final FileChannelManager fileChannelManager;
+
        private final Map<InputGateID, SingleInputGate> inputGatesById;
 
        private final ResultPartitionFactory resultPartitionFactory;
@@ -99,6 +102,7 @@ public class NettyShuffleEnvironment implements 
ShuffleEnvironment<ResultPartiti
                        NetworkBufferPool networkBufferPool,
                        ConnectionManager connectionManager,
                        ResultPartitionManager resultPartitionManager,
+                       FileChannelManager fileChannelManager,
                        ResultPartitionFactory resultPartitionFactory,
                        SingleInputGateFactory singleInputGateFactory) {
                this.taskExecutorResourceId = taskExecutorResourceId;
@@ -107,6 +111,7 @@ public class NettyShuffleEnvironment implements 
ShuffleEnvironment<ResultPartiti
                this.connectionManager = connectionManager;
                this.resultPartitionManager = resultPartitionManager;
                this.inputGatesById = new ConcurrentHashMap<>(10);
+               this.fileChannelManager = fileChannelManager;
                this.resultPartitionFactory = resultPartitionFactory;
                this.singleInputGateFactory = singleInputGateFactory;
                this.isClosed = false;
@@ -326,6 +331,14 @@ public class NettyShuffleEnvironment implements 
ShuffleEnvironment<ResultPartiti
                                LOG.warn("Network buffer pool did not shut down 
properly.", t);
                        }
 
+                       // delete all the temp directories
+                       try {
+                               fileChannelManager.close();
+                       }
+                       catch (Throwable t) {
+                               LOG.warn("Cannot close the file channel manager 
properly.", t);
+                       }
+
                        isClosed = true;
                }
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 74dbe0f..be31fb7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -22,7 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
@@ -45,6 +46,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class NettyShuffleServiceFactory implements 
ShuffleServiceFactory<NettyShuffleDescriptor, ResultPartition, SingleInputGate> 
{
 
+       private static final String DIR_NAME_PREFIX = "netty-shuffle";
+
        @Override
        public NettyShuffleMaster createShuffleMaster(Configuration 
configuration) {
                return NettyShuffleMaster.INSTANCE;
@@ -62,8 +65,7 @@ public class NettyShuffleServiceFactory implements 
ShuffleServiceFactory<NettySh
                        networkConfig,
                        shuffleEnvironmentContext.getTaskExecutorResourceId(),
                        shuffleEnvironmentContext.getEventPublisher(),
-                       shuffleEnvironmentContext.getParentMetricGroup(),
-                       shuffleEnvironmentContext.getIOManager());
+                       shuffleEnvironmentContext.getParentMetricGroup());
        }
 
        @VisibleForTesting
@@ -71,18 +73,18 @@ public class NettyShuffleServiceFactory implements 
ShuffleServiceFactory<NettySh
                        NettyShuffleEnvironmentConfiguration config,
                        ResourceID taskExecutorResourceId,
                        TaskEventPublisher taskEventPublisher,
-                       MetricGroup metricGroup,
-                       IOManager ioManager) {
+                       MetricGroup metricGroup) {
                checkNotNull(config);
                checkNotNull(taskExecutorResourceId);
                checkNotNull(taskEventPublisher);
                checkNotNull(metricGroup);
-               checkNotNull(ioManager);
 
                NettyConfig nettyConfig = config.nettyConfig();
 
                ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
 
+               FileChannelManager fileChannelManager = new 
FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
+
                ConnectionManager connectionManager = nettyConfig != null ?
                        new NettyConnectionManager(resultPartitionManager, 
taskEventPublisher, nettyConfig, config.isCreditBased()) :
                        new LocalConnectionManager();
@@ -96,7 +98,7 @@ public class NettyShuffleServiceFactory implements 
ShuffleServiceFactory<NettySh
 
                ResultPartitionFactory resultPartitionFactory = new 
ResultPartitionFactory(
                        resultPartitionManager,
-                       ioManager,
+                       fileChannelManager,
                        networkBufferPool,
                        config.networkBuffersPerChannel(),
                        config.floatingNetworkBuffersPerGate());
@@ -115,6 +117,7 @@ public class NettyShuffleServiceFactory implements 
ShuffleServiceFactory<NettySh
                        networkBufferPool,
                        connectionManager,
                        resultPartitionManager,
+                       fileChannelManager,
                        resultPartitionFactory,
                        singleInputGateFactory);
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 137ea5f..aaaecf6 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
@@ -52,7 +52,7 @@ public class ResultPartitionFactory {
        private final ResultPartitionManager partitionManager;
 
        @Nonnull
-       private final IOManager ioManager;
+       private final FileChannelManager channelManager;
 
        @Nonnull
        private final BufferPoolFactory bufferPoolFactory;
@@ -63,13 +63,13 @@ public class ResultPartitionFactory {
 
        public ResultPartitionFactory(
                @Nonnull ResultPartitionManager partitionManager,
-               @Nonnull IOManager ioManager,
+               @Nonnull FileChannelManager channelManager,
                @Nonnull BufferPoolFactory bufferPoolFactory,
                int networkBuffersPerChannel,
                int floatingNetworkBuffersPerGate) {
 
                this.partitionManager = partitionManager;
-               this.ioManager = ioManager;
+               this.channelManager = channelManager;
                this.networkBuffersPerChannel = networkBuffersPerChannel;
                this.floatingNetworkBuffersPerGate = 
floatingNetworkBuffersPerGate;
                this.bufferPoolFactory = bufferPoolFactory;
@@ -135,7 +135,7 @@ public class ResultPartitionFactory {
                // Create the subpartitions.
                switch (type) {
                        case BLOCKING:
-                               
initializeBoundedBlockingPartitions(subpartitions, partition, ioManager, 
networkBufferSize);
+                               
initializeBoundedBlockingPartitions(subpartitions, partition, 
networkBufferSize, channelManager);
                                break;
 
                        case PIPELINED:
@@ -154,13 +154,13 @@ public class ResultPartitionFactory {
        private static void initializeBoundedBlockingPartitions(
                ResultSubpartition[] subpartitions,
                ResultPartition parent,
-               IOManager ioManager,
-               int networkBufferSize) {
+               int networkBufferSize,
+               FileChannelManager channelManager) {
 
                int i = 0;
                try {
                        for (; i < subpartitions.length; i++) {
-                               final File spillFile = 
ioManager.createChannel().getPathFile();
+                               final File spillFile = 
channelManager.createChannel().getPathFile();
                                subpartitions[i] = 
BOUNDED_BLOCKING_TYPE.create(i, parent, spillFile, networkBufferSize);
                        }
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
index bde82eb..7c1abc3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.shuffle;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 
 import java.net.InetAddress;
@@ -39,7 +38,6 @@ public class ShuffleEnvironmentContext {
        private final InetAddress hostAddress;
        private final TaskEventPublisher eventPublisher;
        private final MetricGroup parentMetricGroup;
-       private final IOManager ioManager;
 
        public ShuffleEnvironmentContext(
                        Configuration configuration,
@@ -48,8 +46,7 @@ public class ShuffleEnvironmentContext {
                        boolean localCommunicationOnly,
                        InetAddress hostAddress,
                        TaskEventPublisher eventPublisher,
-                       MetricGroup parentMetricGroup,
-                       IOManager ioManager) {
+                       MetricGroup parentMetricGroup) {
                this.configuration = checkNotNull(configuration);
                this.taskExecutorResourceId = 
checkNotNull(taskExecutorResourceId);
                this.maxJvmHeapMemory = maxJvmHeapMemory;
@@ -57,7 +54,6 @@ public class ShuffleEnvironmentContext {
                this.hostAddress = checkNotNull(hostAddress);
                this.eventPublisher = checkNotNull(eventPublisher);
                this.parentMetricGroup = checkNotNull(parentMetricGroup);
-               this.ioManager = checkNotNull(ioManager);
        }
 
        public Configuration getConfiguration() {
@@ -87,8 +83,4 @@ public class ShuffleEnvironmentContext {
        public MetricGroup getParentMetricGroup() {
                return parentMetricGroup;
        }
-
-       public IOManager getIOManager() {
-               return ioManager;
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 0aafce0..8de72f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -244,8 +244,7 @@ public class TaskManagerServices {
                final ShuffleEnvironment<?, ?> shuffleEnvironment = 
createShuffleEnvironment(
                        taskManagerServicesConfiguration,
                        taskEventDispatcher,
-                       taskManagerMetricGroup,
-                       ioManager);
+                       taskManagerMetricGroup);
                final int dataPort = shuffleEnvironment.start();
 
                final KvStateService kvStateService = 
KvStateService.fromConfiguration(taskManagerServicesConfiguration);
@@ -307,8 +306,7 @@ public class TaskManagerServices {
        private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
                        TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
                        TaskEventDispatcher taskEventDispatcher,
-                       MetricGroup taskManagerMetricGroup,
-                       IOManager ioManager) throws FlinkException {
+                       MetricGroup taskManagerMetricGroup) throws 
FlinkException {
 
                final ShuffleEnvironmentContext shuffleEnvironmentContext = new 
ShuffleEnvironmentContext(
                        taskManagerServicesConfiguration.getConfiguration(),
@@ -317,8 +315,7 @@ public class TaskManagerServices {
                        
taskManagerServicesConfiguration.isLocalCommunicationOnly(),
                        
taskManagerServicesConfiguration.getTaskManagerAddress(),
                        taskEventDispatcher,
-                       taskManagerMetricGroup,
-                       ioManager);
+                       taskManagerMetricGroup);
 
                return ShuffleServiceLoader
                        
.loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration())
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index a6ff17b..daccd3e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
@@ -28,6 +29,7 @@ import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +37,7 @@ import javax.annotation.Nullable;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 
 /**
  * Configuration object for the network stack.
@@ -62,6 +65,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
        private final NettyConfig nettyConfig;
 
+       private final String[] tempDirs;
+
        public NettyShuffleEnvironmentConfiguration(
                        int numNetworkBuffers,
                        int networkBufferSize,
@@ -71,7 +76,8 @@ public class NettyShuffleEnvironmentConfiguration {
                        int floatingNetworkBuffersPerGate,
                        boolean isCreditBased,
                        boolean isNetworkDetailedMetrics,
-                       @Nullable NettyConfig nettyConfig) {
+                       @Nullable NettyConfig nettyConfig,
+                       String[] tempDirs) {
 
                this.numNetworkBuffers = numNetworkBuffers;
                this.networkBufferSize = networkBufferSize;
@@ -82,6 +88,7 @@ public class NettyShuffleEnvironmentConfiguration {
                this.isCreditBased = isCreditBased;
                this.isNetworkDetailedMetrics = isNetworkDetailedMetrics;
                this.nettyConfig = nettyConfig;
+               this.tempDirs = Preconditions.checkNotNull(tempDirs);
        }
 
        // 
------------------------------------------------------------------------
@@ -122,6 +129,10 @@ public class NettyShuffleEnvironmentConfiguration {
                return isNetworkDetailedMetrics;
        }
 
+       public String[] getTempDirs() {
+               return tempDirs;
+       }
+
        // 
------------------------------------------------------------------------
 
        /**
@@ -158,6 +169,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
                boolean isNetworkDetailedMetrics = 
configuration.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_DETAILED_METRICS);
 
+               String[] tempDirs = 
ConfigurationUtils.parseTempDirectories(configuration);
+
                return new NettyShuffleEnvironmentConfiguration(
                        numberOfNetworkBuffers,
                        pageSize,
@@ -167,7 +180,8 @@ public class NettyShuffleEnvironmentConfiguration {
                        extraBuffersPerGate,
                        isCreditBased,
                        isNetworkDetailedMetrics,
-                       nettyConfig);
+                       nettyConfig,
+                       tempDirs);
        }
 
        /**
@@ -467,6 +481,7 @@ public class NettyShuffleEnvironmentConfiguration {
                result = 31 * result + floatingNetworkBuffersPerGate;
                result = 31 * result + (isCreditBased ? 1 : 0);
                result = 31 * result + (nettyConfig != null ? 
nettyConfig.hashCode() : 0);
+               result = 31 * result + Arrays.hashCode(tempDirs);
                return result;
        }
 
@@ -488,7 +503,8 @@ public class NettyShuffleEnvironmentConfiguration {
                                        this.networkBuffersPerChannel == 
that.networkBuffersPerChannel &&
                                        this.floatingNetworkBuffersPerGate == 
that.floatingNetworkBuffersPerGate &&
                                        this.isCreditBased == 
that.isCreditBased &&
-                                       (nettyConfig != null ? 
nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
+                                       (nettyConfig != null ? 
nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null) &&
+                                       Arrays.equals(this.tempDirs, 
that.tempDirs);
                }
        }
 
@@ -503,6 +519,7 @@ public class NettyShuffleEnvironmentConfiguration {
                                ", floatingNetworkBuffersPerGate=" + 
floatingNetworkBuffersPerGate +
                                ", isCreditBased=" + isCreditBased +
                                ", nettyConfig=" + nettyConfig +
+                               ", tempDirs=" + Arrays.toString(tempDirs) +
                                '}';
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/NoOpFileChannelManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/NoOpFileChannelManager.java
new file mode 100644
index 0000000..4fcf3ab
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/NoOpFileChannelManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+
+import java.io.File;
+
+/**
+ * An {@link FileChannelManager} that cannot do I/O but serves as a mock for 
tests.
+ */
+public enum NoOpFileChannelManager implements FileChannelManager {
+
+       INSTANCE;
+
+       @Override
+       public ID createChannel() {
+               throw  new UnsupportedOperationException();
+       }
+
+       @Override
+       public Enumerator createChannelEnumerator() {
+               throw  new UnsupportedOperationException();
+       }
+
+       @Override
+       public File[] getPaths() {
+               throw  new UnsupportedOperationException();
+       }
+
+       @Override
+       public void close() {
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index b0ef430..32b3744 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -20,17 +20,18 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
 /**
  * Builder for the {@link NettyShuffleEnvironment}.
  */
 public class NettyShuffleEnvironmentBuilder {
 
+       private static final String[] DEFAULT_TEMP_DIRS = new String[] 
{EnvironmentInformation.getTemporaryFileDirectory()};
+
        private int numNetworkBuffers = 1024;
 
        private int networkBufferSize = 32 * 1024;
@@ -55,7 +56,7 @@ public class NettyShuffleEnvironmentBuilder {
 
        private MetricGroup metricGroup = 
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
-       private IOManager ioManager = new IOManagerAsync();
+       private String[] tempDirs = DEFAULT_TEMP_DIRS;
 
        public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID 
taskManagerLocation) {
                this.taskManagerLocation = taskManagerLocation;
@@ -112,8 +113,8 @@ public class NettyShuffleEnvironmentBuilder {
                return this;
        }
 
-       public NettyShuffleEnvironmentBuilder setIOManager(IOManager ioManager) 
{
-               this.ioManager = ioManager;
+       public NettyShuffleEnvironmentBuilder setTempDirs(String[] tempDirs) {
+               this.tempDirs = tempDirs;
                return this;
        }
 
@@ -128,10 +129,10 @@ public class NettyShuffleEnvironmentBuilder {
                                floatingNetworkBuffersPerGate,
                                isCreditBased,
                                isNetworkDetailedMetrics,
-                               nettyConfig),
+                               nettyConfig,
+                               tempDirs),
                        taskManagerLocation,
                        taskEventDispatcher,
-                       metricGroup,
-                       ioManager);
+                       metricGroup);
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index d4f0727..11951f2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -28,8 +30,11 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -53,6 +58,10 @@ import static org.powermock.api.mockito.PowerMockito.spy;
 @RunWith(Parameterized.class)
 public class NettyShuffleEnvironmentTest extends TestLogger {
 
+       private static final String tempDir = 
EnvironmentInformation.getTemporaryFileDirectory();
+
+       private static FileChannelManager fileChannelManager;
+
        @Parameterized.Parameter
        public boolean enableCreditBasedFlowControl;
 
@@ -64,6 +73,16 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
        @Rule
        public ExpectedException expectedException = ExpectedException.none();
 
+       @BeforeClass
+       public static void setUp() {
+               fileChannelManager = new FileChannelManagerImpl(new String[] 
{tempDir}, "testing");
+       }
+
+       @AfterClass
+       public static void shutdown() throws Exception {
+               fileChannelManager.close();
+       }
+
        /**
         * Verifies that {@link 
Task#setupPartitionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up 
(un)bounded buffer pool
         * instances for various types of input and output channels.
@@ -76,7 +95,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
                // result partitions
                ResultPartition rp1 = createPartition(network, 
ResultPartitionType.PIPELINED, 2);
-               ResultPartition rp2 = createPartition(network, 
ResultPartitionType.BLOCKING, 2);
+               ResultPartition rp2 = createPartition(network, 
fileChannelManager, ResultPartitionType.BLOCKING, 2);
                ResultPartition rp3 = createPartition(network, 
ResultPartitionType.PIPELINED_BOUNDED, 2);
                ResultPartition rp4 = createPartition(network, 
ResultPartitionType.PIPELINED_BOUNDED, 8);
                final ResultPartition[] resultPartitions = new 
ResultPartition[] {rp1, rp2, rp3, rp4};
@@ -179,7 +198,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger 
{
 
                // result partitions
                ResultPartition rp1 = createPartition(network, 
ResultPartitionType.PIPELINED, 2);
-               ResultPartition rp2 = createPartition(network, 
ResultPartitionType.BLOCKING, 2);
+               ResultPartition rp2 = createPartition(network, 
fileChannelManager, ResultPartitionType.BLOCKING, 2);
                ResultPartition rp3 = createPartition(network, 
ResultPartitionType.PIPELINED_BOUNDED, 2);
                ResultPartition rp4 = createPartition(network, 
ResultPartitionType.PIPELINED_BOUNDED, 4);
                final ResultPartition[] resultPartitions = new 
ResultPartition[] {rp1, rp2, rp3, rp4};
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
index e76cd1e..9bd0c4b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
@@ -18,8 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -29,6 +34,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 
+import static 
org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -41,9 +47,23 @@ import static org.junit.Assert.fail;
  */
 public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
 
+       private static final String tempDir = 
EnvironmentInformation.getTemporaryFileDirectory();
+
+       private static FileChannelManager fileChannelManager;
+
        @ClassRule
        public static final TemporaryFolder TMP_DIR = new TemporaryFolder();
 
+       @BeforeClass
+       public static void setUp() {
+               fileChannelManager = new FileChannelManagerImpl(new String[] 
{tempDir}, "testing");
+       }
+
+       @AfterClass
+       public static void shutdown() throws Exception {
+               fileChannelManager.close();
+       }
+
        // 
------------------------------------------------------------------------
 
        @Test
@@ -74,14 +94,14 @@ public class BoundedBlockingSubpartitionTest extends 
SubpartitionTestBase {
 
        @Override
        ResultSubpartition createSubpartition() throws Exception {
-               final ResultPartition resultPartition = 
PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING);
+               final ResultPartition resultPartition = 
createPartition(ResultPartitionType.BLOCKING, fileChannelManager);
                return BoundedBlockingSubpartition.createWithMemoryMappedFile(
                                0, resultPartition, new 
File(TMP_DIR.newFolder(), "subpartition"));
        }
 
        @Override
        ResultSubpartition createFailingWritesSubpartition() throws Exception {
-               final ResultPartition resultPartition = 
PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING);
+               final ResultPartition resultPartition = 
createPartition(ResultPartitionType.BLOCKING, fileChannelManager);
 
                return new BoundedBlockingSubpartition(
                                0,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
index 359ad8d..67ab6fc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
@@ -21,9 +21,14 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -48,6 +53,10 @@ import static org.junit.Assert.assertTrue;
 @RunWith(Parameterized.class)
 public class BoundedBlockingSubpartitionWriteReadTest {
 
+       private static final String tempDir = 
EnvironmentInformation.getTemporaryFileDirectory();
+
+       private static FileChannelManager fileChannelManager;
+
        @ClassRule
        public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
@@ -71,6 +80,16 @@ public class BoundedBlockingSubpartitionWriteReadTest {
        //  tests
        // 
------------------------------------------------------------------------
 
+       @BeforeClass
+       public static void setUp() {
+               fileChannelManager = new FileChannelManagerImpl(new String[] 
{tempDir}, "testing");
+       }
+
+       @AfterClass
+       public static void shutdown() throws Exception {
+               fileChannelManager.close();
+       }
+
        @Test
        public void testWriteAndReadData() throws Exception {
                final int numLongs = 15_000_000; // roughly 115 MiBytes
@@ -188,7 +207,7 @@ public class BoundedBlockingSubpartitionWriteReadTest {
        private BoundedBlockingSubpartition createSubpartition() throws 
IOException {
                return type.create(
                                0,
-                               
PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING),
+                               
PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING, 
fileChannelManager),
                                new File(TMP_FOLDER.newFolder(), 
"partitiondata"),
                                BUFFER_SIZE);
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 51eff94..e559659 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
@@ -50,12 +51,32 @@ public class PartitionTestUtils {
                return new 
ResultPartitionBuilder().setResultPartitionType(type).build();
        }
 
+       public static ResultPartition createPartition(ResultPartitionType type, 
FileChannelManager channelManager) {
+               return new ResultPartitionBuilder()
+                       .setResultPartitionType(type)
+                       .setFileChannelManager(channelManager)
+                       .build();
+       }
+
+       public static ResultPartition createPartition(
+                       NettyShuffleEnvironment environment,
+                       ResultPartitionType partitionType,
+                       int numChannels) {
+               return new ResultPartitionBuilder()
+                       
.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
+                       .setResultPartitionType(partitionType)
+                       .setNumberOfSubpartitions(numChannels)
+                       .build();
+       }
+
        public static ResultPartition createPartition(
                        NettyShuffleEnvironment environment,
+                       FileChannelManager channelManager,
                        ResultPartitionType partitionType,
                        int numChannels) {
                return new ResultPartitionBuilder()
                        
.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
+                       .setFileChannelManager(channelManager)
                        .setResultPartitionType(partitionType)
                        .setNumberOfSubpartitions(numChannels)
                        .build();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index bf403ac..3da8eb8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
@@ -44,7 +44,7 @@ public class ResultPartitionBuilder {
 
        private ResultPartitionManager partitionManager = new 
ResultPartitionManager();
 
-       private IOManager ioManager = new IOManagerAsync();
+       private FileChannelManager channelManager = 
NoOpFileChannelManager.INSTANCE;
 
        private NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 
1, 1);
 
@@ -82,8 +82,8 @@ public class ResultPartitionBuilder {
                return this;
        }
 
-       public ResultPartitionBuilder setIOManager(IOManager ioManager) {
-               this.ioManager = ioManager;
+       public ResultPartitionBuilder setFileChannelManager(FileChannelManager 
channelManager) {
+               this.channelManager = channelManager;
                return this;
        }
 
@@ -122,7 +122,7 @@ public class ResultPartitionBuilder {
        public ResultPartition build() {
                ResultPartitionFactory resultPartitionFactory = new 
ResultPartitionFactory(
                        partitionManager,
-                       ioManager,
+                       channelManager,
                        networkBufferPool,
                        networkBuffersPerChannel,
                        floatingNetworkBuffersPerGate);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 0c6848b..f2e0e58 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -18,15 +18,19 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -38,6 +42,20 @@ import static org.hamcrest.MatcherAssert.assertThat;
  */
 public class ResultPartitionFactoryTest extends TestLogger {
 
+       private static final String tempDir = 
EnvironmentInformation.getTemporaryFileDirectory();
+
+       private static FileChannelManager fileChannelManager;
+
+       @BeforeClass
+       public static void setUp() {
+               fileChannelManager = new FileChannelManagerImpl(new String[] 
{tempDir}, "testing");
+       }
+
+       @AfterClass
+       public static void shutdown() throws Exception {
+               fileChannelManager.close();
+       }
+
        @Test
        public void testConsumptionOnReleaseEnabled() {
                final ResultPartition resultPartition = 
createResultPartition(ShuffleDescriptor.ReleaseType.AUTO);
@@ -53,7 +71,7 @@ public class ResultPartitionFactoryTest extends TestLogger {
        private static ResultPartition 
createResultPartition(ShuffleDescriptor.ReleaseType releaseType) {
                ResultPartitionFactory factory = new ResultPartitionFactory(
                        new ResultPartitionManager(),
-                       new NoOpIOManager(),
+                       fileChannelManager,
                        new NetworkBufferPool(1, 64, 1),
                        1,
                        1
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 2072a12..1024780 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -28,8 +30,11 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import 
org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -55,6 +60,20 @@ import static org.mockito.Mockito.verify;
  */
 public class ResultPartitionTest {
 
+       private static final String tempDir = 
EnvironmentInformation.getTemporaryFileDirectory();
+
+       private static FileChannelManager fileChannelManager;
+
+       @BeforeClass
+       public static void setUp() {
+               fileChannelManager = new FileChannelManagerImpl(new String[] 
{tempDir}, "testing");
+       }
+
+       @AfterClass
+       public static void shutdown() throws Exception {
+               fileChannelManager.close();
+       }
+
        /**
         * Tests the schedule or update consumers message sending behaviour 
depending on the relevant flags.
         */
@@ -107,6 +126,7 @@ public class ResultPartitionTest {
                        .isReleasedOnConsumption(false)
                        .setResultPartitionManager(manager)
                        .setResultPartitionType(ResultPartitionType.BLOCKING)
+                       .setFileChannelManager(fileChannelManager)
                        .build();
 
                manager.registerResultPartition(partition);
@@ -181,7 +201,8 @@ public class ResultPartitionTest {
                ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
                JobID jobId = new JobID();
                TaskActions taskActions = new NoOpTaskActions();
-               ResultPartition partition = createPartition(partitionType);
+               ResultPartition partition = partitionType == 
ResultPartitionType.BLOCKING ?
+                       createPartition(partitionType, fileChannelManager) : 
createPartition(partitionType);
                ResultPartitionWriter consumableNotifyingPartitionWriter = 
ConsumableNotifyingResultPartitionWriterDecorator.decorate(
                        
Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
                        new ResultPartitionWriter[] {partition},
@@ -311,9 +332,11 @@ public class ResultPartitionTest {
                        TaskActions taskActions,
                        JobID jobId,
                        ResultPartitionConsumableNotifier notifier) {
+               ResultPartition partition = partitionType == 
ResultPartitionType.BLOCKING ?
+                       createPartition(partitionType, fileChannelManager) : 
createPartition(partitionType);
                return 
ConsumableNotifyingResultPartitionWriterDecorator.decorate(
                        
Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
-                       new ResultPartitionWriter[] 
{createPartition(partitionType)},
+                       new ResultPartitionWriter[] {partition},
                        taskActions,
                        jobId,
                        notifier)[0];
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index b7d7430..0cdc658 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -24,8 +24,6 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -81,7 +79,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
 
        protected NettyShuffleEnvironment senderEnv;
        protected NettyShuffleEnvironment receiverEnv;
-       protected IOManager ioManager;
 
        protected int channels;
        protected boolean broadcastMode = false;
@@ -142,8 +139,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
                        receiverBufferPoolSize = Math.max(2048, writers * 
channels * 4);
                }
 
-               ioManager = new IOManagerAsync();
-
                senderEnv = createShuffleEnvironment(senderBufferPoolSize, 
config);
                this.dataPort = senderEnv.start();
                if (localMode && senderBufferPoolSize == 
receiverBufferPoolSize) {
@@ -168,7 +163,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
        public void tearDown() {
                suppressExceptions(senderEnv::close);
                suppressExceptions(receiverEnv::close);
-               suppressExceptions(ioManager::close);
        }
 
        public SerializingLongReceiver createReceiver() throws Exception {
@@ -223,7 +217,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
                        
.setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
                        .setNumberOfSubpartitions(channels)
                        
.setResultPartitionManager(environment.getResultPartitionManager())
-                       .setIOManager(ioManager)
                        
.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
                        .build();
 

Reply via email to