This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 423e8a8afa4fd440ba2abb6c2b535f881ef84374
Author: Zhijiang <[email protected]>
AuthorDate: Fri Jun 28 12:18:47 2019 +0800

    [hotfix][runtime] Refactor IOManager#close and remove #isProperlyShutDown
    
    IOManager#close would ignore any exceptions internally in order not to 
interrupt other close operations,
    then IOManager#isProperlyShutDown is used for checking any exceptions 
during close process. We could use
    IOUtils#closeAll for handling all the close operations and finally throwing 
the suppressed exceptions to
    get the same effect, then isProperlyShutDown method could be removed 
completely.
---
 .../flink/runtime/io/disk/iomanager/IOManager.java |  47 ++++-----
 .../runtime/io/disk/iomanager/IOManagerAsync.java  | 105 ++++++++++-----------
 .../flink/runtime/io/disk/ChannelViewsTest.java    |   5 +-
 .../runtime/io/disk/FileChannelStreamsITCase.java  |   3 +-
 .../flink/runtime/io/disk/SpillingBufferTest.java  |   5 +-
 .../AsynchronousBufferFileWriterTest.java          |   2 +-
 .../iomanager/AsynchronousFileIOChannelTest.java   |   2 +-
 .../BufferFileWriterFileSegmentReaderTest.java     |   2 +-
 .../disk/iomanager/BufferFileWriterReaderTest.java |   2 +-
 .../io/disk/iomanager/IOManagerAsyncTest.java      |   3 +-
 .../runtime/io/disk/iomanager/IOManagerITCase.java |   1 -
 .../runtime/io/disk/iomanager/IOManagerTest.java   |   2 +-
 .../runtime/operators/hash/HashTableITCase.java    |   5 +-
 .../hash/NonReusingHashJoinIteratorITCase.java     |   5 +-
 .../operators/hash/ReOpenableHashTableITCase.java  |   5 +-
 .../hash/ReOpenableHashTableTestBase.java          |   5 +-
 .../hash/ReusingHashJoinIteratorITCase.java        |   5 +-
 .../resettable/SpillingResettableIteratorTest.java |   5 +-
 ...pillingResettableMutableObjectIteratorTest.java |   5 +-
 .../AbstractSortMergeOuterJoinIteratorITCase.java  |   5 +-
 .../sort/CombiningUnilateralSortMergerITCase.java  |   5 +-
 .../runtime/operators/sort/ExternalSortITCase.java |   5 +-
 .../sort/ExternalSortLargeRecordsITCase.java       |   5 +-
 .../sort/FixedLengthRecordSorterTest.java          |   2 +-
 ...NonReusingSortMergeInnerJoinIteratorITCase.java |   5 +-
 .../ReusingSortMergeInnerJoinIteratorITCase.java   |   5 +-
 .../testutils/BinaryOperatorTestBase.java          |   3 +-
 .../operators/testutils/DriverTestBase.java        |   1 -
 .../operators/testutils/MockEnvironment.java       |   4 +-
 .../runtime/operators/testutils/TaskTestBase.java  |   2 +-
 .../operators/testutils/UnaryOperatorTestBase.java |   1 -
 .../operators/util/HashVsSortMiniBenchmark.java    |   5 +-
 .../runtime/taskexecutor/TaskExecutorTest.java     |   1 -
 .../streaming/runtime/io/BufferSpillerTest.java    |   2 +-
 ...CheckpointBarrierAlignerAlignmentLimitTest.java |   2 +-
 .../io/SpillingCheckpointBarrierAlignerTest.java   |   2 +-
 .../streaming/runtime/tasks/OperatorChainTest.java |   2 +-
 .../runtime/tasks/StreamTaskTestHarness.java       |   1 -
 .../util/AbstractStreamOperatorTestHarness.java    |   4 +-
 .../flink/table/runtime/aggregate/HashAggTest.java |   5 +-
 .../runtime/hashtable/BinaryHashTableTest.java     |   5 +-
 .../io/CompressedHeaderlessChannelTest.java        |   6 +-
 .../join/Int2SortMergeJoinOperatorTest.java        |   5 +-
 .../runtime/sort/BinaryExternalSorterTest.java     |   5 +-
 .../runtime/sort/BufferedKVExternalSorterTest.java |   5 +-
 45 files changed, 111 insertions(+), 196 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index ee54b1e..a649e42 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -24,15 +24,19 @@ import 
org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.FileUtils;
 
+import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
 
 /**
  * The facade for the provided I/O manager services.
@@ -82,39 +86,26 @@ public abstract class IOManager implements AutoCloseable {
        }
 
        /**
-        * Close method, marks the I/O manager as closed
-        * and removed all temporary files.
+        * Removes all temporary files.
         */
        @Override
-       public void close() {
-               // remove all of our temp directories
-               for (File path : paths) {
-                       try {
-                               if (path != null) {
-                                       if (path.exists()) {
-                                               FileUtils.deleteDirectory(path);
-                                               LOG.info("I/O manager removed 
spill file directory {}", path.getAbsolutePath());
-                                       }
-                               }
-                       } catch (Throwable t) {
-                               LOG.error("IOManager failed to properly clean 
up temp file directory: " + path, t);
-                       }
-               }
+       public void close() throws Exception {
+               IOUtils.closeAll(Arrays.stream(paths)
+                       .filter(File::exists)
+                       .map(IOManager::getFileCloser)
+                       .collect(Collectors.toList()));
        }
 
-       /**
-        * Utility method to check whether the IO manager has been properly 
shut down.
-        * For this base implementation, this means that all files have been 
removed.
-        *
-        * @return True, if the IO manager has properly shut down, false 
otherwise.
-        */
-       public boolean isProperlyShutDown() {
-               for (File path : paths) {
-                       if (path != null && path.exists()) {
-                               return false;
+       private static AutoCloseable getFileCloser(File path) {
+               return () -> {
+                       try {
+                               FileUtils.deleteDirectory(path);
+                               LOG.info("I/O manager removed spill file 
directory {}", path.getAbsolutePath());
+                       } catch (IOException e) {
+                               String errorMessage = String.format("IOManager 
failed to properly clean up temp file directory: %s", path);
+                               throw new IOException(errorMessage, e);
                        }
-               }
-               return true;
+               };
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index ffa4dcf..2133430 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -21,10 +21,12 @@ package org.apache.flink.runtime.io.disk.iomanager;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.ShutdownHookUtil;
 
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -109,7 +111,7 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
         * operation.
         */
        @Override
-       public void close() {
+       public void close() throws Exception {
                // mark shut down and exit if it already was shut down
                if (!isShutdown.compareAndSet(false, true)) {
                        return;
@@ -118,30 +120,25 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
                // Remove shutdown hook to prevent resource leaks
                ShutdownHookUtil.removeShutdownHook(shutdownHook, 
getClass().getSimpleName(), LOG);
 
-               try {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Shutting down I/O manager.");
-                       }
 
-                       // close writing and reading threads with best effort 
and log problems
-                       // first notify all to close, then wait until all are 
closed
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Shutting down I/O manager.");
+               }
 
-                       for (WriterThread wt : writers) {
-                               try {
-                                       wt.shutdown();
-                               }
-                               catch (Throwable t) {
-                                       LOG.error("Error while shutting down IO 
Manager writer thread.", t);
-                               }
-                       }
-                       for (ReaderThread rt : readers) {
-                               try {
-                                       rt.shutdown();
-                               }
-                               catch (Throwable t) {
-                                       LOG.error("Error while shutting down IO 
Manager reader thread.", t);
-                               }
-                       }
+               // close writing and reading threads with best effort and log 
problems
+               // first notify all to close, then wait until all are closed
+
+               List<AutoCloseable> closeables = new ArrayList<>(writers.length 
+ readers.length + 2);
+
+               for (WriterThread wt : writers) {
+                       closeables.add(getWriterThreadCloser(wt));
+               }
+
+               for (ReaderThread rt : readers) {
+                       closeables.add(getReaderThreadCloser(rt));
+               }
+
+               closeables.add(() -> {
                        try {
                                for (WriterThread wt : writers) {
                                        wt.join();
@@ -149,44 +146,46 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
                                for (ReaderThread rt : readers) {
                                        rt.join();
                                }
+                       } catch (InterruptedException ie) {
+                               Thread.currentThread().interrupt();
                        }
-                       catch (InterruptedException iex) {
-                               // ignore this on shutdown
-                       }
-               }
-               finally {
-                       // make sure we call the super implementation in any 
case and at the last point,
-                       // because this will clean up the I/O directories
-                       super.close();
-               }
+               });
+
+               // make sure we call the super implementation in any case and 
at the last point,
+               // because this will clean up the I/O directories
+               closeables.add(super::close);
+
+               IOUtils.closeAll(closeables);
        }
-       
-       /**
-        * Utility method to check whether the IO manager has been properly 
shut down. The IO manager is considered
-        * to be properly shut down when it is closed and its threads have 
ceased operation.
-        * 
-        * @return True, if the IO manager has properly shut down, false 
otherwise.
-        */
-       @Override
-       public boolean isProperlyShutDown() {
-               boolean readersShutDown = true;
-               for (ReaderThread rt : readers) {
-                       readersShutDown &= rt.getState() == 
Thread.State.TERMINATED;
-               }
-               
-               boolean writersShutDown = true;
-               for (WriterThread wt : writers) {
-                       writersShutDown &= wt.getState() == 
Thread.State.TERMINATED;
-               }
-               
-               return isShutdown.get() && readersShutDown && writersShutDown 
&& super.isProperlyShutDown();
+
+       private static AutoCloseable getWriterThreadCloser(WriterThread thread) 
{
+               return () -> {
+                       try {
+                               thread.shutdown();
+                       } catch (Throwable t) {
+                               throw new IOException("Error while shutting 
down IO Manager writer thread.", t);
+                       }
+               };
        }
 
+       private static AutoCloseable getReaderThreadCloser(ReaderThread thread) 
{
+               return () -> {
+                       try {
+                               thread.shutdown();
+                       } catch (Throwable t) {
+                               throw new IOException("Error while shutting 
down IO Manager reader thread.", t);
+                       }
+               };
+       }
 
        @Override
        public void uncaughtException(Thread t, Throwable e) {
                LOG.error("IO Thread '" + t.getName() + "' terminated due to an 
exception. Shutting down I/O Manager.", e);
-               close();
+               try {
+                       close();
+               } catch (Exception ex) {
+                       LOG.warn("IOManagerAsync did not shut down properly.", 
ex);
+               }
        }
        
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index a617109..4e94de2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -82,11 +82,8 @@ public class ChannelViewsTest
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                this.ioManager.close();
-               if (!this.ioManager.isProperlyShutDown()) {
-                       Assert.fail("I/O Manager was not properly shut down.");
-               }
                
                if (memoryManager != null) {
                        Assert.assertTrue("Memory leak: not all segments have 
been returned to the memory manager.", 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
index 44fe849..c83e942 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
@@ -73,9 +73,8 @@ public class FileChannelStreamsITCase extends TestLogger {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                ioManager.close();
-               assertTrue("I/O Manager was not properly shut down.", 
ioManager.isProperlyShutDown());
                assertTrue("The memory has not been properly released", 
memManager.verifyEmpty());
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index f94735c..d7d2f28 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -71,11 +71,8 @@ public class SpillingBufferTest {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                ioManager.close();
-               if (!ioManager.isProperlyShutDown()) {
-                       Assert.fail("I/O Manager was not properly shut down.");
-               }
                
                if (memoryManager != null) {
                        Assert.assertTrue("Memory leak: not all segments have 
been returned to the memory manager.", 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
index 5ef3983..a8e8d81 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -58,7 +58,7 @@ public class AsynchronousBufferFileWriterTest {
        private AsynchronousBufferFileWriter writer;
 
        @AfterClass
-       public static void shutdown() {
+       public static void shutdown() throws Exception {
                ioManager.close();
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
index ead6490..5bf4db4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
@@ -316,7 +316,7 @@ public class AsynchronousFileIOChannelTest {
        }
 
        @Test
-       public void testExceptionForwardsToClose() {
+       public void testExceptionForwardsToClose() throws Exception {
                try (IOManagerAsync ioMan = new IOManagerAsync()) {
                        testExceptionForwardsToClose(ioMan, 100, 1);
                        testExceptionForwardsToClose(ioMan, 100, 50);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
index d2c388c..88f241e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
@@ -60,7 +60,7 @@ public class BufferFileWriterFileSegmentReaderTest {
        private LinkedBlockingQueue<FileSegment> returnedFileSegments = new 
LinkedBlockingQueue<>();
 
        @AfterClass
-       public static void shutdown() {
+       public static void shutdown() throws Exception {
                ioManager.close();
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
index 78026bf..0eab18d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -57,7 +57,7 @@ public class BufferFileWriterReaderTest {
        private LinkedBlockingQueue<Buffer> returnedBuffers = new 
LinkedBlockingQueue<>();
 
        @AfterClass
-       public static void shutdown() {
+       public static void shutdown() throws Exception {
                ioManager.close();
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
index a4d3795..b0ce877 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
@@ -46,9 +46,8 @@ public class IOManagerAsyncTest {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                this.ioManager.close();
-               assertTrue("IO Manager has not properly shut down.", 
ioManager.isProperlyShutDown());
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index 4a3f13f..83e15be 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -66,7 +66,6 @@ public class IOManagerITCase extends TestLogger {
        @After
        public void afterTest() throws Exception {
                ioManager.close();
-               Assert.assertTrue("IO Manager has not properly shut down.", 
ioManager.isProperlyShutDown());
                
                Assert.assertTrue("Not all memory was returned to the memory 
manager in the test.", memoryManager.verifyEmpty());
                memoryManager.shutdown();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
index ce16bba..4c0e6a2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
@@ -40,7 +40,7 @@ public class IOManagerTest {
        public final TemporaryFolder  temporaryFolder = new TemporaryFolder();
 
        @Test
-       public void channelEnumerator() throws IOException {
+       public void channelEnumerator() throws Exception {
                File tempPath = temporaryFolder.newFolder();
 
                String[] tempDirs = new String[]{
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index cde6d89..1b86cec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -100,13 +100,10 @@ public class HashTableITCase extends TestLogger {
        }
        
        @After
-       public void tearDown()
+       public void tearDown() throws Exception
        {
                // shut down I/O manager and Memory Manager and verify the 
correct shutdown
                this.ioManager.close();
-               if (!this.ioManager.isProperlyShutDown()) {
-                       fail("I/O manager was not property shut down.");
-               }
                if (!this.memManager.verifyEmpty()) {
                        fail("Not all memory was properly released to the 
memory manager --> Memory Leak.");
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
index 4512ee8..553fd24 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
@@ -103,12 +103,9 @@ public class NonReusingHashJoinIteratorITCase extends 
TestLogger {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                if (this.ioManager != null) {
                        this.ioManager.close();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
                        this.ioManager = null;
                }
                
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
index 754da40..95f36e4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
@@ -81,12 +81,9 @@ public class ReOpenableHashTableITCase extends TestLogger {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                if (this.ioManager != null) {
                        this.ioManager.close();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
                        this.ioManager = null;
                }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
index e6ac58a..9acd87a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
@@ -92,12 +92,9 @@ public abstract class ReOpenableHashTableTestBase extends 
TestLogger {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                if (this.ioManager != null) {
                        this.ioManager.close();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
                        this.ioManager = null;
                }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
index c199e38..9a69d89 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
@@ -111,12 +111,9 @@ public class ReusingHashJoinIteratorITCase extends 
TestLogger {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                if (this.ioManager != null) {
                        this.ioManager.close();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
                        this.ioManager = null;
                }
                
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
index 3f5a25d..15c32fc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
@@ -71,11 +71,8 @@ public class SpillingResettableIteratorTest {
        }
 
        @After
-       public void shutdown() {
+       public void shutdown() throws Exception {
                this.ioman.close();
-               if (!this.ioman.isProperlyShutDown()) {
-                       Assert.fail("I/O Manager Shutdown was not completed 
properly.");
-               }
                this.ioman = null;
 
                if (!this.memman.verifyEmpty()) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
index 2a69a63..c442e7d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
@@ -68,11 +68,8 @@ public class SpillingResettableMutableObjectIteratorTest {
        }
 
        @After
-       public void shutdown() {
+       public void shutdown() throws Exception {
                this.ioman.close();
-               if (!this.ioman.isProperlyShutDown()) {
-                       Assert.fail("I/O Manager Shutdown was not completed 
properly.");
-               }
                this.ioman = null;
 
                if (!this.memman.verifyEmpty()) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
index 7cdf07f..364dc32 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
@@ -109,12 +109,9 @@ public abstract class 
AbstractSortMergeOuterJoinIteratorITCase extends TestLogge
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                if (this.ioManager != null) {
                        this.ioManager.close();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
                        this.ioManager = null;
                }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index 96831f1..e50798d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -92,11 +92,8 @@ public class CombiningUnilateralSortMergerITCase extends 
TestLogger {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                this.ioManager.close();
-               if (!this.ioManager.isProperlyShutDown()) {
-                       Assert.fail("I/O Manager was not properly shut down.");
-               }
                
                if (this.memoryManager != null) {
                        Assert.assertTrue("Memory leak: not all segments have 
been returned to the memory manager.", 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index d1aa0a7..691a065 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -85,11 +85,8 @@ public class ExternalSortITCase extends TestLogger {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                this.ioManager.close();
-               if (!this.ioManager.isProperlyShutDown()) {
-                       Assert.fail("I/O Manager was not properly shut down.");
-               }
                
                if (this.memoryManager != null && testSuccess) {
                        Assert.assertTrue("Memory leak: not all segments have 
been returned to the memory manager.",
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
index 335dbee..48d6ce7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -71,11 +71,8 @@ public class ExternalSortLargeRecordsITCase extends 
TestLogger {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                this.ioManager.close();
-               if (!this.ioManager.isProperlyShutDown()) {
-                       Assert.fail("I/O Manager was not properly shut down.");
-               }
                
                if (this.memoryManager != null && testSuccess) {
                        Assert.assertTrue("Memory leak: not all segments have 
been returned to the memory manager.", 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
index 2f05c78..fc33ffa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
@@ -73,7 +73,7 @@ public class FixedLengthRecordSorterTest {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                if (!this.memoryManager.verifyEmpty()) {
                        Assert.fail("Memory Leak: Some memory has not been 
returned to the memory manager.");
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
index c729a3a..cc2736d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
@@ -110,12 +110,9 @@ public class NonReusingSortMergeInnerJoinIteratorITCase 
extends TestLogger {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                if (this.ioManager != null) {
                        this.ioManager.close();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
                        this.ioManager = null;
                }
                
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
index d149084..e6e3828 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
@@ -110,12 +110,9 @@ public class ReusingSortMergeInnerJoinIteratorITCase 
extends TestLogger {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                if (this.ioManager != null) {
                        this.ioManager.close();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
                        this.ioManager = null;
                }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index d2a4f6a..1ffac3f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -389,8 +389,7 @@ public abstract class BinaryOperatorTestBase<S extends 
Function, IN, OUT> extend
                
                // 2nd, shutdown I/O
                this.ioManager.close();
-               Assert.assertTrue("I/O Manager has not properly shut down.", 
this.ioManager.isProperlyShutDown());
-               
+
                // last, verify all memory is returned and shutdown mem manager
                MemoryManager memMan = getMemoryManager();
                if (memMan != null) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index e1d2cb9..ec106ec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -386,7 +386,6 @@ public abstract class DriverTestBase<S extends Function> 
extends TestLogger impl
                
                // 2nd, shutdown I/O
                this.ioManager.close();
-               Assert.assertTrue("I/O Manager has not properly shut down.", 
this.ioManager.isProperlyShutDown());
 
                // last, verify all memory is returned and shutdown mem manager
                MemoryManager memMan = getMemoryManager();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 2c96ca4..d3d462f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -336,7 +336,7 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
        }
 
        @Override
-       public void close() {
+       public void close() throws Exception {
                // close() method should be idempotent and calling 
memManager.verifyEmpty() will throw after it was shutdown.
                if (!memManager.isShutdown()) {
                        checkState(memManager.verifyEmpty(), "Memory Manager 
managed memory was not completely freed.");
@@ -344,8 +344,6 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
 
                memManager.shutdown();
                ioManager.close();
-
-               checkState(ioManager.isProperlyShutDown(), "IO Manager has not 
properly shut down.");
        }
 
        public void setExpectedExternalFailureCause(Class<Throwable> 
expectedThrowableClass) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index 1a46539..16839aa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -156,7 +156,7 @@ public abstract class TaskTestBase extends TestLogger {
        }
 
        @After
-       public void shutdown() {
+       public void shutdown() throws Exception {
                mockEnv.close();
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 1ff50b0..73ea30c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -379,7 +379,6 @@ public abstract class UnaryOperatorTestBase<S extends 
Function, IN, OUT> extends
                
                // 2nd, shutdown I/O
                this.ioManager.close();
-               Assert.assertTrue("I/O Manager has not properly shut down.", 
this.ioManager.isProperlyShutDown());
 
                // last, verify all memory is returned and shutdown mem manager
                MemoryManager memMan = getMemoryManager();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 7b4eaba..a9581f5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -97,7 +97,7 @@ public class HashVsSortMiniBenchmark {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                if (this.memoryManager != null) {
                        Assert.assertTrue("Memory Leak: Not all memory has been 
returned to the memory manager.",
                                this.memoryManager.verifyEmpty());
@@ -107,9 +107,6 @@ public class HashVsSortMiniBenchmark {
                
                if (this.ioManager != null) {
                        this.ioManager.close();
-                       if (!this.ioManager.isProperlyShutDown()) {
-                               Assert.fail("I/O manager failed to properly 
shut down.");
-                       }
                        this.ioManager = null;
                }
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index e67e979..c7b8e12 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -306,7 +306,6 @@ public class TaskExecutorTest extends TestLogger {
 
                assertThat(memoryManager.isShutdown(), is(true));
                assertThat(nettyShuffleEnvironment.isClosed(), is(true));
-               assertThat(ioManager.isProperlyShutDown(), is(true));
                assertThat(kvStateService.isShutdown(), is(true));
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index d19b99c..ead6bd8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -54,7 +54,7 @@ public class BufferSpillerTest extends BufferStorageTestBase {
        }
 
        @AfterClass
-       public static void shutdownIOManager() {
+       public static void shutdownIOManager() throws Exception {
                ioManager.close();
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
index 535dfc0..341e291 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
@@ -77,7 +77,7 @@ public class CheckpointBarrierAlignerAlignmentLimitTest {
        }
 
        @AfterClass
-       public static void shutdownIOManager() {
+       public static void shutdownIOManager() throws Exception {
                ioManager.close();
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
index 0d28325..c95e1f2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
@@ -48,7 +48,7 @@ public class SpillingCheckpointBarrierAlignerTest extends 
CheckpointBarrierAlign
        }
 
        @AfterClass
-       public static void shutdownIOManager() {
+       public static void shutdownIOManager() throws Exception {
                ioManager.close();
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
index 8b1b183..e7bf45a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
@@ -72,7 +72,7 @@ public class OperatorChainTest {
 
        @SafeVarargs
        private static <T, OP extends StreamOperator<T>> OperatorChain<T, OP> 
setupOperatorChain(
-                       OneInputStreamOperator<T, T>... operators) {
+                       OneInputStreamOperator<T, T>... operators) throws 
Exception {
 
                checkNotNull(operators);
                checkArgument(operators.length > 0);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index c059757..2049da1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -326,7 +326,6 @@ public class StreamTaskTestHarness<OUT> {
 
        private void shutdownIOManager() throws Exception {
                this.mockEnv.getIOManager().close();
-               Assert.assertTrue("IO Manager has not properly shut down.", 
this.mockEnv.getIOManager().isProperlyShutDown());
        }
 
        private void shutdownMemoryManager() throws Exception {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 3ab5390..ce561d4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -523,7 +523,9 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                }
                setupCalled = false;
 
-               internalEnvironment.ifPresent(MockEnvironment::close);
+               if (internalEnvironment.isPresent()) {
+                       internalEnvironment.get().close();
+               }
        }
 
        public void setProcessingTime(long time) throws Exception {
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
index f35a40c..5a74016 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java
@@ -94,11 +94,8 @@ public class HashAggTest {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                this.ioManager.close();
-               if (!this.ioManager.isProperlyShutDown()) {
-                       Assert.fail("I/O Manager was not properly shut down.");
-               }
 
                if (this.memoryManager != null) {
                        Assert.assertTrue("Memory leak: not all segments have 
been returned to the memory manager.",
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
index de4e949..cf509ed 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java
@@ -93,12 +93,9 @@ public class BinaryHashTableTest {
        }
 
        @After
-       public void tearDown() {
+       public void tearDown() throws Exception {
                // shut down I/O manager and Memory Manager and verify the 
correct shutdown
                this.ioManager.close();
-               if (!this.ioManager.isProperlyShutDown()) {
-                       fail("I/O manager was not property shut down.");
-               }
        }
 
        @Test
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
index 14039c8..6f886d6 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.table.runtime.compression.BlockCompressionFactory;
 import org.apache.flink.table.runtime.compression.Lz4BlockCompressionFactory;
 
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -50,11 +49,8 @@ public class CompressedHeaderlessChannelTest {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                this.ioManager.close();
-               if (!this.ioManager.isProperlyShutDown()) {
-                       Assert.fail("I/O Manager was not properly shut down.");
-               }
        }
 
        @Test
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
index 7b47903..c8c9bdb 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java
@@ -76,12 +76,9 @@ public class Int2SortMergeJoinOperatorTest {
        }
 
        @After
-       public void tearDown() {
+       public void tearDown() throws Exception {
                // shut down I/O manager and Memory Manager and verify the 
correct shutdown
                this.ioManager.close();
-               if (!this.ioManager.isProperlyShutDown()) {
-                       fail("I/O manager was not property shut down.");
-               }
                if (!this.memManager.verifyEmpty()) {
                        fail("Not all memory was properly released to the 
memory manager --> Memory Leak.");
                }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
index 8eb62d0..7bbf45c 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java
@@ -100,11 +100,8 @@ public class BinaryExternalSorterTest {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                this.ioManager.close();
-               if (!this.ioManager.isProperlyShutDown()) {
-                       Assert.fail("I/O Manager was not properly shut down.");
-               }
 
                if (this.memoryManager != null) {
                        Assert.assertTrue("Memory leak: not all segments have 
been returned to the memory manager.",
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
index e095df1..069d40e 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java
@@ -102,11 +102,8 @@ public class BufferedKVExternalSorterTest {
        }
 
        @After
-       public void afterTest() {
+       public void afterTest() throws Exception {
                this.ioManager.close();
-               if (!this.ioManager.isProperlyShutDown()) {
-                       Assert.fail("I/O Manager was not properly shut down.");
-               }
        }
 
        @Test

Reply via email to