This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5809ed32f869fa880a70a774d5b8365fe59dba4a Author: Zhijiang <[email protected]> AuthorDate: Tue Jun 18 12:09:04 2019 +0800 [hotfix][runtime] IOManager implements AutoCloseable --- .../flink/runtime/io/disk/iomanager/IOManager.java | 5 ++-- .../runtime/io/disk/iomanager/IOManagerAsync.java | 8 +++--- .../runtime/taskexecutor/TaskManagerServices.java | 2 +- .../flink/runtime/io/disk/ChannelViewsTest.java | 2 +- .../runtime/io/disk/FileChannelStreamsITCase.java | 2 +- .../runtime/io/disk/FileChannelStreamsTest.java | 12 ++------ .../io/disk/SeekableFileChannelInputViewTest.java | 6 +--- .../flink/runtime/io/disk/SpillingBufferTest.java | 2 +- .../AsynchronousBufferFileWriterTest.java | 2 +- .../iomanager/AsynchronousFileIOChannelTest.java | 19 +++---------- .../BufferFileWriterFileSegmentReaderTest.java | 2 +- .../disk/iomanager/BufferFileWriterReaderTest.java | 2 +- .../io/disk/iomanager/IOManagerAsyncTest.java | 4 +-- .../runtime/io/disk/iomanager/IOManagerITCase.java | 2 +- .../runtime/io/disk/iomanager/IOManagerTest.java | 33 ++++++++-------------- .../runtime/operators/hash/HashTableITCase.java | 2 +- .../hash/HashTablePerformanceComparison.java | 5 +--- .../runtime/operators/hash/HashTableTest.java | 21 ++------------ .../hash/NonReusingHashJoinIteratorITCase.java | 2 +- .../operators/hash/ReOpenableHashTableITCase.java | 2 +- .../hash/ReOpenableHashTableTestBase.java | 2 +- .../hash/ReusingHashJoinIteratorITCase.java | 2 +- .../resettable/SpillingResettableIteratorTest.java | 2 +- ...pillingResettableMutableObjectIteratorTest.java | 2 +- .../AbstractSortMergeOuterJoinIteratorITCase.java | 2 +- .../sort/CombiningUnilateralSortMergerITCase.java | 2 +- .../runtime/operators/sort/ExternalSortITCase.java | 2 +- .../sort/ExternalSortLargeRecordsITCase.java | 2 +- .../sort/FixedLengthRecordSorterTest.java | 2 +- .../operators/sort/LargeRecordHandlerITCase.java | 18 ++---------- .../operators/sort/LargeRecordHandlerTest.java | 21 ++------------ ...NonReusingSortMergeInnerJoinIteratorITCase.java | 2 +- .../ReusingSortMergeInnerJoinIteratorITCase.java | 2 +- .../operators/sort/UnilateralSortMergerTest.java | 4 +-- .../testutils/BinaryOperatorTestBase.java | 2 +- .../operators/testutils/DriverTestBase.java | 2 +- .../operators/testutils/MockEnvironment.java | 2 +- .../operators/testutils/UnaryOperatorTestBase.java | 2 +- .../operators/util/HashVsSortMiniBenchmark.java | 2 +- .../streaming/runtime/io/BufferSpillerTest.java | 2 +- ...CheckpointBarrierAlignerAlignmentLimitTest.java | 2 +- .../CheckpointBarrierAlignerMassiveRandomTest.java | 7 +---- .../io/SpillingCheckpointBarrierAlignerTest.java | 2 +- .../StreamNetworkBenchmarkEnvironment.java | 2 +- .../runtime/tasks/StreamTaskTestHarness.java | 2 +- .../flink/table/runtime/aggregate/HashAggTest.java | 2 +- .../runtime/hashtable/BinaryHashTableTest.java | 2 +- .../io/CompressedHeaderlessChannelTest.java | 2 +- .../join/Int2SortMergeJoinOperatorTest.java | 2 +- .../runtime/sort/BinaryExternalSorterTest.java | 2 +- .../runtime/sort/BufferedKVExternalSorterTest.java | 2 +- .../manual/HashTableRecordWidthCombinations.java | 7 +---- .../flink/test/manual/MassiveStringSorting.java | 14 ++------- .../test/manual/MassiveStringValueSorting.java | 14 ++------- 54 files changed, 82 insertions(+), 192 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index 6723597..ee54b1e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -37,7 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue; /** * The facade for the provided I/O manager services. */ -public abstract class IOManager { +public abstract class IOManager implements AutoCloseable { protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class); /** The temporary directories for files. */ @@ -85,7 +85,8 @@ public abstract class IOManager { * Close method, marks the I/O manager as closed * and removed all temporary files. */ - public void shutdown() { + @Override + public void close() { // remove all of our temp directories for (File path : paths) { try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java index 0c3c8f1..ffa4dcf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java @@ -100,7 +100,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle } // install a shutdown hook that makes sure the temp directories get deleted - this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG); + this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::close, getClass().getSimpleName(), LOG); } /** @@ -109,7 +109,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle * operation. */ @Override - public void shutdown() { + public void close() { // mark shut down and exit if it already was shut down if (!isShutdown.compareAndSet(false, true)) { return; @@ -157,7 +157,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle finally { // make sure we call the super implementation in any case and at the last point, // because this will clean up the I/O directories - super.shutdown(); + super.close(); } } @@ -186,7 +186,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle @Override public void uncaughtException(Thread t, Throwable e) { LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Shutting down I/O Manager.", e); - shutdown(); + close(); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 35236d9..0aafce0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -179,7 +179,7 @@ public class TaskManagerServices { } try { - ioManager.shutdown(); + ioManager.close(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java index 8c7ca1b..a617109 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java @@ -83,7 +83,7 @@ public class ChannelViewsTest @After public void afterTest() { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O Manager was not properly shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java index 7ffb58a..44fe849 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java @@ -74,7 +74,7 @@ public class FileChannelStreamsITCase extends TestLogger { @After public void afterTest() { - ioManager.shutdown(); + ioManager.close(); assertTrue("I/O Manager was not properly shut down.", ioManager.isProperlyShutDown()); assertTrue("The memory has not been properly released", memManager.verifyEmpty()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java index 1044a35..6bba3a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java @@ -42,8 +42,7 @@ public class FileChannelStreamsTest { @Test public void testCloseAndDeleteOutputView() { - final IOManager ioManager = new IOManagerAsync(); - try { + try (IOManager ioManager = new IOManagerAsync()) { MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true); List<MemorySegment> memory = new ArrayList<MemorySegment>(); memMan.allocatePages(new DummyInvokable(), memory, 4); @@ -69,15 +68,11 @@ public class FileChannelStreamsTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - ioManager.shutdown(); - } } @Test public void testCloseAndDeleteInputView() { - final IOManager ioManager = new IOManagerAsync(); - try { + try (IOManager ioManager = new IOManagerAsync()) { MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true); List<MemorySegment> memory = new ArrayList<MemorySegment>(); memMan.allocatePages(new DummyInvokable(), memory, 4); @@ -110,8 +105,5 @@ public class FileChannelStreamsTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - ioManager.shutdown(); - } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java index 4c6a2b3..1c5a2ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java @@ -39,12 +39,11 @@ public class SeekableFileChannelInputViewTest { @Test public void testSeek() { - final IOManager ioManager = new IOManagerAsync(); final int PAGE_SIZE = 16 * 1024; final int NUM_RECORDS = 120000; // integers across 7.x pages (7 pages = 114.688 bytes, 8 pages = 131.072 bytes) - try { + try (IOManager ioManager = new IOManagerAsync()) { MemoryManager memMan = new MemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true); List<MemorySegment> memory = new ArrayList<MemorySegment>(); memMan.allocatePages(new DummyInvokable(), memory, 4); @@ -150,8 +149,5 @@ public class SeekableFileChannelInputViewTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - ioManager.shutdown(); - } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java index 01a9723..f94735c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java @@ -72,7 +72,7 @@ public class SpillingBufferTest { @After public void afterTest() { - ioManager.shutdown(); + ioManager.close(); if (!ioManager.isProperlyShutDown()) { Assert.fail("I/O Manager was not properly shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java index bc4c42a..5ef3983 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java @@ -59,7 +59,7 @@ public class AsynchronousBufferFileWriterTest { @AfterClass public static void shutdown() { - ioManager.shutdown(); + ioManager.close(); } @Before diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java index e3d5907..ead6490 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java @@ -58,7 +58,6 @@ public class AsynchronousFileIOChannelTest { final int numberOfRequests = 100; // -- Setup ----------------------------------------------------------- - final IOManagerAsync ioManager = new IOManagerAsync(); final ExecutorService executor = Executors.newFixedThreadPool(3); @@ -71,7 +70,7 @@ public class AsynchronousFileIOChannelTest { final TestNotificationListener listener = new TestNotificationListener(); // -- The Test -------------------------------------------------------- - try { + try (final IOManagerAsync ioManager = new IOManagerAsync()) { // Repeatedly add requests and process them and have one thread try to register as a // listener until the channel is closed and all requests are processed. @@ -177,7 +176,6 @@ public class AsynchronousFileIOChannelTest { } } finally { - ioManager.shutdown(); executor.shutdown(); } } @@ -188,7 +186,6 @@ public class AsynchronousFileIOChannelTest { final int numberOfRuns = 1024; // -- Setup ----------------------------------------------------------- - final IOManagerAsync ioManager = new IOManagerAsync(); final ExecutorService executor = Executors.newFixedThreadPool(2); @@ -200,7 +197,7 @@ public class AsynchronousFileIOChannelTest { final TestNotificationListener listener = new TestNotificationListener(); // -- The Test -------------------------------------------------------- - try { + try (final IOManagerAsync ioManager = new IOManagerAsync()) { // Repeatedly close the channel and add a request. for (int i = 0; i < numberOfRuns; i++) { final TestAsyncFileIOChannel ioChannel = new TestAsyncFileIOChannel( @@ -264,15 +261,13 @@ public class AsynchronousFileIOChannelTest { } } finally { - ioManager.shutdown(); executor.shutdown(); } } @Test public void testClosingWaits() { - IOManagerAsync ioMan = new IOManagerAsync(); - try { + try (final IOManagerAsync ioMan = new IOManagerAsync()) { final int NUM_BLOCKS = 100; final MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024); @@ -318,20 +313,14 @@ public class AsynchronousFileIOChannelTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - ioMan.shutdown(); - } } @Test public void testExceptionForwardsToClose() { - IOManagerAsync ioMan = new IOManagerAsync(); - try { + try (IOManagerAsync ioMan = new IOManagerAsync()) { testExceptionForwardsToClose(ioMan, 100, 1); testExceptionForwardsToClose(ioMan, 100, 50); testExceptionForwardsToClose(ioMan, 100, 100); - } finally { - ioMan.shutdown(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java index 1aaefcb..d2c388c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java @@ -61,7 +61,7 @@ public class BufferFileWriterFileSegmentReaderTest { @AfterClass public static void shutdown() { - ioManager.shutdown(); + ioManager.close(); } @Before diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java index 29e7b44..78026bf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java @@ -58,7 +58,7 @@ public class BufferFileWriterReaderTest { @AfterClass public static void shutdown() { - ioManager.shutdown(); + ioManager.close(); } @Before diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java index 4656d56..a4d3795 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java @@ -47,7 +47,7 @@ public class IOManagerAsyncTest { @After public void afterTest() { - this.ioManager.shutdown(); + this.ioManager.close(); assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown()); } @@ -352,4 +352,4 @@ public class IOManagerAsyncTest { final class TestIOException extends IOException { private static final long serialVersionUID = -814705441998024472L; } -} \ No newline at end of file +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java index 852e6e7..4a3f13f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java @@ -65,7 +65,7 @@ public class IOManagerITCase extends TestLogger { @After public void afterTest() throws Exception { - ioManager.shutdown(); + ioManager.close(); Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown()); Assert.assertTrue("Not all memory was returned to the memory manager in the test.", memoryManager.verifyEmpty()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java index 156098e..ce16bba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java @@ -41,22 +41,18 @@ public class IOManagerTest { @Test public void channelEnumerator() throws IOException { - IOManager ioMan = null; - - try { - File tempPath = temporaryFolder.newFolder(); - - String[] tempDirs = new String[]{ - new File(tempPath, "a").getAbsolutePath(), - new File(tempPath, "b").getAbsolutePath(), - new File(tempPath, "c").getAbsolutePath(), - new File(tempPath, "d").getAbsolutePath(), - new File(tempPath, "e").getAbsolutePath(), - }; - - int[] counters = new int[tempDirs.length]; - - ioMan = new TestIOManager(tempDirs); + File tempPath = temporaryFolder.newFolder(); + + String[] tempDirs = new String[]{ + new File(tempPath, "a").getAbsolutePath(), + new File(tempPath, "b").getAbsolutePath(), + new File(tempPath, "c").getAbsolutePath(), + new File(tempPath, "d").getAbsolutePath(), + new File(tempPath, "e").getAbsolutePath(), + }; + + int[] counters = new int[tempDirs.length]; + try (IOManager ioMan = new TestIOManager(tempDirs) ) { FileIOChannel.Enumerator enumerator = ioMan.createChannelEnumerator(); for (int i = 0; i < 3 * tempDirs.length; i++) { @@ -81,11 +77,6 @@ public class IOManagerTest { assertEquals(3, counters[k]); } } - finally { - if (ioMan != null) { - ioMan.shutdown(); - } - } } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java index 85315b7..cde6d89 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java @@ -103,7 +103,7 @@ public class HashTableITCase extends TestLogger { public void tearDown() { // shut down I/O manager and Memory Manager and verify the correct shutdown - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { fail("I/O manager was not property shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java index bc9daf2..c8748cc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java @@ -130,8 +130,7 @@ public class HashTablePerformanceComparison { @Test public void testMutableHashMapPerformance() { - final IOManager ioManager = new IOManagerAsync(); - try { + try (IOManager ioManager = new IOManagerAsync()) { final int NUM_MEM_PAGES = SIZE * NUM_PAIRS / PAGE_SIZE; MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_PAIRS, 1, false); @@ -206,8 +205,6 @@ public class HashTablePerformanceComparison { catch (Exception e) { e.printStackTrace(); fail("Error: " + e.getMessage()); - } finally { - ioManager.shutdown(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java index bcf620c..2e3748f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java @@ -110,10 +110,7 @@ public class HashTableTest { */ @Test public void testBufferMissingForProbing() { - - final IOManager ioMan = new IOManagerAsync(); - - try { + try (final IOManager ioMan = new IOManagerAsync()) { final int pageSize = 32*1024; final int numSegments = 34; final int numRecords = 3400; @@ -151,9 +148,6 @@ public class HashTableTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - ioMan.shutdown(); - } } /** @@ -163,8 +157,6 @@ public class HashTableTest { */ @Test public void testSpillingFreesOnlyOverflowSegments() { - final IOManager ioMan = new IOManagerAsync(); - final TypeSerializer<ByteValue> serializer = ByteValueSerializer.INSTANCE; final TypeComparator<ByteValue> buildComparator = new ValueComparator<>(true, ByteValue.class); final TypeComparator<ByteValue> probeComparator = new ValueComparator<>(true, ByteValue.class); @@ -172,7 +164,7 @@ public class HashTableTest { @SuppressWarnings("unchecked") final TypePairComparator<ByteValue, ByteValue> pairComparator = Mockito.mock(TypePairComparator.class); - try { + try (final IOManager ioMan = new IOManagerAsync()) { final int pageSize = 32*1024; final int numSegments = 34; @@ -192,9 +184,6 @@ public class HashTableTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - ioMan.shutdown(); - } } /** @@ -203,9 +192,7 @@ public class HashTableTest { */ @Test public void testSpillingWhenBuildingTableWithoutOverflow() throws Exception { - final IOManager ioMan = new IOManagerAsync(); - - try { + try (final IOManager ioMan = new IOManagerAsync()) { final TypeSerializer<byte[]> serializer = BytePrimitiveArraySerializer.INSTANCE; final TypeComparator<byte[]> buildComparator = new BytePrimitiveArrayComparator(true); final TypeComparator<byte[]> probeComparator = new BytePrimitiveArrayComparator(true); @@ -254,8 +241,6 @@ public class HashTableTest { } table.close(); - } finally { - ioMan.shutdown(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java index 802870b..4512ee8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java @@ -105,7 +105,7 @@ public class NonReusingHashJoinIteratorITCase extends TestLogger { @After public void afterTest() { if (this.ioManager != null) { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O manager failed to properly shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java index 020f1c3..754da40 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java @@ -83,7 +83,7 @@ public class ReOpenableHashTableITCase extends TestLogger { @After public void afterTest() { if (this.ioManager != null) { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O manager failed to properly shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java index 4008aa2..e6ac58a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java @@ -94,7 +94,7 @@ public abstract class ReOpenableHashTableTestBase extends TestLogger { @After public void afterTest() { if (this.ioManager != null) { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O manager failed to properly shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java index 8a51102..c199e38 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java @@ -113,7 +113,7 @@ public class ReusingHashJoinIteratorITCase extends TestLogger { @After public void afterTest() { if (this.ioManager != null) { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O manager failed to properly shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java index 0ab9a54..3f5a25d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java @@ -72,7 +72,7 @@ public class SpillingResettableIteratorTest { @After public void shutdown() { - this.ioman.shutdown(); + this.ioman.close(); if (!this.ioman.isProperlyShutDown()) { Assert.fail("I/O Manager Shutdown was not completed properly."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java index ef48a1f..2a69a63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java @@ -69,7 +69,7 @@ public class SpillingResettableMutableObjectIteratorTest { @After public void shutdown() { - this.ioman.shutdown(); + this.ioman.close(); if (!this.ioman.isProperlyShutDown()) { Assert.fail("I/O Manager Shutdown was not completed properly."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java index 94c0fd4..7cdf07f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java @@ -111,7 +111,7 @@ public abstract class AbstractSortMergeOuterJoinIteratorITCase extends TestLogge @After public void afterTest() { if (this.ioManager != null) { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O manager failed to properly shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java index d32cad0..96831f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java @@ -93,7 +93,7 @@ public class CombiningUnilateralSortMergerITCase extends TestLogger { @After public void afterTest() { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O Manager was not properly shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java index b30adc2..d1aa0a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java @@ -86,7 +86,7 @@ public class ExternalSortITCase extends TestLogger { @After public void afterTest() { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O Manager was not properly shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java index 530951f..335dbee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java @@ -72,7 +72,7 @@ public class ExternalSortLargeRecordsITCase extends TestLogger { @After public void afterTest() { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O Manager was not properly shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java index bba713e..2f05c78 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java @@ -79,7 +79,7 @@ public class FixedLengthRecordSorterTest { } if (this.ioManager != null) { - ioManager.shutdown(); + ioManager.close(); ioManager = null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java index 8f9e4dd..e73ee8c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java @@ -58,13 +58,11 @@ public class LargeRecordHandlerITCase extends TestLogger { @Test public void testRecordHandlerCompositeKey() { - - final IOManager ioMan = new IOManagerAsync(); final int PAGE_SIZE = 4 * 1024; final int NUM_PAGES = 1000; final int NUM_RECORDS = 10; - try { + try (final IOManager ioMan = new IOManagerAsync()) { final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true); final AbstractInvokable owner = new DummyInvokable(); @@ -145,9 +143,6 @@ public class LargeRecordHandlerITCase extends TestLogger { e.printStackTrace(); fail(e.getMessage()); } - finally { - ioMan.shutdown(); - } } public static final class SomeVeryLongValue implements Value { @@ -193,15 +188,13 @@ public class LargeRecordHandlerITCase extends TestLogger { @Test public void fileTest() { - - final IOManager ioMan = new IOManagerAsync(); final int PAGE_SIZE = 4 * 1024; final int NUM_PAGES = 4; final int NUM_RECORDS = 10; FileIOChannel.ID channel = null; - try { + try (final IOManager ioMan = new IOManagerAsync()) { final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true); final AbstractInvokable owner = new DummyInvokable(); @@ -260,13 +253,6 @@ public class LargeRecordHandlerITCase extends TestLogger { e.printStackTrace(); fail(e.getMessage()); } - finally { - if (channel != null) { - ioMan.deleteChannel(channel); - } - - ioMan.shutdown(); - } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java index a59e630..7e9acf6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java @@ -46,12 +46,10 @@ public class LargeRecordHandlerTest { @Test public void testEmptyRecordHandler() { - - final IOManager ioMan = new IOManagerAsync(); final int PAGE_SIZE = 4 * 1024; final int NUM_PAGES = 50; - try { + try (final IOManager ioMan = new IOManagerAsync()) { final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true); final AbstractInvokable owner = new DummyInvokable(); final List<MemorySegment> memory = memMan.allocatePages(owner, NUM_PAGES); @@ -88,20 +86,15 @@ public class LargeRecordHandlerTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - ioMan.shutdown(); - } } @Test public void testRecordHandlerSingleKey() { - - final IOManager ioMan = new IOManagerAsync(); final int PAGE_SIZE = 4 * 1024; final int NUM_PAGES = 24; final int NUM_RECORDS = 25000; - try { + try (final IOManager ioMan = new IOManagerAsync()) { final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true); final AbstractInvokable owner = new DummyInvokable(); @@ -174,20 +167,15 @@ public class LargeRecordHandlerTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - ioMan.shutdown(); - } } @Test public void testRecordHandlerCompositeKey() { - - final IOManager ioMan = new IOManagerAsync(); final int PAGE_SIZE = 4 * 1024; final int NUM_PAGES = 24; final int NUM_RECORDS = 25000; - try { + try (final IOManager ioMan = new IOManagerAsync()) { final MemoryManager memMan = new MemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true); final AbstractInvokable owner = new DummyInvokable(); @@ -262,8 +250,5 @@ public class LargeRecordHandlerTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - ioMan.shutdown(); - } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java index 394d44c..c729a3a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java @@ -112,7 +112,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase extends TestLogger { @After public void afterTest() { if (this.ioManager != null) { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O manager failed to properly shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java index 8e5bd95..d149084 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java @@ -112,7 +112,7 @@ public class ReusingSortMergeInnerJoinIteratorITCase extends TestLogger { @After public void afterTest() { if (this.ioManager != null) { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O manager failed to properly shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java index cdaa5b1..c014e23 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java @@ -52,10 +52,9 @@ public class UnilateralSortMergerTest extends TestLogger { final int numPages = 32; final MemoryManager memoryManager = new MemoryManager(MemoryManager.DEFAULT_PAGE_SIZE * numPages, 1); - final IOManagerAsync ioManager = new IOManagerAsync(); final DummyInvokable parentTask = new DummyInvokable(); - try { + try (final IOManagerAsync ioManager = new IOManagerAsync()) { final List<MemorySegment> memory = memoryManager.allocatePages(parentTask, numPages); final UnilateralSortMerger<Tuple2<Integer, Integer>> unilateralSortMerger = new UnilateralSortMerger<>( memoryManager, @@ -85,7 +84,6 @@ public class UnilateralSortMergerTest extends TestLogger { assertThat(inMemorySorter.isDisposed(), is(true)); } } finally { - ioManager.shutdown(); memoryManager.shutdown(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java index a76f110..d2a4f6a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java @@ -388,7 +388,7 @@ public abstract class BinaryOperatorTestBase<S extends Function, IN, OUT> extend this.sorters.clear(); // 2nd, shutdown I/O - this.ioManager.shutdown(); + this.ioManager.close(); Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown()); // last, verify all memory is returned and shutdown mem manager diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index 3820bf9..e1d2cb9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -385,7 +385,7 @@ public abstract class DriverTestBase<S extends Function> extends TestLogger impl this.sorters.clear(); // 2nd, shutdown I/O - this.ioManager.shutdown(); + this.ioManager.close(); Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown()); // last, verify all memory is returned and shutdown mem manager diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 89a6ea4..2c96ca4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -343,7 +343,7 @@ public class MockEnvironment implements Environment, AutoCloseable { } memManager.shutdown(); - ioManager.shutdown(); + ioManager.close(); checkState(ioManager.isProperlyShutDown(), "IO Manager has not properly shut down."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java index 2ef82da..1ff50b0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java @@ -378,7 +378,7 @@ public abstract class UnaryOperatorTestBase<S extends Function, IN, OUT> extends } // 2nd, shutdown I/O - this.ioManager.shutdown(); + this.ioManager.close(); Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown()); // last, verify all memory is returned and shutdown mem manager diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java index 2696045..7b4eaba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java @@ -106,7 +106,7 @@ public class HashVsSortMiniBenchmark { } if (this.ioManager != null) { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O manager failed to properly shut down."); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java index 4d46451..d19b99c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java @@ -55,7 +55,7 @@ public class BufferSpillerTest extends BufferStorageTestBase { @AfterClass public static void shutdownIOManager() { - ioManager.shutdown(); + ioManager.close(); } @Before diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java index 0621179..535dfc0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java @@ -78,7 +78,7 @@ public class CheckpointBarrierAlignerAlignmentLimitTest { @AfterClass public static void shutdownIOManager() { - ioManager.shutdown(); + ioManager.close(); } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java index 82920aa..2b452bd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java @@ -47,11 +47,9 @@ public class CheckpointBarrierAlignerMassiveRandomTest { @Test public void testWithTwoChannelsAndRandomBarriers() { - IOManager ioMan = null; NetworkBufferPool networkBufferPool1 = null; NetworkBufferPool networkBufferPool2 = null; - try { - ioMan = new IOManagerAsync(); + try (IOManager ioMan = new IOManagerAsync()) { networkBufferPool1 = new NetworkBufferPool(100, PAGE_SIZE, 1); networkBufferPool2 = new NetworkBufferPool(100, PAGE_SIZE, 1); @@ -76,9 +74,6 @@ public class CheckpointBarrierAlignerMassiveRandomTest { fail(e.getMessage()); } finally { - if (ioMan != null) { - ioMan.shutdown(); - } if (networkBufferPool1 != null) { networkBufferPool1.destroyAllBufferPools(); networkBufferPool1.destroy(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java index c892073..0d28325 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java @@ -49,7 +49,7 @@ public class SpillingCheckpointBarrierAlignerTest extends CheckpointBarrierAlign @AfterClass public static void shutdownIOManager() { - ioManager.shutdown(); + ioManager.close(); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index 0c399f7..b7d7430 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -168,7 +168,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { public void tearDown() { suppressExceptions(senderEnv::close); suppressExceptions(receiverEnv::close); - suppressExceptions(ioManager::shutdown); + suppressExceptions(ioManager::close); } public SerializingLongReceiver createReceiver() throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 12fa8be..c059757 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -325,7 +325,7 @@ public class StreamTaskTestHarness<OUT> { } private void shutdownIOManager() throws Exception { - this.mockEnv.getIOManager().shutdown(); + this.mockEnv.getIOManager().close(); Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown()); } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java index 9bab9b9..f35a40c 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/aggregate/HashAggTest.java @@ -95,7 +95,7 @@ public class HashAggTest { @After public void afterTest() { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O Manager was not properly shut down."); } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java index ee08da5..de4e949 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/hashtable/BinaryHashTableTest.java @@ -95,7 +95,7 @@ public class BinaryHashTableTest { @After public void tearDown() { // shut down I/O manager and Memory Manager and verify the correct shutdown - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { fail("I/O manager was not property shut down."); } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java index 177e83b..14039c8 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java @@ -51,7 +51,7 @@ public class CompressedHeaderlessChannelTest { @After public void afterTest() { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O Manager was not properly shut down."); } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java index 1402c11..7b47903 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/join/Int2SortMergeJoinOperatorTest.java @@ -78,7 +78,7 @@ public class Int2SortMergeJoinOperatorTest { @After public void tearDown() { // shut down I/O manager and Memory Manager and verify the correct shutdown - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { fail("I/O manager was not property shut down."); } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java index 8b389c5..8eb62d0 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryExternalSorterTest.java @@ -101,7 +101,7 @@ public class BinaryExternalSorterTest { @After public void afterTest() { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O Manager was not properly shut down."); } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java index 1928696..e095df1 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java @@ -103,7 +103,7 @@ public class BufferedKVExternalSorterTest { @After public void afterTest() { - this.ioManager.shutdown(); + this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Assert.fail("I/O Manager was not properly shut down."); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java index f02cf1c..f1664ae 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java @@ -85,9 +85,7 @@ public class HashTableRecordWidthCombinations { } }; - final IOManager ioMan = new IOManagerAsync(); - - try { + try (final IOManager ioMan = new IOManagerAsync()) { final int pageSize = 32 * 1024; final int numSegments = 34; @@ -175,9 +173,6 @@ public class HashTableRecordWidthCombinations { e.printStackTrace(); fail(e.getMessage()); } - finally { - ioMan.shutdown(); - } } // ------------------------------------------------------------------------ diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java index eb99909..1d48eee 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java @@ -83,11 +83,9 @@ public class MassiveStringSorting { BufferedReader reader = null; BufferedReader verifyReader = null; MemoryManager mm = null; - IOManager ioMan = null; - try { + try (IOManager ioMan = new IOManagerAsync()) { mm = new MemoryManager(1024 * 1024, 1); - ioMan = new IOManagerAsync(); TypeSerializer<String> serializer = StringSerializer.INSTANCE; TypeComparator<String> comparator = new StringComparator(true); @@ -127,9 +125,6 @@ public class MassiveStringSorting { if (mm != null) { mm.shutdown(); } - if (ioMan != null) { - ioMan.shutdown(); - } } } catch (Exception e) { @@ -182,11 +177,9 @@ public class MassiveStringSorting { BufferedReader reader = null; BufferedReader verifyReader = null; MemoryManager mm = null; - IOManager ioMan = null; - try { + try (IOManager ioMan = new IOManagerAsync()) { mm = new MemoryManager(1024 * 1024, 1); - ioMan = new IOManagerAsync(); TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>) new TypeHint<Tuple2<String, String[]>>(){}.getTypeInfo(); @@ -256,9 +249,6 @@ public class MassiveStringSorting { if (mm != null) { mm.shutdown(); } - if (ioMan != null) { - ioMan.shutdown(); - } } } catch (Exception e) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java index 5dcf209..861f1df 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java @@ -84,11 +84,9 @@ public class MassiveStringValueSorting { BufferedReader reader = null; BufferedReader verifyReader = null; MemoryManager mm = null; - IOManager ioMan = null; - try { + try (IOManager ioMan = new IOManagerAsync()) { mm = new MemoryManager(1024 * 1024, 1); - ioMan = new IOManagerAsync(); TypeSerializer<StringValue> serializer = new CopyableValueSerializer<StringValue>(StringValue.class); TypeComparator<StringValue> comparator = new CopyableValueComparator<StringValue>(true, StringValue.class); @@ -129,9 +127,6 @@ public class MassiveStringValueSorting { if (mm != null) { mm.shutdown(); } - if (ioMan != null) { - ioMan.shutdown(); - } } } catch (Exception e) { @@ -186,11 +181,9 @@ public class MassiveStringValueSorting { BufferedReader reader = null; BufferedReader verifyReader = null; MemoryManager mm = null; - IOManager ioMan = null; - try { + try (IOManager ioMan = new IOManagerAsync()) { mm = new MemoryManager(1024 * 1024, 1); - ioMan = new IOManagerAsync(); TupleTypeInfo<Tuple2<StringValue, StringValue[]>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, StringValue[]>>) new TypeHint<Tuple2<StringValue, StringValue[]>>(){}.getTypeInfo(); @@ -260,9 +253,6 @@ public class MassiveStringValueSorting { if (mm != null) { mm.shutdown(); } - if (ioMan != null) { - ioMan.shutdown(); - } } } catch (Exception e) {
