This is an automated email from the ASF dual-hosted git repository. yingjie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e92b87df73897843a136bd273a38e90be8ab2d68 Author: Weijie Guo <[email protected]> AuthorDate: Wed Aug 3 01:38:35 2022 +0800 [hotfix] Migrate PartitionFileWriteReadTest, SortMergeResultPartitionReadSchedulerTest, SortMergeSubpartitionReaderTest to Junit5 and AssertJ This closes #20333. --- .../partition/PartitionedFileWriteReadTest.java | 54 +++++++++------- .../SortMergeResultPartitionReadSchedulerTest.java | 75 ++++++++++------------ .../partition/SortMergeSubpartitionReaderTest.java | 72 ++++++++++----------- 3 files changed, 98 insertions(+), 103 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java index adfe71792d5..741fd3f1882 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java @@ -26,9 +26,8 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.util.IOUtils; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.nio.channels.FileChannel; @@ -44,17 +43,17 @@ import java.util.Random; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Tests for writing and reading {@link PartitionedFile} with {@link PartitionedFileWriter} and * {@link PartitionedFileReader}. */ -public class PartitionedFileWriteReadTest { - - @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); +class PartitionedFileWriteReadTest { + private @TempDir Path tempPath; @Test - public void testWriteAndReadPartitionedFile() throws Exception { + void testWriteAndReadPartitionedFile() throws Exception { int numSubpartitions = 10; int bufferSize = 1024; int numBuffers = 1000; @@ -122,8 +121,7 @@ public class PartitionedFileWriteReadTest { IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel); for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) { - assertThat(buffersWritten[subpartition].size()) - .isEqualTo(buffersRead[subpartition].size()); + assertThat(buffersWritten[subpartition]).hasSameSizeAs(buffersRead[subpartition]); for (int i = 0; i < buffersWritten[subpartition].size(); ++i) { assertBufferEquals( buffersWritten[subpartition].get(i), buffersRead[subpartition].get(i)); @@ -158,7 +156,7 @@ public class PartitionedFileWriteReadTest { } @Test - public void testWriteAndReadWithEmptySubpartition() throws Exception { + void testWriteAndReadWithEmptySubpartition() throws Exception { int numRegions = 10; int numSubpartitions = 5; int bufferSize = 1024; @@ -220,39 +218,50 @@ public class PartitionedFileWriteReadTest { return new NetworkBuffer(MemorySegmentFactory.wrap(data), (buf) -> {}, dataType, dataSize); } - @Test(expected = IllegalStateException.class) + @Test public void testNotWriteDataOfTheSameSubpartitionTogether() throws Exception { PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2); try { MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024); - NetworkBuffer buffer1 = new NetworkBuffer(segment, (buf) -> {}); - partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer1, 1)); + assertThatThrownBy( + () -> { + NetworkBuffer buffer1 = new NetworkBuffer(segment, (buf) -> {}); + partitionedFileWriter.writeBuffers( + getBufferWithChannels(buffer1, 1)); + + NetworkBuffer buffer2 = new NetworkBuffer(segment, (buf) -> {}); + partitionedFileWriter.writeBuffers( + getBufferWithChannels(buffer2, 0)); - NetworkBuffer buffer2 = new NetworkBuffer(segment, (buf) -> {}); - partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer2, 0)); + NetworkBuffer buffer3 = new NetworkBuffer(segment, (buf) -> {}); + partitionedFileWriter.writeBuffers( + getBufferWithChannels(buffer3, 1)); + }) + .isInstanceOf(IllegalStateException.class); - NetworkBuffer buffer3 = new NetworkBuffer(segment, (buf) -> {}); - partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer3, 1)); } finally { partitionedFileWriter.finish(); } } - @Test(expected = IllegalStateException.class) + @Test public void testWriteFinishedPartitionedFile() throws Exception { PartitionedFileWriter partitionedFileWriter = createAndFinishPartitionedFileWriter(); MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024); NetworkBuffer buffer = new NetworkBuffer(segment, (buf) -> {}); - partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer, 0)); + assertThatThrownBy( + () -> partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer, 0))) + .isInstanceOf(IllegalStateException.class); } - @Test(expected = IllegalStateException.class) + @Test public void testFinishPartitionedFileWriterTwice() throws Exception { PartitionedFileWriter partitionedFileWriter = createAndFinishPartitionedFileWriter(); - partitionedFileWriter.finish(); + assertThatThrownBy(() -> partitionedFileWriter.finish()) + .isInstanceOf(IllegalStateException.class); } @Test @@ -296,8 +305,7 @@ public class PartitionedFileWriteReadTest { private PartitionedFileWriter createPartitionedFileWriter(int numSubpartitions) throws IOException { - String basePath = temporaryFolder.newFile().getPath(); - return new PartitionedFileWriter(numSubpartitions, 640, basePath); + return new PartitionedFileWriter(numSubpartitions, 640, tempPath.toString()); } private PartitionedFileWriter createAndFinishPartitionedFileWriter() throws IOException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java index 301ecffee1d..26213d4d43d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java @@ -23,15 +23,12 @@ import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.CompositeBuffer; -import org.apache.flink.util.TestLogger; -import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.nio.ByteBuffer; @@ -46,13 +43,13 @@ import java.util.Queue; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link SortMergeResultPartitionReadScheduler}. */ -public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { +class SortMergeResultPartitionReadSchedulerTest { private static final int bufferSize = 1024; @@ -80,17 +77,13 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { private SortMergeResultPartitionReadScheduler readScheduler; - @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @Rule public Timeout timeout = new Timeout(60, TimeUnit.SECONDS); - - @Before - public void before() throws Exception { + @BeforeEach + void before(@TempDir Path basePath) throws Exception { Random random = new Random(); random.nextBytes(dataBytes); partitionedFile = PartitionTestUtils.createPartitionedFile( - temporaryFolder.newFile().getAbsolutePath(), + basePath.toString(), numSubpartitions, numBuffersPerSubpartition, bufferSize, @@ -105,8 +98,8 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { new SortMergeResultPartitionReadScheduler(bufferPool, executor, new Object()); } - @After - public void after() throws Exception { + @AfterEach + void after() throws Exception { dataFileChannel.close(); indexFileChannel.close(); partitionedFile.deleteQuietly(); @@ -115,7 +108,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { } @Test - public void testCreateSubpartitionReader() throws Exception { + void testCreateSubpartitionReader() throws Exception { SortMergeSubpartitionReader subpartitionReader = readScheduler.createSubpartitionReader( new NoOpBufferAvailablityListener(), 0, partitionedFile); @@ -141,7 +134,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { } @Test - public void testOnSubpartitionReaderError() throws Exception { + void testOnSubpartitionReaderError() throws Exception { SortMergeSubpartitionReader subpartitionReader = readScheduler.createSubpartitionReader( new NoOpBufferAvailablityListener(), 0, partitionedFile); @@ -152,7 +145,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { } @Test - public void testReleaseWhileReading() throws Exception { + void testReleaseWhileReading() throws Exception { SortMergeSubpartitionReader subpartitionReader = readScheduler.createSubpartitionReader( new NoOpBufferAvailablityListener(), 0, partitionedFile); @@ -169,20 +162,20 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { assertAllResourcesReleased(); } - @Test(expected = IllegalStateException.class) - public void testCreateSubpartitionReaderAfterReleased() throws Exception { + @Test + void testCreateSubpartitionReaderAfterReleased() throws Exception { bufferPool.initialize(); readScheduler.release(); - try { - readScheduler.createSubpartitionReader( - new NoOpBufferAvailablityListener(), 0, partitionedFile); - } finally { - assertAllResourcesReleased(); - } + assertThatThrownBy( + () -> + readScheduler.createSubpartitionReader( + new NoOpBufferAvailablityListener(), 0, partitionedFile)) + .isInstanceOf(IllegalStateException.class); + assertAllResourcesReleased(); } @Test - public void testOnDataReadError() throws Exception { + void testOnDataReadError() throws Exception { SortMergeSubpartitionReader subpartitionReader = readScheduler.createSubpartitionReader( new NoOpBufferAvailablityListener(), 0, partitionedFile); @@ -205,7 +198,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { } @Test - public void testOnReadBufferRequestError() throws Exception { + void testOnReadBufferRequestError() throws Exception { SortMergeSubpartitionReader subpartitionReader = readScheduler.createSubpartitionReader( new NoOpBufferAvailablityListener(), 0, partitionedFile); @@ -219,8 +212,9 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { assertAllResourcesReleased(); } - @Test(timeout = 60000) - public void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception { + @Test + @Timeout(60) + void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception { bufferPool.initialize(); SortMergeSubpartitionReader subpartitionReader = new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader); @@ -248,7 +242,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { } @Test - public void testRequestBufferTimeout() throws Exception { + void testRequestBufferTimeout() throws Exception { Duration bufferRequestTimeout = Duration.ofSeconds(3); List<MemorySegment> buffers = bufferPool.requestBuffers(); SortMergeResultPartitionReadScheduler readScheduler = @@ -256,8 +250,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { bufferPool, executor, this, bufferRequestTimeout); long startTimestamp = System.nanoTime(); - Assertions.assertThatThrownBy(readScheduler::allocateBuffers) - .isInstanceOf(TimeoutException.class); + assertThatThrownBy(readScheduler::allocateBuffers).isInstanceOf(TimeoutException.class); long requestDuration = System.nanoTime() - startTimestamp; assertThat(requestDuration > bufferRequestTimeout.toNanos()).isTrue(); @@ -266,7 +259,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { } @Test - public void testRequestTimeoutIsRefreshedAndSuccess() throws Exception { + void testRequestTimeoutIsRefreshedAndSuccess() throws Exception { Duration bufferRequestTimeout = Duration.ofSeconds(3); FakeBatchShuffleReadBufferPool bufferPool = new FakeBatchShuffleReadBufferPool(bufferSize * 3, bufferSize); @@ -280,8 +273,8 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { Queue<MemorySegment> allocatedBuffers = readScheduler.allocateBuffers(); long requestDuration = System.nanoTime() - startTimestamp; - assertThat(allocatedBuffers.size()).isEqualTo(3); - assertThat(requestDuration > bufferRequestTimeout.toNanos() * 2).isTrue(); + assertThat(allocatedBuffers).hasSize(3); + assertThat(requestDuration).isGreaterThan(bufferRequestTimeout.toNanos() * 2); assertThat(subpartitionReader.getFailureCause()).isNull(); bufferPool.recycle(allocatedBuffers); @@ -319,7 +312,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { assertThat(readScheduler.getDataFileChannel()).isNull(); assertThat(readScheduler.getIndexFileChannel()).isNull(); assertThat(readScheduler.isRunning()).isFalse(); - assertThat(readScheduler.getNumPendingReaders()).isEqualTo(0); + assertThat(readScheduler.getNumPendingReaders()).isZero(); if (!bufferPool.isDestroyed()) { assertThat(bufferPool.getNumTotalBuffers()).isEqualTo(bufferPool.getAvailableBuffers()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java index cf28333de1f..f899a0ea142 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java @@ -24,14 +24,11 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.CompositeBuffer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.util.IOUtils; -import org.apache.flink.util.TestLogger; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.nio.ByteBuffer; @@ -41,13 +38,13 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayDeque; import java.util.Queue; import java.util.Random; -import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link SortMergeSubpartitionReader}. */ -public class SortMergeSubpartitionReaderTest extends TestLogger { +class SortMergeSubpartitionReaderTest { private static final int bufferSize = 1024; @@ -63,17 +60,13 @@ public class SortMergeSubpartitionReaderTest extends TestLogger { private FileChannel indexFileChannel; - @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @Rule public Timeout timeout = new Timeout(60, TimeUnit.SECONDS); - - @Before - public void before() throws Exception { + @BeforeEach + void before(@TempDir Path basePath) throws Exception { Random random = new Random(); random.nextBytes(dataBytes); partitionedFile = PartitionTestUtils.createPartitionedFile( - temporaryFolder.newFile().getAbsolutePath(), + basePath.toString(), numSubpartitions, numBuffersPerSubpartition, bufferSize, @@ -82,34 +75,34 @@ public class SortMergeSubpartitionReaderTest extends TestLogger { indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath()); } - @After - public void after() { + @AfterEach + void after() { IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel); partitionedFile.deleteQuietly(); } @Test - public void testReadBuffers() throws Exception { + void testReadBuffers() throws Exception { CountingAvailabilityListener listener = new CountingAvailabilityListener(); SortMergeSubpartitionReader subpartitionReader = createSortMergeSubpartitionReader(listener); - assertThat(listener.numNotifications).isEqualTo(0); - assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0); + assertThat(listener.numNotifications).isZero(); + assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isZero(); Queue<MemorySegment> segments = createsMemorySegments(2); subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE); assertThat(listener.numNotifications).isEqualTo(1); assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(1); - assertThat(segments.size()).isEqualTo(0); + assertThat(segments).isEmpty(); segments = createsMemorySegments(2); subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE); assertThat(listener.numNotifications).isEqualTo(1); assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(2); - assertThat(segments.size()).isEqualTo(0); + assertThat(segments).isEmpty(); while (subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers() > 0) { checkNotNull(subpartitionReader.getNextBuffer()).buffer().recycleBuffer(); @@ -119,13 +112,13 @@ public class SortMergeSubpartitionReaderTest extends TestLogger { subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE); assertThat(listener.numNotifications).isEqualTo(2); - assertThat(numBuffersPerSubpartition - 2) - .isEqualTo(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()); + assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()) + .isEqualTo(numBuffersPerSubpartition - 2); assertThat(segments.size()).isEqualTo(1); } @Test - public void testPollBuffers() throws Exception { + void testPollBuffers() throws Exception { SortMergeSubpartitionReader subpartitionReader = createSortMergeSubpartitionReader(new CountingAvailabilityListener()); @@ -155,7 +148,7 @@ public class SortMergeSubpartitionReaderTest extends TestLogger { } @Test - public void testFail() throws Exception { + void testFail() throws Exception { int numSegments = 5; Queue<MemorySegment> segments = createsMemorySegments(numSegments); @@ -169,20 +162,20 @@ public class SortMergeSubpartitionReaderTest extends TestLogger { assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4); subpartitionReader.fail(new RuntimeException("Test exception.")); - assertThat(subpartitionReader.getReleaseFuture().isDone()).isTrue(); - assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0); + assertThat(subpartitionReader.getReleaseFuture()).isDone(); + assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isZero(); assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue(); assertThat(subpartitionReader.isReleased()).isTrue(); assertThat(listener.numNotifications).isEqualTo(2); assertThat(subpartitionReader.getFailureCause()).isNotNull(); } finally { - assertThat(numSegments).isEqualTo(segments.size()); + assertThat(segments).hasSize(numSegments); } } @Test - public void testReleaseAllResources() throws Exception { + void testReleaseAllResources() throws Exception { int numSegments = 5; Queue<MemorySegment> segments = createsMemorySegments(numSegments); @@ -196,20 +189,20 @@ public class SortMergeSubpartitionReaderTest extends TestLogger { assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4); subpartitionReader.releaseAllResources(); - assertThat(subpartitionReader.getReleaseFuture().isDone()).isTrue(); - assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0); + assertThat(subpartitionReader.getReleaseFuture()).isDone(); + assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isZero(); assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue(); assertThat(subpartitionReader.isReleased()).isTrue(); assertThat(listener.numNotifications).isEqualTo(1); assertThat(subpartitionReader.getFailureCause()).isNull(); } finally { - assertThat(numSegments).isEqualTo(segments.size()); + assertThat(segments).hasSize(numSegments); } } - @Test(expected = IllegalStateException.class) - public void testReadBuffersAfterReleased() throws Exception { + @Test + void testReadBuffersAfterReleased() throws Exception { int numSegments = 5; Queue<MemorySegment> segments = createsMemorySegments(numSegments); @@ -219,14 +212,15 @@ public class SortMergeSubpartitionReaderTest extends TestLogger { subpartitionReader.readBuffers(segments, segments::add); subpartitionReader.releaseAllResources(); - subpartitionReader.readBuffers(segments, segments::add); + assertThatThrownBy(() -> subpartitionReader.readBuffers(segments, segments::add)) + .isInstanceOf(IllegalStateException.class); } finally { - assertThat(numSegments).isEqualTo(segments.size()); + assertThat(segments).hasSize(numSegments); } } @Test - public void testPollBuffersAfterReleased() throws Exception { + void testPollBuffersAfterReleased() throws Exception { SortMergeSubpartitionReader subpartitionReader = createSortMergeSubpartitionReader(new CountingAvailabilityListener());
