This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5809ed32f869fa880a70a774d5b8365fe59dba4a
Author: Zhijiang <[email protected]>
AuthorDate: Tue Jun 18 12:09:04 2019 +0800

    [hotfix][runtime] IOManager implements AutoCloseable
---
 .../flink/runtime/io/disk/iomanager/IOManager.java |  5 ++--
 .../runtime/io/disk/iomanager/IOManagerAsync.java  |  8 +++---
 .../runtime/taskexecutor/TaskManagerServices.java  |  2 +-
 .../flink/runtime/io/disk/ChannelViewsTest.java    |  2 +-
 .../runtime/io/disk/FileChannelStreamsITCase.java  |  2 +-
 .../runtime/io/disk/FileChannelStreamsTest.java    | 12 ++------
 .../io/disk/SeekableFileChannelInputViewTest.java  |  6 +---
 .../flink/runtime/io/disk/SpillingBufferTest.java  |  2 +-
 .../AsynchronousBufferFileWriterTest.java          |  2 +-
 .../iomanager/AsynchronousFileIOChannelTest.java   | 19 +++----------
 .../BufferFileWriterFileSegmentReaderTest.java     |  2 +-
 .../disk/iomanager/BufferFileWriterReaderTest.java |  2 +-
 .../io/disk/iomanager/IOManagerAsyncTest.java      |  4 +--
 .../runtime/io/disk/iomanager/IOManagerITCase.java |  2 +-
 .../runtime/io/disk/iomanager/IOManagerTest.java   | 33 ++++++++--------------
 .../runtime/operators/hash/HashTableITCase.java    |  2 +-
 .../hash/HashTablePerformanceComparison.java       |  5 +---
 .../runtime/operators/hash/HashTableTest.java      | 21 ++------------
 .../hash/NonReusingHashJoinIteratorITCase.java     |  2 +-
 .../operators/hash/ReOpenableHashTableITCase.java  |  2 +-
 .../hash/ReOpenableHashTableTestBase.java          |  2 +-
 .../hash/ReusingHashJoinIteratorITCase.java        |  2 +-
 .../resettable/SpillingResettableIteratorTest.java |  2 +-
 ...pillingResettableMutableObjectIteratorTest.java |  2 +-
 .../AbstractSortMergeOuterJoinIteratorITCase.java  |  2 +-
 .../sort/CombiningUnilateralSortMergerITCase.java  |  2 +-
 .../runtime/operators/sort/ExternalSortITCase.java |  2 +-
 .../sort/ExternalSortLargeRecordsITCase.java       |  2 +-
 .../sort/FixedLengthRecordSorterTest.java          |  2 +-
 .../operators/sort/LargeRecordHandlerITCase.java   | 18 ++----------
 .../operators/sort/LargeRecordHandlerTest.java     | 21 ++------------
 ...NonReusingSortMergeInnerJoinIteratorITCase.java |  2 +-
 .../ReusingSortMergeInnerJoinIteratorITCase.java   |  2 +-
 .../operators/sort/UnilateralSortMergerTest.java   |  4 +--
 .../testutils/BinaryOperatorTestBase.java          |  2 +-
 .../operators/testutils/DriverTestBase.java        |  2 +-
 .../operators/testutils/MockEnvironment.java       |  2 +-
 .../operators/testutils/UnaryOperatorTestBase.java |  2 +-
 .../operators/util/HashVsSortMiniBenchmark.java    |  2 +-
 .../streaming/runtime/io/BufferSpillerTest.java    |  2 +-
 ...CheckpointBarrierAlignerAlignmentLimitTest.java |  2 +-
 .../CheckpointBarrierAlignerMassiveRandomTest.java |  7 +----
 .../io/SpillingCheckpointBarrierAlignerTest.java   |  2 +-
 .../StreamNetworkBenchmarkEnvironment.java         |  2 +-
 .../runtime/tasks/StreamTaskTestHarness.java       |  2 +-
 .../flink/table/runtime/aggregate/HashAggTest.java |  2 +-
 .../runtime/hashtable/BinaryHashTableTest.java     |  2 +-
 .../io/CompressedHeaderlessChannelTest.java        |  2 +-
 .../join/Int2SortMergeJoinOperatorTest.java        |  2 +-
 .../runtime/sort/BinaryExternalSorterTest.java     |  2 +-
 .../runtime/sort/BufferedKVExternalSorterTest.java |  2 +-
 .../manual/HashTableRecordWidthCombinations.java   |  7 +----
 .../flink/test/manual/MassiveStringSorting.java    | 14 ++-------
 .../test/manual/MassiveStringValueSorting.java     | 14 ++-------
 54 files changed, 82 insertions(+), 192 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 6723597..ee54b1e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -37,7 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 /**
  * The facade for the provided I/O manager services.
  */
-public abstract class IOManager {
+public abstract class IOManager implements AutoCloseable {
        protected static final Logger LOG = 
LoggerFactory.getLogger(IOManager.class);
 
        /** The temporary directories for files. */
@@ -85,7 +85,8 @@ public abstract class IOManager {
         * Close method, marks the I/O manager as closed
         * and removed all temporary files.
         */
-       public void shutdown() {
+       @Override
+       public void close() {
                // remove all of our temp directories
                for (File path : paths) {
                        try {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index 0c3c8f1..ffa4dcf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -100,7 +100,7 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
                }
 
                // install a shutdown hook that makes sure the temp directories 
get deleted
-               this.shutdownHook = 
ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), 
LOG);
+               this.shutdownHook = 
ShutdownHookUtil.addShutdownHook(this::close, getClass().getSimpleName(), LOG);
        }
 
        /**
@@ -109,7 +109,7 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
         * operation.
         */
        @Override
-       public void shutdown() {
+       public void close() {
                // mark shut down and exit if it already was shut down
                if (!isShutdown.compareAndSet(false, true)) {
                        return;
@@ -157,7 +157,7 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
                finally {
                        // make sure we call the super implementation in any 
case and at the last point,
                        // because this will clean up the I/O directories
-                       super.shutdown();
+                       super.close();
                }
        }
        
@@ -186,7 +186,7 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
        @Override
        public void uncaughtException(Thread t, Throwable e) {
                LOG.error("IO Thread '" + t.getName() + "' terminated due to an 
exception. Shutting down I/O Manager.", e);
-               shutdown();
+               close();
        }
        
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 35236d9..0aafce0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -179,7 +179,7 @@ public class TaskManagerServices {
                }
 
                try {
-                       ioManager.shutdown();
+                       ioManager.close();
                } catch (Exception e) {
                        exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index 8c7ca1b..a617109 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -83,7 +83,7 @@ public class ChannelViewsTest
 
        @After
        public void afterTest() {
-               this.ioManager.shutdown();
+               this.ioManager.close();
                if (!this.ioManager.isProperlyShutDown()) {
                        Assert.fail("I/O Manager was not properly shut down.");
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
index 7ffb58a..44fe849 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
@@ -74,7 +74,7 @@ public class FileChannelStreamsITCase extends TestLogger {
 
        @After
        public void afterTest() {
-               ioManager.shutdown();
+               ioManager.close();
                assertTrue("I/O Manager was not properly shut down.", 
ioManager.isProperlyShutDown());
                assertTrue("The memory has not been properly released", 
memManager.verifyEmpty());
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
index 1044a35..6bba3a6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
@@ -42,8 +42,7 @@ public class FileChannelStreamsTest {
 
        @Test
        public void testCloseAndDeleteOutputView() {
-               final IOManager ioManager = new IOManagerAsync();
-               try {
+               try (IOManager ioManager = new IOManagerAsync()) {
                        MemoryManager memMan = new MemoryManager(4 * 16*1024, 
1, 16*1024, MemoryType.HEAP, true);
                        List<MemorySegment> memory = new 
ArrayList<MemorySegment>();
                        memMan.allocatePages(new DummyInvokable(), memory, 4);
@@ -69,15 +68,11 @@ public class FileChannelStreamsTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       ioManager.shutdown();
-               }
        }
        
        @Test
        public void testCloseAndDeleteInputView() {
-               final IOManager ioManager = new IOManagerAsync();
-               try {
+               try (IOManager ioManager = new IOManagerAsync()) {
                        MemoryManager memMan = new MemoryManager(4 * 16*1024, 
1, 16*1024, MemoryType.HEAP, true);
                        List<MemorySegment> memory = new 
ArrayList<MemorySegment>();
                        memMan.allocatePages(new DummyInvokable(), memory, 4);
@@ -110,8 +105,5 @@ public class FileChannelStreamsTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       ioManager.shutdown();
-               }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
index 4c6a2b3..1c5a2ad 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
@@ -39,12 +39,11 @@ public class SeekableFileChannelInputViewTest {
 
        @Test
        public void testSeek() {
-               final IOManager ioManager = new IOManagerAsync();
                final int PAGE_SIZE = 16 * 1024;
                final int NUM_RECORDS = 120000;
                // integers across 7.x pages (7 pages = 114.688 bytes, 8 pages 
= 131.072 bytes)
                
-               try {
+               try (IOManager ioManager = new IOManagerAsync()) {
                        MemoryManager memMan = new MemoryManager(4 * PAGE_SIZE, 
1, PAGE_SIZE, MemoryType.HEAP, true);
                        List<MemorySegment> memory = new 
ArrayList<MemorySegment>();
                        memMan.allocatePages(new DummyInvokable(), memory, 4);
@@ -150,8 +149,5 @@ public class SeekableFileChannelInputViewTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       ioManager.shutdown();
-               }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index 01a9723..f94735c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -72,7 +72,7 @@ public class SpillingBufferTest {
 
        @After
        public void afterTest() {
-               ioManager.shutdown();
+               ioManager.close();
                if (!ioManager.isProperlyShutDown()) {
                        Assert.fail("I/O Manager was not properly shut down.");
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
index bc4c42a..5ef3983 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -59,7 +59,7 @@ public class AsynchronousBufferFileWriterTest {
 
        @AfterClass
        public static void shutdown() {
-               ioManager.shutdown();
+               ioManager.close();
        }
 
        @Before
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
index e3d5907..ead6490 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
@@ -58,7 +58,6 @@ public class AsynchronousFileIOChannelTest {
                final int numberOfRequests = 100;
 
                // -- Setup 
-----------------------------------------------------------
-               final IOManagerAsync ioManager = new IOManagerAsync();
 
                final ExecutorService executor = 
Executors.newFixedThreadPool(3);
 
@@ -71,7 +70,7 @@ public class AsynchronousFileIOChannelTest {
                final TestNotificationListener listener = new 
TestNotificationListener();
 
                // -- The Test 
--------------------------------------------------------
-               try {
+               try (final IOManagerAsync ioManager = new IOManagerAsync()) {
                        // Repeatedly add requests and process them and have 
one thread try to register as a
                        // listener until the channel is closed and all 
requests are processed.
 
@@ -177,7 +176,6 @@ public class AsynchronousFileIOChannelTest {
                        }
                }
                finally {
-                       ioManager.shutdown();
                        executor.shutdown();
                }
        }
@@ -188,7 +186,6 @@ public class AsynchronousFileIOChannelTest {
                final int numberOfRuns = 1024;
 
                // -- Setup 
-----------------------------------------------------------
-               final IOManagerAsync ioManager = new IOManagerAsync();
 
                final ExecutorService executor = 
Executors.newFixedThreadPool(2);
 
@@ -200,7 +197,7 @@ public class AsynchronousFileIOChannelTest {
                final TestNotificationListener listener = new 
TestNotificationListener();
 
                // -- The Test 
--------------------------------------------------------
-               try {
+               try (final IOManagerAsync ioManager = new IOManagerAsync()) {
                        // Repeatedly close the channel and add a request.
                        for (int i = 0; i < numberOfRuns; i++) {
                                final TestAsyncFileIOChannel ioChannel = new 
TestAsyncFileIOChannel(
@@ -264,15 +261,13 @@ public class AsynchronousFileIOChannelTest {
                        }
                }
                finally {
-                       ioManager.shutdown();
                        executor.shutdown();
                }
        }
 
        @Test
        public void testClosingWaits() {
-               IOManagerAsync ioMan = new IOManagerAsync();
-               try {
+               try (final IOManagerAsync ioMan = new IOManagerAsync()) {
 
                        final int NUM_BLOCKS = 100;
                        final MemorySegment seg = 
MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
@@ -318,20 +313,14 @@ public class AsynchronousFileIOChannelTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       ioMan.shutdown();
-               }
        }
 
        @Test
        public void testExceptionForwardsToClose() {
-               IOManagerAsync ioMan = new IOManagerAsync();
-               try {
+               try (IOManagerAsync ioMan = new IOManagerAsync()) {
                        testExceptionForwardsToClose(ioMan, 100, 1);
                        testExceptionForwardsToClose(ioMan, 100, 50);
                        testExceptionForwardsToClose(ioMan, 100, 100);
-               } finally {
-                       ioMan.shutdown();
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
index 1aaefcb..d2c388c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
@@ -61,7 +61,7 @@ public class BufferFileWriterFileSegmentReaderTest {
 
        @AfterClass
        public static void shutdown() {
-               ioManager.shutdown();
+               ioManager.close();
        }
 
        @Before
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
index 29e7b44..78026bf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -58,7 +58,7 @@ public class BufferFileWriterReaderTest {
 
        @AfterClass
        public static void shutdown() {
-               ioManager.shutdown();
+               ioManager.close();
        }
 
        @Before
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
index 4656d56..a4d3795 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
@@ -47,7 +47,7 @@ public class IOManagerAsyncTest {
 
        @After
        public void afterTest() {
-               this.ioManager.shutdown();
+               this.ioManager.close();
                assertTrue("IO Manager has not properly shut down.", 
ioManager.isProperlyShutDown());
        }
 
@@ -352,4 +352,4 @@ public class IOManagerAsyncTest {
        final class TestIOException extends IOException {
                private static final long serialVersionUID = 
-814705441998024472L;
        }
-}
\ No newline at end of file
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index 852e6e7..4a3f13f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -65,7 +65,7 @@ public class IOManagerITCase extends TestLogger {
 
        @After
        public void afterTest() throws Exception {
-               ioManager.shutdown();
+               ioManager.close();
                Assert.assertTrue("IO Manager has not properly shut down.", 
ioManager.isProperlyShutDown());
                
                Assert.assertTrue("Not all memory was returned to the memory 
manager in the test.", memoryManager.verifyEmpty());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
index 156098e..ce16bba 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
@@ -41,22 +41,18 @@ public class IOManagerTest {
 
        @Test
        public void channelEnumerator() throws IOException {
-               IOManager ioMan = null;
-
-               try {
-                       File tempPath = temporaryFolder.newFolder();
-
-                       String[] tempDirs = new String[]{
-                                       new File(tempPath, 
"a").getAbsolutePath(),
-                                       new File(tempPath, 
"b").getAbsolutePath(),
-                                       new File(tempPath, 
"c").getAbsolutePath(),
-                                       new File(tempPath, 
"d").getAbsolutePath(),
-                                       new File(tempPath, 
"e").getAbsolutePath(),
-                       };
-
-                       int[] counters = new int[tempDirs.length];
-
-                       ioMan = new TestIOManager(tempDirs);
+               File tempPath = temporaryFolder.newFolder();
+
+               String[] tempDirs = new String[]{
+                       new File(tempPath, "a").getAbsolutePath(),
+                       new File(tempPath, "b").getAbsolutePath(),
+                       new File(tempPath, "c").getAbsolutePath(),
+                       new File(tempPath, "d").getAbsolutePath(),
+                       new File(tempPath, "e").getAbsolutePath(),
+               };
+
+               int[] counters = new int[tempDirs.length];
+               try (IOManager ioMan = new TestIOManager(tempDirs) ) {
                        FileIOChannel.Enumerator enumerator = 
ioMan.createChannelEnumerator();
 
                        for (int i = 0; i < 3 * tempDirs.length; i++) {
@@ -81,11 +77,6 @@ public class IOManagerTest {
                                assertEquals(3, counters[k]);
                        }
                }
-               finally {
-                       if (ioMan != null) {
-                               ioMan.shutdown();
-                       }
-               }
        }
        
        // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index 85315b7..cde6d89 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -103,7 +103,7 @@ public class HashTableITCase extends TestLogger {
        public void tearDown()
        {
                // shut down I/O manager and Memory Manager and verify the 
correct shutdown
-               this.ioManager.shutdown();
+               this.ioManager.close();
                if (!this.ioManager.isProperlyShutDown()) {
                        fail("I/O manager was not property shut down.");
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
index bc9daf2..c8748cc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
@@ -130,8 +130,7 @@ public class HashTablePerformanceComparison {
        
        @Test
        public void testMutableHashMapPerformance() {
-               final IOManager ioManager = new IOManagerAsync();
-               try {
+               try (IOManager ioManager = new IOManagerAsync()) {
                        final int NUM_MEM_PAGES = SIZE * NUM_PAIRS / PAGE_SIZE;
                        
                        MutableObjectIterator<IntPair> buildInput = new 
UniformIntPairGenerator(NUM_PAIRS, 1, false);
@@ -206,8 +205,6 @@ public class HashTablePerformanceComparison {
                catch (Exception e) {
                        e.printStackTrace();
                        fail("Error: " + e.getMessage());
-               } finally {
-                       ioManager.shutdown();
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
index bcf620c..2e3748f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
@@ -110,10 +110,7 @@ public class HashTableTest {
         */
        @Test
        public void testBufferMissingForProbing() {
-
-               final IOManager ioMan = new IOManagerAsync();
-
-               try {
+               try (final IOManager ioMan = new IOManagerAsync()) {
                        final int pageSize = 32*1024;
                        final int numSegments = 34;
                        final int numRecords = 3400;
@@ -151,9 +148,6 @@ public class HashTableTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       ioMan.shutdown();
-               }
        }
 
        /**
@@ -163,8 +157,6 @@ public class HashTableTest {
         */
        @Test
        public void testSpillingFreesOnlyOverflowSegments() {
-               final IOManager ioMan = new IOManagerAsync();
-               
                final TypeSerializer<ByteValue> serializer = 
ByteValueSerializer.INSTANCE;
                final TypeComparator<ByteValue> buildComparator = new 
ValueComparator<>(true, ByteValue.class);
                final TypeComparator<ByteValue> probeComparator = new 
ValueComparator<>(true, ByteValue.class);
@@ -172,7 +164,7 @@ public class HashTableTest {
                @SuppressWarnings("unchecked")
                final TypePairComparator<ByteValue, ByteValue> pairComparator = 
Mockito.mock(TypePairComparator.class);
                
-               try {
+               try (final IOManager ioMan = new IOManagerAsync()) {
                        final int pageSize = 32*1024;
                        final int numSegments = 34;
 
@@ -192,9 +184,6 @@ public class HashTableTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       ioMan.shutdown();
-               }
        }
 
        /**
@@ -203,9 +192,7 @@ public class HashTableTest {
         */
        @Test
        public void testSpillingWhenBuildingTableWithoutOverflow() throws 
Exception {
-               final IOManager ioMan = new IOManagerAsync();
-
-               try {
+               try (final IOManager ioMan = new IOManagerAsync()) {
                        final TypeSerializer<byte[]> serializer = 
BytePrimitiveArraySerializer.INSTANCE;
                        final TypeComparator<byte[]> buildComparator = new 
BytePrimitiveArrayComparator(true);
                        final TypeComparator<byte[]> probeComparator = new 
BytePrimitiveArrayComparator(true);
@@ -254,8 +241,6 @@ public class HashTableTest {
                        }
 
                        table.close();
-               } finally {
-                       ioMan.shutdown();
                }
        }
        
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
index 802870b..4512ee8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
@@ -105,7 +105,7 @@ public class NonReusingHashJoinIteratorITCase extends 
TestLogger {
        @After
        public void afterTest() {
                if (this.ioManager != null) {
-                       this.ioManager.shutdown();
+                       this.ioManager.close();
                        if (!this.ioManager.isProperlyShutDown()) {
                                Assert.fail("I/O manager failed to properly 
shut down.");
                        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
index 020f1c3..754da40 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
@@ -83,7 +83,7 @@ public class ReOpenableHashTableITCase extends TestLogger {
        @After
        public void afterTest() {
                if (this.ioManager != null) {
-                       this.ioManager.shutdown();
+                       this.ioManager.close();
                        if (!this.ioManager.isProperlyShutDown()) {
                                Assert.fail("I/O manager failed to properly 
shut down.");
                        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
index 4008aa2..e6ac58a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
@@ -94,7 +94,7 @@ public abstract class ReOpenableHashTableTestBase extends 
TestLogger {
        @After
        public void afterTest() {
                if (this.ioManager != null) {
-                       this.ioManager.shutdown();
+                       this.ioManager.close();
                        if (!this.ioManager.isProperlyShutDown()) {
                                Assert.fail("I/O manager failed to properly 
shut down.");
                        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
index 8a51102..c199e38 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
@@ -113,7 +113,7 @@ public class ReusingHashJoinIteratorITCase extends 
TestLogger {
        @After
        public void afterTest() {
                if (this.ioManager != null) {
-                       this.ioManager.shutdown();
+                       this.ioManager.close();
                        if (!this.ioManager.isProperlyShutDown()) {
                                Assert.fail("I/O manager failed to properly 
shut down.");
                        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
index 0ab9a54..3f5a25d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
@@ -72,7 +72,7 @@ public class SpillingResettableIteratorTest {
 
        @After
        public void shutdown() {
-               this.ioman.shutdown();
+               this.ioman.close();
                if (!this.ioman.isProperlyShutDown()) {
                        Assert.fail("I/O Manager Shutdown was not completed 
properly.");
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
index ef48a1f..2a69a63 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
@@ -69,7 +69,7 @@ public class SpillingResettableMutableObjectIteratorTest {
 
        @After
        public void shutdown() {
-               this.ioman.shutdown();
+               this.ioman.close();
                if (!this.ioman.isProperlyShutDown()) {
                        Assert.fail("I/O Manager Shutdown was not completed 
properly.");
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
index 94c0fd4..7cdf07f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
@@ -111,7 +111,7 @@ public abstract class 
AbstractSortMergeOuterJoinIteratorITCase extends TestLogge
        @After
        public void afterTest() {
                if (this.ioManager != null) {
-                       this.ioManager.shutdown();
+                       this.ioManager.close();
                        if (!this.ioManager.isProperlyShutDown()) {
                                Assert.fail("I/O manager failed to properly 
shut down.");
                        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index d32cad0..96831f1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -93,7 +93,7 @@ public class CombiningUnilateralSortMergerITCase extends 
TestLogger {
 
        @After
        public void afterTest() {
-               this.ioManager.shutdown();
+               this.ioManager.close();
                if (!this.ioManager.isProperlyShutDown()) {
                        Assert.fail("I/O Manager was not properly shut down.");
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index b30adc2..d1aa0a7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -86,7 +86,7 @@ public class ExternalSortITCase extends TestLogger {
 
        @After
        public void afterTest() {
-               this.ioManager.shutdown();
+               this.ioManager.close();
                if (!this.ioManager.isProperlyShutDown()) {
                        Assert.fail("I/O Manager was not properly shut down.");
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
index 530951f..335dbee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -72,7 +72,7 @@ public class ExternalSortLargeRecordsITCase extends 
TestLogger {
 
        @After
        public void afterTest() {
-               this.ioManager.shutdown();
+               this.ioManager.close();
                if (!this.ioManager.isProperlyShutDown()) {
                        Assert.fail("I/O Manager was not properly shut down.");
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
index bba713e..2f05c78 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
@@ -79,7 +79,7 @@ public class FixedLengthRecordSorterTest {
                }
 
                if (this.ioManager != null) {
-                       ioManager.shutdown();
+                       ioManager.close();
                        ioManager = null;
                }
                
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
index 8f9e4dd..e73ee8c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
@@ -58,13 +58,11 @@ public class LargeRecordHandlerITCase extends TestLogger {
 
        @Test
        public void testRecordHandlerCompositeKey() {
-               
-               final IOManager ioMan = new IOManagerAsync();
                final int PAGE_SIZE = 4 * 1024;
                final int NUM_PAGES = 1000;
                final int NUM_RECORDS = 10;
                
-               try {
+               try (final IOManager ioMan = new IOManagerAsync()) {
                        final MemoryManager memMan = new 
MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
                        final AbstractInvokable owner = new DummyInvokable();
                        
@@ -145,9 +143,6 @@ public class LargeRecordHandlerITCase extends TestLogger {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       ioMan.shutdown();
-               }
        }
        
        public static final class SomeVeryLongValue implements Value {
@@ -193,15 +188,13 @@ public class LargeRecordHandlerITCase extends TestLogger {
        
        @Test
        public void fileTest() {
-               
-               final IOManager ioMan = new IOManagerAsync();
                final int PAGE_SIZE = 4 * 1024;
                final int NUM_PAGES = 4;
                final int NUM_RECORDS = 10;
                
                FileIOChannel.ID channel = null;
                
-               try {
+               try (final IOManager ioMan = new IOManagerAsync()) {
                        final MemoryManager memMan = new 
MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
                        final AbstractInvokable owner = new DummyInvokable();
                        
@@ -260,13 +253,6 @@ public class LargeRecordHandlerITCase extends TestLogger {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       if (channel != null) {
-                               ioMan.deleteChannel(channel);
-                       }
-
-                       ioMan.shutdown();
-               }
        }
 
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
index a59e630..7e9acf6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
@@ -46,12 +46,10 @@ public class LargeRecordHandlerTest {
 
        @Test
        public void testEmptyRecordHandler() {
-               
-               final IOManager ioMan = new IOManagerAsync();
                final int PAGE_SIZE = 4 * 1024;
                final int NUM_PAGES = 50;
                
-               try {
+               try (final IOManager ioMan = new IOManagerAsync()) {
                        final MemoryManager memMan = new 
MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
                        final AbstractInvokable owner = new DummyInvokable();
                        final List<MemorySegment> memory = 
memMan.allocatePages(owner, NUM_PAGES);
@@ -88,20 +86,15 @@ public class LargeRecordHandlerTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       ioMan.shutdown();
-               }
        }
        
        @Test
        public void testRecordHandlerSingleKey() {
-               
-               final IOManager ioMan = new IOManagerAsync();
                final int PAGE_SIZE = 4 * 1024;
                final int NUM_PAGES = 24;
                final int NUM_RECORDS = 25000;
                
-               try {
+               try (final IOManager ioMan = new IOManagerAsync()) {
                        final MemoryManager memMan = new 
MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
                        final AbstractInvokable owner = new DummyInvokable();
                        
@@ -174,20 +167,15 @@ public class LargeRecordHandlerTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       ioMan.shutdown();
-               }
        }
        
        @Test
        public void testRecordHandlerCompositeKey() {
-               
-               final IOManager ioMan = new IOManagerAsync();
                final int PAGE_SIZE = 4 * 1024;
                final int NUM_PAGES = 24;
                final int NUM_RECORDS = 25000;
                
-               try {
+               try (final IOManager ioMan = new IOManagerAsync()) {
                        final MemoryManager memMan = new 
MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
                        final AbstractInvokable owner = new DummyInvokable();
                        
@@ -262,8 +250,5 @@ public class LargeRecordHandlerTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       ioMan.shutdown();
-               }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
index 394d44c..c729a3a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
@@ -112,7 +112,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase 
extends TestLogger {
        @After
        public void afterTest() {
                if (this.ioManager != null) {
-                       this.ioManager.shutdown();
+                       this.ioManager.close();
                        if (!this.ioManager.isProperlyShutDown()) {
                                Assert.fail("I/O manager failed to properly 
shut down.");
                        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
index 8e5bd95..d149084 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
@@ -112,7 +112,7 @@ public class ReusingSortMergeInnerJoinIteratorITCase 
extends TestLogger {
        @After
        public void afterTest() {
                if (this.ioManager != null) {
-                       this.ioManager.shutdown();
+                       this.ioManager.close();
                        if (!this.ioManager.isProperlyShutDown()) {
                                Assert.fail("I/O manager failed to properly 
shut down.");
                        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
index cdaa5b1..c014e23 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
@@ -52,10 +52,9 @@ public class UnilateralSortMergerTest extends TestLogger {
 
                final int numPages = 32;
                final MemoryManager memoryManager = new 
MemoryManager(MemoryManager.DEFAULT_PAGE_SIZE * numPages, 1);
-               final IOManagerAsync ioManager = new IOManagerAsync();
                final DummyInvokable parentTask = new DummyInvokable();
 
-               try {
+               try (final IOManagerAsync ioManager = new IOManagerAsync()) {
                        final List<MemorySegment> memory = 
memoryManager.allocatePages(parentTask, numPages);
                        final UnilateralSortMerger<Tuple2<Integer, Integer>> 
unilateralSortMerger = new UnilateralSortMerger<>(
                                memoryManager,
@@ -85,7 +84,6 @@ public class UnilateralSortMergerTest extends TestLogger {
                                assertThat(inMemorySorter.isDisposed(), 
is(true));
                        }
                } finally {
-                       ioManager.shutdown();
                        memoryManager.shutdown();
                }
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index a76f110..d2a4f6a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -388,7 +388,7 @@ public abstract class BinaryOperatorTestBase<S extends 
Function, IN, OUT> extend
                this.sorters.clear();
                
                // 2nd, shutdown I/O
-               this.ioManager.shutdown();
+               this.ioManager.close();
                Assert.assertTrue("I/O Manager has not properly shut down.", 
this.ioManager.isProperlyShutDown());
                
                // last, verify all memory is returned and shutdown mem manager
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 3820bf9..e1d2cb9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -385,7 +385,7 @@ public abstract class DriverTestBase<S extends Function> 
extends TestLogger impl
                this.sorters.clear();
                
                // 2nd, shutdown I/O
-               this.ioManager.shutdown();
+               this.ioManager.close();
                Assert.assertTrue("I/O Manager has not properly shut down.", 
this.ioManager.isProperlyShutDown());
 
                // last, verify all memory is returned and shutdown mem manager
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 89a6ea4..2c96ca4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -343,7 +343,7 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
                }
 
                memManager.shutdown();
-               ioManager.shutdown();
+               ioManager.close();
 
                checkState(ioManager.isProperlyShutDown(), "IO Manager has not 
properly shut down.");
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 2ef82da..1ff50b0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -378,7 +378,7 @@ public abstract class UnaryOperatorTestBase<S extends 
Function, IN, OUT> extends
                }
                
                // 2nd, shutdown I/O
-               this.ioManager.shutdown();
+               this.ioManager.close();
                Assert.assertTrue("I/O Manager has not properly shut down.", 
this.ioManager.isProperlyShutDown());
 
                // last, verify all memory is returned and shutdown mem manager
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 2696045..7b4eaba 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -106,7 +106,7 @@ public class HashVsSortMiniBenchmark {
                }
                
                if (this.ioManager != null) {
-                       this.ioManager.shutdown();
+                       this.ioManager.close();
                        if (!this.ioManager.isProperlyShutDown()) {
                                Assert.fail("I/O manager failed to properly 
shut down.");
                        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index 4d46451..d19b99c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -55,7 +55,7 @@ public class BufferSpillerTest extends BufferStorageTestBase {
 
        @AfterClass
        public static void shutdownIOManager() {
-               ioManager.shutdown();
+               ioManager.close();
        }
 
        @Before
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
index 0621179..535dfc0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
@@ -78,7 +78,7 @@ public class CheckpointBarrierAlignerAlignmentLimitTest {
 
        @AfterClass
        public static void shutdownIOManager() {
-               ioManager.shutdown();
+               ioManager.close();
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
index 82920aa..2b452bd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
@@ -47,11 +47,9 @@ public class CheckpointBarrierAlignerMassiveRandomTest {
 
        @Test
        public void testWithTwoChannelsAndRandomBarriers() {
-               IOManager ioMan = null;
                NetworkBufferPool networkBufferPool1 = null;
                NetworkBufferPool networkBufferPool2 = null;
-               try {
-                       ioMan = new IOManagerAsync();
+               try (IOManager ioMan = new IOManagerAsync()) {
 
                        networkBufferPool1 = new NetworkBufferPool(100, 
PAGE_SIZE, 1);
                        networkBufferPool2 = new NetworkBufferPool(100, 
PAGE_SIZE, 1);
@@ -76,9 +74,6 @@ public class CheckpointBarrierAlignerMassiveRandomTest {
                        fail(e.getMessage());
                }
                finally {
-                       if (ioMan != null) {
-                               ioMan.shutdown();
-                       }
                        if (networkBufferPool1 != null) {
                                networkBufferPool1.destroyAllBufferPools();
                                networkBufferPool1.destroy();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
index c892073..0d28325 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
@@ -49,7 +49,7 @@ public class SpillingCheckpointBarrierAlignerTest extends 
CheckpointBarrierAlign
 
        @AfterClass
        public static void shutdownIOManager() {
-               ioManager.shutdown();
+               ioManager.close();
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 0c399f7..b7d7430 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -168,7 +168,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
        public void tearDown() {
                suppressExceptions(senderEnv::close);
                suppressExceptions(receiverEnv::close);
-               suppressExceptions(ioManager::shutdown);
+               suppressExceptions(ioManager::close);
        }
 
        public SerializingLongReceiver createReceiver() throws Exception {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 12fa8be..c059757 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -325,7 +325,7 @@ public class StreamTaskTestHarness<OUT> {
        }
 
        private void shutdownIOManager() throws Exception {
-               this.mockEnv.getIOManager().shutdown();
+               this.mockEnv.getIOManager().close();
                Assert.assertTrue("IO Manager has not properly shut down.", 
this.mockEnv.getIOManager().isProperlyShutDown());
        }
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
index 9bab9b9..f35a40c 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
@@ -95,7 +95,7 @@ public class HashAggTest {
 
        @After
        public void afterTest() {
-               this.ioManager.shutdown();
+               this.ioManager.close();
                if (!this.ioManager.isProperlyShutDown()) {
                        Assert.fail("I/O Manager was not properly shut down.");
                }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
index ee08da5..de4e949 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
@@ -95,7 +95,7 @@ public class BinaryHashTableTest {
        @After
        public void tearDown() {
                // shut down I/O manager and Memory Manager and verify the 
correct shutdown
-               this.ioManager.shutdown();
+               this.ioManager.close();
                if (!this.ioManager.isProperlyShutDown()) {
                        fail("I/O manager was not property shut down.");
                }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
index 177e83b..14039c8 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
@@ -51,7 +51,7 @@ public class CompressedHeaderlessChannelTest {
 
        @After
        public void afterTest() {
-               this.ioManager.shutdown();
+               this.ioManager.close();
                if (!this.ioManager.isProperlyShutDown()) {
                        Assert.fail("I/O Manager was not properly shut down.");
                }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
index 1402c11..7b47903 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
@@ -78,7 +78,7 @@ public class Int2SortMergeJoinOperatorTest {
        @After
        public void tearDown() {
                // shut down I/O manager and Memory Manager and verify the 
correct shutdown
-               this.ioManager.shutdown();
+               this.ioManager.close();
                if (!this.ioManager.isProperlyShutDown()) {
                        fail("I/O manager was not property shut down.");
                }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
index 8b389c5..8eb62d0 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
@@ -101,7 +101,7 @@ public class BinaryExternalSorterTest {
 
        @After
        public void afterTest() {
-               this.ioManager.shutdown();
+               this.ioManager.close();
                if (!this.ioManager.isProperlyShutDown()) {
                        Assert.fail("I/O Manager was not properly shut down.");
                }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
index 1928696..e095df1 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
@@ -103,7 +103,7 @@ public class BufferedKVExternalSorterTest {
 
        @After
        public void afterTest() {
-               this.ioManager.shutdown();
+               this.ioManager.close();
                if (!this.ioManager.isProperlyShutDown()) {
                        Assert.fail("I/O Manager was not properly shut down.");
                }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
index f02cf1c..f1664ae 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
@@ -85,9 +85,7 @@ public class HashTableRecordWidthCombinations {
                        }
                };
 
-               final IOManager ioMan = new IOManagerAsync();
-
-               try {
+               try (final IOManager ioMan = new IOManagerAsync()) {
                        final int pageSize = 32 * 1024;
                        final int numSegments = 34;
 
@@ -175,9 +173,6 @@ public class HashTableRecordWidthCombinations {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       ioMan.shutdown();
-               }
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
index eb99909..1d48eee 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
@@ -83,11 +83,9 @@ public class MassiveStringSorting {
                        BufferedReader reader = null;
                        BufferedReader verifyReader = null;
                        MemoryManager mm = null;
-                       IOManager ioMan = null;
 
-                       try {
+                       try (IOManager ioMan = new IOManagerAsync()) {
                                mm = new MemoryManager(1024 * 1024, 1);
-                               ioMan = new IOManagerAsync();
 
                                TypeSerializer<String> serializer = 
StringSerializer.INSTANCE;
                                TypeComparator<String> comparator = new 
StringComparator(true);
@@ -127,9 +125,6 @@ public class MassiveStringSorting {
                                if (mm != null) {
                                        mm.shutdown();
                                }
-                               if (ioMan != null) {
-                                       ioMan.shutdown();
-                               }
                        }
                }
                catch (Exception e) {
@@ -182,11 +177,9 @@ public class MassiveStringSorting {
                        BufferedReader reader = null;
                        BufferedReader verifyReader = null;
                        MemoryManager mm = null;
-                       IOManager ioMan = null;
 
-                       try {
+                       try (IOManager ioMan = new IOManagerAsync()) {
                                mm = new MemoryManager(1024 * 1024, 1);
-                               ioMan = new IOManagerAsync();
 
                                TupleTypeInfo<Tuple2<String, String[]>> 
typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>)
                                                new TypeHint<Tuple2<String, 
String[]>>(){}.getTypeInfo();
@@ -256,9 +249,6 @@ public class MassiveStringSorting {
                                if (mm != null) {
                                        mm.shutdown();
                                }
-                               if (ioMan != null) {
-                                       ioMan.shutdown();
-                               }
                        }
                }
                catch (Exception e) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
index 5dcf209..861f1df 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
@@ -84,11 +84,9 @@ public class MassiveStringValueSorting {
                        BufferedReader reader = null;
                        BufferedReader verifyReader = null;
                        MemoryManager mm = null;
-                       IOManager ioMan = null;
 
-                       try {
+                       try (IOManager ioMan = new IOManagerAsync()) {
                                mm = new MemoryManager(1024 * 1024, 1);
-                               ioMan = new IOManagerAsync();
 
                                TypeSerializer<StringValue> serializer = new 
CopyableValueSerializer<StringValue>(StringValue.class);
                                TypeComparator<StringValue> comparator = new 
CopyableValueComparator<StringValue>(true, StringValue.class);
@@ -129,9 +127,6 @@ public class MassiveStringValueSorting {
                                if (mm != null) {
                                        mm.shutdown();
                                }
-                               if (ioMan != null) {
-                                       ioMan.shutdown();
-                               }
                        }
                }
                catch (Exception e) {
@@ -186,11 +181,9 @@ public class MassiveStringValueSorting {
                        BufferedReader reader = null;
                        BufferedReader verifyReader = null;
                        MemoryManager mm = null;
-                       IOManager ioMan = null;
 
-                       try {
+                       try (IOManager ioMan = new IOManagerAsync()) {
                                mm = new MemoryManager(1024 * 1024, 1);
-                               ioMan = new IOManagerAsync();
 
                                TupleTypeInfo<Tuple2<StringValue, 
StringValue[]>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, StringValue[]>>)
                                                new 
TypeHint<Tuple2<StringValue, StringValue[]>>(){}.getTypeInfo();
@@ -260,9 +253,6 @@ public class MassiveStringValueSorting {
                                if (mm != null) {
                                        mm.shutdown();
                                }
-                               if (ioMan != null) {
-                                       ioMan.shutdown();
-                               }
                        }
                }
                catch (Exception e) {

Reply via email to