This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 423e8a8afa4fd440ba2abb6c2b535f881ef84374 Author: Zhijiang <[email protected]> AuthorDate: Fri Jun 28 12:18:47 2019 +0800 [hotfix][runtime] Refactor IOManager#close and remove #isProperlyShutDown IOManager#close would ignore any exceptions internally in order not to interrupt other close operations, then IOManager#isProperlyShutDown is used for checking any exceptions during close process. We could use IOUtils#closeAll for handling all the close operations and finally throwing the suppressed exceptions to get the same effect, then isProperlyShutDown method could be removed completely. --- .../flink/runtime/io/disk/iomanager/IOManager.java | 47 ++++----- .../runtime/io/disk/iomanager/IOManagerAsync.java | 105 ++++++++++----------- .../flink/runtime/io/disk/ChannelViewsTest.java | 5 +- .../runtime/io/disk/FileChannelStreamsITCase.java | 3 +- .../flink/runtime/io/disk/SpillingBufferTest.java | 5 +- .../AsynchronousBufferFileWriterTest.java | 2 +- .../iomanager/AsynchronousFileIOChannelTest.java | 2 +- .../BufferFileWriterFileSegmentReaderTest.java | 2 +- .../disk/iomanager/BufferFileWriterReaderTest.java | 2 +- .../io/disk/iomanager/IOManagerAsyncTest.java | 3 +- .../runtime/io/disk/iomanager/IOManagerITCase.java | 1 - .../runtime/io/disk/iomanager/IOManagerTest.java | 2 +- .../runtime/operators/hash/HashTableITCase.java | 5 +- .../hash/NonReusingHashJoinIteratorITCase.java | 5 +- .../operators/hash/ReOpenableHashTableITCase.java | 5 +- .../hash/ReOpenableHashTableTestBase.java | 5 +- .../hash/ReusingHashJoinIteratorITCase.java | 5 +- .../resettable/SpillingResettableIteratorTest.java | 5 +- ...pillingResettableMutableObjectIteratorTest.java | 5 +- .../AbstractSortMergeOuterJoinIteratorITCase.java | 5 +- .../sort/CombiningUnilateralSortMergerITCase.java | 5 +- .../runtime/operators/sort/ExternalSortITCase.java | 5 +- .../sort/ExternalSortLargeRecordsITCase.java | 5 +- .../sort/FixedLengthRecordSorterTest.java | 2 +- ...NonReusingSortMergeInnerJoinIteratorITCase.java | 5 +- .../ReusingSortMergeInnerJoinIteratorITCase.java | 5 +- .../testutils/BinaryOperatorTestBase.java | 3 +- .../operators/testutils/DriverTestBase.java | 1 - .../operators/testutils/MockEnvironment.java | 4 +- .../runtime/operators/testutils/TaskTestBase.java | 2 +- .../operators/testutils/UnaryOperatorTestBase.java | 1 - .../operators/util/HashVsSortMiniBenchmark.java | 5 +- .../runtime/taskexecutor/TaskExecutorTest.java | 1 - .../streaming/runtime/io/BufferSpillerTest.java | 2 +- ...CheckpointBarrierAlignerAlignmentLimitTest.java | 2 +- .../io/SpillingCheckpointBarrierAlignerTest.java | 2 +- .../streaming/runtime/tasks/OperatorChainTest.java | 2 +- .../runtime/tasks/StreamTaskTestHarness.java | 1 - .../util/AbstractStreamOperatorTestHarness.java | 4 +- .../flink/table/runtime/aggregate/HashAggTest.java | 5 +- .../runtime/hashtable/BinaryHashTableTest.java | 5 +- .../io/CompressedHeaderlessChannelTest.java | 6 +- .../join/Int2SortMergeJoinOperatorTest.java | 5 +- .../runtime/sort/BinaryExternalSorterTest.java | 5 +- .../runtime/sort/BufferedKVExternalSorterTest.java | 5 +- 45 files changed, 111 insertions(+), 196 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index ee54b1e..a649e42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -24,15 +24,19 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Random; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; /** * The facade for the provided I/O manager services. @@ -82,39 +86,26 @@ public abstract class IOManager implements AutoCloseable { } /** - * Close method, marks the I/O manager as closed - * and removed all temporary files. + * Removes all temporary files. */ @Override - public void close() { - // remove all of our temp directories - for (File path : paths) { - try { - if (path != null) { - if (path.exists()) { - FileUtils.deleteDirectory(path); - LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath()); - } - } - } catch (Throwable t) { - LOG.error("IOManager failed to properly clean up temp file directory: " + path, t); - } - } + public void close() throws Exception { + IOUtils.closeAll(Arrays.stream(paths) + .filter(File::exists) + .map(IOManager::getFileCloser) + .collect(Collectors.toList())); } - /** - * Utility method to check whether the IO manager has been properly shut down. - * For this base implementation, this means that all files have been removed. - * - * @return True, if the IO manager has properly shut down, false otherwise. - */ - public boolean isProperlyShutDown() { - for (File path : paths) { - if (path != null && path.exists()) { - return false; + private static AutoCloseable getFileCloser(File path) { + return () -> { + try { + FileUtils.deleteDirectory(path); + LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath()); + } catch (IOException e) { + String errorMessage = String.format("IOManager failed to properly clean up temp file directory: %s", path); + throw new IOException(errorMessage, e); } - } - return true; + }; } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java index ffa4dcf..2133430 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java @@ -21,10 +21,12 @@ package org.apache.flink.runtime.io.disk.iomanager; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.IOUtils; import org.apache.flink.util.ShutdownHookUtil; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -109,7 +111,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle * operation. */ @Override - public void close() { + public void close() throws Exception { // mark shut down and exit if it already was shut down if (!isShutdown.compareAndSet(false, true)) { return; @@ -118,30 +120,25 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle // Remove shutdown hook to prevent resource leaks ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Shutting down I/O manager."); - } - // close writing and reading threads with best effort and log problems - // first notify all to close, then wait until all are closed + if (LOG.isDebugEnabled()) { + LOG.debug("Shutting down I/O manager."); + } - for (WriterThread wt : writers) { - try { - wt.shutdown(); - } - catch (Throwable t) { - LOG.error("Error while shutting down IO Manager writer thread.", t); - } - } - for (ReaderThread rt : readers) { - try { - rt.shutdown(); - } - catch (Throwable t) { - LOG.error("Error while shutting down IO Manager reader thread.", t); - } - } + // close writing and reading threads with best effort and log problems + // first notify all to close, then wait until all are closed + + List<AutoCloseable> closeables = new ArrayList<>(writers.length + readers.length + 2); + + for (WriterThread wt : writers) { + closeables.add(getWriterThreadCloser(wt)); + } + + for (ReaderThread rt : readers) { + closeables.add(getReaderThreadCloser(rt)); + } + + closeables.add(() -> { try { for (WriterThread wt : writers) { wt.join(); @@ -149,44 +146,46 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle for (ReaderThread rt : readers) { rt.join(); } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); } - catch (InterruptedException iex) { - // ignore this on shutdown - } - } - finally { - // make sure we call the super implementation in any case and at the last point, - // because this will clean up the I/O directories - super.close(); - } + }); + + // make sure we call the super implementation in any case and at the last point, + // because this will clean up the I/O directories + closeables.add(super::close); + + IOUtils.closeAll(closeables); } - - /** - * Utility method to check whether the IO manager has been properly shut down. The IO manager is considered - * to be properly shut down when it is closed and its threads have ceased operation. - * - * @return True, if the IO manager has properly shut down, false otherwise. - */ - @Override - public boolean isProperlyShutDown() { - boolean readersShutDown = true; - for (ReaderThread rt : readers) { - readersShutDown &= rt.getState() == Thread.State.TERMINATED; - } - - boolean writersShutDown = true; - for (WriterThread wt : writers) { - writersShutDown &= wt.getState() == Thread.State.TERMINATED; - } - - return isShutdown.get() && readersShutDown && writersShutDown && super.isProperlyShutDown(); + + private static AutoCloseable getWriterThreadCloser(WriterThread thread) { + return () -> { + try { + thread.shutdown(); + } catch (Throwable t) { + throw new IOException("Error while shutting down IO Manager writer thread.", t); + } + }; } + private static AutoCloseable getReaderThreadCloser(ReaderThread thread) { + return () -> { + try { + thread.shutdown(); + } catch (Throwable t) { + throw new IOException("Error while shutting down IO Manager reader thread.", t); + } + }; + } @Override public void uncaughtException(Thread t, Throwable e) { LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Shutting down I/O Manager.", e); - close(); + try { + close(); + } catch (Exception ex) { + LOG.warn("IOManagerAsync did not shut down properly.", ex); + } } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java index a617109..4e94de2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java @@ -82,11 +82,8 @@ public class ChannelViewsTest } @After - public void afterTest() { + public void afterTest() throws Exception { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O Manager was not properly shut down."); - } if (memoryManager != null) { Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java index 44fe849..c83e942 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java @@ -73,9 +73,8 @@ public class FileChannelStreamsITCase extends TestLogger { } @After - public void afterTest() { + public void afterTest() throws Exception { ioManager.close(); - assertTrue("I/O Manager was not properly shut down.", ioManager.isProperlyShutDown()); assertTrue("The memory has not been properly released", memManager.verifyEmpty()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java index f94735c..d7d2f28 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java @@ -71,11 +71,8 @@ public class SpillingBufferTest { } @After - public void afterTest() { + public void afterTest() throws Exception { ioManager.close(); - if (!ioManager.isProperlyShutDown()) { - Assert.fail("I/O Manager was not properly shut down."); - } if (memoryManager != null) { Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java index 5ef3983..a8e8d81 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java @@ -58,7 +58,7 @@ public class AsynchronousBufferFileWriterTest { private AsynchronousBufferFileWriter writer; @AfterClass - public static void shutdown() { + public static void shutdown() throws Exception { ioManager.close(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java index ead6490..5bf4db4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java @@ -316,7 +316,7 @@ public class AsynchronousFileIOChannelTest { } @Test - public void testExceptionForwardsToClose() { + public void testExceptionForwardsToClose() throws Exception { try (IOManagerAsync ioMan = new IOManagerAsync()) { testExceptionForwardsToClose(ioMan, 100, 1); testExceptionForwardsToClose(ioMan, 100, 50); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java index d2c388c..88f241e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java @@ -60,7 +60,7 @@ public class BufferFileWriterFileSegmentReaderTest { private LinkedBlockingQueue<FileSegment> returnedFileSegments = new LinkedBlockingQueue<>(); @AfterClass - public static void shutdown() { + public static void shutdown() throws Exception { ioManager.close(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java index 78026bf..0eab18d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java @@ -57,7 +57,7 @@ public class BufferFileWriterReaderTest { private LinkedBlockingQueue<Buffer> returnedBuffers = new LinkedBlockingQueue<>(); @AfterClass - public static void shutdown() { + public static void shutdown() throws Exception { ioManager.close(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java index a4d3795..b0ce877 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java @@ -46,9 +46,8 @@ public class IOManagerAsyncTest { } @After - public void afterTest() { + public void afterTest() throws Exception { this.ioManager.close(); - assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown()); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java index 4a3f13f..83e15be 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java @@ -66,7 +66,6 @@ public class IOManagerITCase extends TestLogger { @After public void afterTest() throws Exception { ioManager.close(); - Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown()); Assert.assertTrue("Not all memory was returned to the memory manager in the test.", memoryManager.verifyEmpty()); memoryManager.shutdown(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java index ce16bba..4c0e6a2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java @@ -40,7 +40,7 @@ public class IOManagerTest { public final TemporaryFolder temporaryFolder = new TemporaryFolder(); @Test - public void channelEnumerator() throws IOException { + public void channelEnumerator() throws Exception { File tempPath = temporaryFolder.newFolder(); String[] tempDirs = new String[]{ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java index cde6d89..1b86cec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java @@ -100,13 +100,10 @@ public class HashTableITCase extends TestLogger { } @After - public void tearDown() + public void tearDown() throws Exception { // shut down I/O manager and Memory Manager and verify the correct shutdown this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - fail("I/O manager was not property shut down."); - } if (!this.memManager.verifyEmpty()) { fail("Not all memory was properly released to the memory manager --> Memory Leak."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java index 4512ee8..553fd24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java @@ -103,12 +103,9 @@ public class NonReusingHashJoinIteratorITCase extends TestLogger { } @After - public void afterTest() { + public void afterTest() throws Exception { if (this.ioManager != null) { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O manager failed to properly shut down."); - } this.ioManager = null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java index 754da40..95f36e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java @@ -81,12 +81,9 @@ public class ReOpenableHashTableITCase extends TestLogger { } @After - public void afterTest() { + public void afterTest() throws Exception { if (this.ioManager != null) { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O manager failed to properly shut down."); - } this.ioManager = null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java index e6ac58a..9acd87a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java @@ -92,12 +92,9 @@ public abstract class ReOpenableHashTableTestBase extends TestLogger { } @After - public void afterTest() { + public void afterTest() throws Exception { if (this.ioManager != null) { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O manager failed to properly shut down."); - } this.ioManager = null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java index c199e38..9a69d89 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java @@ -111,12 +111,9 @@ public class ReusingHashJoinIteratorITCase extends TestLogger { } @After - public void afterTest() { + public void afterTest() throws Exception { if (this.ioManager != null) { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O manager failed to properly shut down."); - } this.ioManager = null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java index 3f5a25d..15c32fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java @@ -71,11 +71,8 @@ public class SpillingResettableIteratorTest { } @After - public void shutdown() { + public void shutdown() throws Exception { this.ioman.close(); - if (!this.ioman.isProperlyShutDown()) { - Assert.fail("I/O Manager Shutdown was not completed properly."); - } this.ioman = null; if (!this.memman.verifyEmpty()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java index 2a69a63..c442e7d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java @@ -68,11 +68,8 @@ public class SpillingResettableMutableObjectIteratorTest { } @After - public void shutdown() { + public void shutdown() throws Exception { this.ioman.close(); - if (!this.ioman.isProperlyShutDown()) { - Assert.fail("I/O Manager Shutdown was not completed properly."); - } this.ioman = null; if (!this.memman.verifyEmpty()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java index 7cdf07f..364dc32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java @@ -109,12 +109,9 @@ public abstract class AbstractSortMergeOuterJoinIteratorITCase extends TestLogge } @After - public void afterTest() { + public void afterTest() throws Exception { if (this.ioManager != null) { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O manager failed to properly shut down."); - } this.ioManager = null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java index 96831f1..e50798d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java @@ -92,11 +92,8 @@ public class CombiningUnilateralSortMergerITCase extends TestLogger { } @After - public void afterTest() { + public void afterTest() throws Exception { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O Manager was not properly shut down."); - } if (this.memoryManager != null) { Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java index d1aa0a7..691a065 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java @@ -85,11 +85,8 @@ public class ExternalSortITCase extends TestLogger { } @After - public void afterTest() { + public void afterTest() throws Exception { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O Manager was not properly shut down."); - } if (this.memoryManager != null && testSuccess) { Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java index 335dbee..48d6ce7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java @@ -71,11 +71,8 @@ public class ExternalSortLargeRecordsITCase extends TestLogger { } @After - public void afterTest() { + public void afterTest() throws Exception { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O Manager was not properly shut down."); - } if (this.memoryManager != null && testSuccess) { Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java index 2f05c78..fc33ffa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java @@ -73,7 +73,7 @@ public class FixedLengthRecordSorterTest { } @After - public void afterTest() { + public void afterTest() throws Exception { if (!this.memoryManager.verifyEmpty()) { Assert.fail("Memory Leak: Some memory has not been returned to the memory manager."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java index c729a3a..cc2736d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java @@ -110,12 +110,9 @@ public class NonReusingSortMergeInnerJoinIteratorITCase extends TestLogger { } @After - public void afterTest() { + public void afterTest() throws Exception { if (this.ioManager != null) { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O manager failed to properly shut down."); - } this.ioManager = null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java index d149084..e6e3828 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java @@ -110,12 +110,9 @@ public class ReusingSortMergeInnerJoinIteratorITCase extends TestLogger { } @After - public void afterTest() { + public void afterTest() throws Exception { if (this.ioManager != null) { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O manager failed to properly shut down."); - } this.ioManager = null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java index d2a4f6a..1ffac3f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java @@ -389,8 +389,7 @@ public abstract class BinaryOperatorTestBase<S extends Function, IN, OUT> extend // 2nd, shutdown I/O this.ioManager.close(); - Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown()); - + // last, verify all memory is returned and shutdown mem manager MemoryManager memMan = getMemoryManager(); if (memMan != null) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index e1d2cb9..ec106ec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -386,7 +386,6 @@ public abstract class DriverTestBase<S extends Function> extends TestLogger impl // 2nd, shutdown I/O this.ioManager.close(); - Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown()); // last, verify all memory is returned and shutdown mem manager MemoryManager memMan = getMemoryManager(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 2c96ca4..d3d462f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -336,7 +336,7 @@ public class MockEnvironment implements Environment, AutoCloseable { } @Override - public void close() { + public void close() throws Exception { // close() method should be idempotent and calling memManager.verifyEmpty() will throw after it was shutdown. if (!memManager.isShutdown()) { checkState(memManager.verifyEmpty(), "Memory Manager managed memory was not completely freed."); @@ -344,8 +344,6 @@ public class MockEnvironment implements Environment, AutoCloseable { memManager.shutdown(); ioManager.close(); - - checkState(ioManager.isProperlyShutDown(), "IO Manager has not properly shut down."); } public void setExpectedExternalFailureCause(Class<Throwable> expectedThrowableClass) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java index 1a46539..16839aa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java @@ -156,7 +156,7 @@ public abstract class TaskTestBase extends TestLogger { } @After - public void shutdown() { + public void shutdown() throws Exception { mockEnv.close(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java index 1ff50b0..73ea30c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java @@ -379,7 +379,6 @@ public abstract class UnaryOperatorTestBase<S extends Function, IN, OUT> extends // 2nd, shutdown I/O this.ioManager.close(); - Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown()); // last, verify all memory is returned and shutdown mem manager MemoryManager memMan = getMemoryManager(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java index 7b4eaba..a9581f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java @@ -97,7 +97,7 @@ public class HashVsSortMiniBenchmark { } @After - public void afterTest() { + public void afterTest() throws Exception { if (this.memoryManager != null) { Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", this.memoryManager.verifyEmpty()); @@ -107,9 +107,6 @@ public class HashVsSortMiniBenchmark { if (this.ioManager != null) { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O manager failed to properly shut down."); - } this.ioManager = null; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index e67e979..c7b8e12 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -306,7 +306,6 @@ public class TaskExecutorTest extends TestLogger { assertThat(memoryManager.isShutdown(), is(true)); assertThat(nettyShuffleEnvironment.isClosed(), is(true)); - assertThat(ioManager.isProperlyShutDown(), is(true)); assertThat(kvStateService.isShutdown(), is(true)); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java index d19b99c..ead6bd8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java @@ -54,7 +54,7 @@ public class BufferSpillerTest extends BufferStorageTestBase { } @AfterClass - public static void shutdownIOManager() { + public static void shutdownIOManager() throws Exception { ioManager.close(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java index 535dfc0..341e291 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java @@ -77,7 +77,7 @@ public class CheckpointBarrierAlignerAlignmentLimitTest { } @AfterClass - public static void shutdownIOManager() { + public static void shutdownIOManager() throws Exception { ioManager.close(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java index 0d28325..c95e1f2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java @@ -48,7 +48,7 @@ public class SpillingCheckpointBarrierAlignerTest extends CheckpointBarrierAlign } @AfterClass - public static void shutdownIOManager() { + public static void shutdownIOManager() throws Exception { ioManager.close(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java index 8b1b183..e7bf45a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java @@ -72,7 +72,7 @@ public class OperatorChainTest { @SafeVarargs private static <T, OP extends StreamOperator<T>> OperatorChain<T, OP> setupOperatorChain( - OneInputStreamOperator<T, T>... operators) { + OneInputStreamOperator<T, T>... operators) throws Exception { checkNotNull(operators); checkArgument(operators.length > 0); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index c059757..2049da1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -326,7 +326,6 @@ public class StreamTaskTestHarness<OUT> { private void shutdownIOManager() throws Exception { this.mockEnv.getIOManager().close(); - Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown()); } private void shutdownMemoryManager() throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 3ab5390..ce561d4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -523,7 +523,9 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { } setupCalled = false; - internalEnvironment.ifPresent(MockEnvironment::close); + if (internalEnvironment.isPresent()) { + internalEnvironment.get().close(); + } } public void setProcessingTime(long time) throws Exception { diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java index f35a40c..5a74016 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java @@ -94,11 +94,8 @@ public class HashAggTest { } @After - public void afterTest() { + public void afterTest() throws Exception { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O Manager was not properly shut down."); - } if (this.memoryManager != null) { Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java index de4e949..cf509ed 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java @@ -93,12 +93,9 @@ public class BinaryHashTableTest { } @After - public void tearDown() { + public void tearDown() throws Exception { // shut down I/O manager and Memory Manager and verify the correct shutdown this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - fail("I/O manager was not property shut down."); - } } @Test diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java index 14039c8..6f886d6 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java @@ -26,7 +26,6 @@ import org.apache.flink.table.runtime.compression.BlockCompressionFactory; import org.apache.flink.table.runtime.compression.Lz4BlockCompressionFactory; import org.junit.After; -import org.junit.Assert; import org.junit.Test; import java.io.IOException; @@ -50,11 +49,8 @@ public class CompressedHeaderlessChannelTest { } @After - public void afterTest() { + public void afterTest() throws Exception { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O Manager was not properly shut down."); - } } @Test diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java index 7b47903..c8c9bdb 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java @@ -76,12 +76,9 @@ public class Int2SortMergeJoinOperatorTest { } @After - public void tearDown() { + public void tearDown() throws Exception { // shut down I/O manager and Memory Manager and verify the correct shutdown this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - fail("I/O manager was not property shut down."); - } if (!this.memManager.verifyEmpty()) { fail("Not all memory was properly released to the memory manager --> Memory Leak."); } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java index 8eb62d0..7bbf45c 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java @@ -100,11 +100,8 @@ public class BinaryExternalSorterTest { } @After - public void afterTest() { + public void afterTest() throws Exception { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O Manager was not properly shut down."); - } if (this.memoryManager != null) { Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java index e095df1..069d40e 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java @@ -102,11 +102,8 @@ public class BufferedKVExternalSorterTest { } @After - public void afterTest() { + public void afterTest() throws Exception { this.ioManager.close(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O Manager was not properly shut down."); - } } @Test
