This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 06b2b1d81f851a77f7b39af5bbceaed3248126f0 Author: Jing Ge <[email protected]> AuthorDate: Tue Mar 15 17:35:00 2022 +0100 [FLINK-26420][test] migrate FileWriterTest to AssertJ. (cherry picked from commit c34160ba643cfe562ff37241da007b1d28570c70) --- .../connector/file/sink/writer/FileWriterTest.java | 156 +++++++++++---------- 1 file changed, 85 insertions(+), 71 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java index 8966ca7..c0a5ac6 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java @@ -42,13 +42,10 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies. import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; import org.apache.flink.util.ExceptionUtils; -import org.junit.Assert; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; -import java.io.File; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -62,27 +59,21 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.ScheduledFuture; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link FileWriter}. */ public class FileWriterTest { - @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); - private MetricListener metricListener; - @Before + @BeforeEach public void setUp() { metricListener = new MetricListener(); } @Test - public void testPreCommit() throws Exception { - File outDir = TEMP_FOLDER.newFolder(); - Path path = new Path(outDir.toURI()); + public void testPreCommit(@TempDir java.nio.file.Path tempDir) throws Exception { + Path path = new Path(tempDir.toUri()); FileWriter<String> fileWriter = createWriter( @@ -95,13 +86,13 @@ public class FileWriterTest { fileWriter.write("test3", new ContextImpl()); Collection<FileSinkCommittable> committables = fileWriter.prepareCommit(); - assertEquals(3, committables.size()); + + assertThat(committables.size()).isEqualTo(3); } @Test - public void testSnapshotAndRestore() throws Exception { - File outDir = TEMP_FOLDER.newFolder(); - Path path = new Path(outDir.toURI()); + public void testSnapshotAndRestore(@TempDir java.nio.file.Path tempDir) throws Exception { + Path path = new Path(tempDir.toUri()); FileWriter<String> fileWriter = createWriter( @@ -112,11 +103,12 @@ public class FileWriterTest { fileWriter.write("test1", new ContextImpl()); fileWriter.write("test2", new ContextImpl()); fileWriter.write("test3", new ContextImpl()); - assertEquals(3, fileWriter.getActiveBuckets().size()); + assertThat(fileWriter.getActiveBuckets().size()).isEqualTo(3); fileWriter.prepareCommit(); List<FileWriterBucketState> states = fileWriter.snapshotState(1L); - assertEquals(3, states.size()); + + assertThat(states.size()).isEqualTo(3); fileWriter = restoreWriter( @@ -124,18 +116,20 @@ public class FileWriterTest { path, OnCheckpointRollingPolicy.build(), new OutputFileConfig("part-", "")); - assertEquals( - fileWriter.getActiveBuckets().keySet(), - new HashSet<>(Arrays.asList("test1", "test2", "test3"))); + + assertThat(fileWriter.getActiveBuckets().keySet()) + .isEqualTo(new HashSet<>(Arrays.asList("test1", "test2", "test3"))); + for (FileWriterBucket<String> bucket : fileWriter.getActiveBuckets().values()) { - assertNotNull("The in-progress file should be recovered", bucket.getInProgressPart()); + assertThat(bucket.getInProgressPart()) + .as("The in-progress file should be recovered") + .isNotNull(); } } @Test - public void testMergingForRescaling() throws Exception { - File outDir = TEMP_FOLDER.newFolder(); - Path path = new Path(outDir.toURI()); + public void testMergingForRescaling(@TempDir java.nio.file.Path tempDir) throws Exception { + Path path = new Path(tempDir.toUri()); FileWriter<String> firstFileWriter = createWriter( @@ -172,27 +166,34 @@ public class FileWriterTest { path, DefaultRollingPolicy.builder().build(), new OutputFileConfig("part-", "")); - assertEquals(3, restoredWriter.getActiveBuckets().size()); + + assertThat(restoredWriter.getActiveBuckets().size()).isEqualTo(3); // Merged buckets for (String bucketId : Arrays.asList("test1", "test2")) { FileWriterBucket<String> bucket = restoredWriter.getActiveBuckets().get(bucketId); - assertNotNull("The in-progress file should be recovered", bucket.getInProgressPart()); - assertEquals(1, bucket.getPendingFiles().size()); + + assertThat(bucket.getInProgressPart()) + .as("The in-progress file should be recovered") + .isNotNull(); + assertThat(bucket.getPendingFiles().size()).isEqualTo(1); } // Not merged buckets for (String bucketId : Collections.singletonList("test3")) { FileWriterBucket<String> bucket = restoredWriter.getActiveBuckets().get(bucketId); - assertNotNull("The in-progress file should be recovered", bucket.getInProgressPart()); - assertEquals(0, bucket.getPendingFiles().size()); + + assertThat(bucket.getInProgressPart()) + .as("The in-progress file should be recovered") + .isNotNull(); + assertThat(bucket.getPendingFiles().size()).isEqualTo(0); } } @Test - public void testBucketIsRemovedWhenNotActive() throws Exception { - File outDir = TEMP_FOLDER.newFolder(); - Path path = new Path(outDir.toURI()); + public void testBucketIsRemovedWhenNotActive(@TempDir java.nio.file.Path tempDir) + throws Exception { + Path path = new Path(tempDir.toUri()); FileWriter<String> fileWriter = createWriter( @@ -205,13 +206,12 @@ public class FileWriterTest { // No more records and another call to prepareCommit will makes it inactive fileWriter.prepareCommit(); - assertTrue(fileWriter.getActiveBuckets().isEmpty()); + assertThat(fileWriter.getActiveBuckets().isEmpty()).isTrue(); } @Test - public void testOnProcessingTime() throws Exception { - File outDir = TEMP_FOLDER.newFolder(); - Path path = new Path(outDir.toURI()); + public void testOnProcessingTime(@TempDir java.nio.file.Path tempDir) throws Exception { + Path path = new Path(tempDir.toUri()); // Create the processing timer service starts from 10. ManuallyTriggeredProcessingTimeService processingTimeService = @@ -237,15 +237,18 @@ public class FileWriterTest { processingTimeService.advanceTo(20); FileWriterBucket<String> test1Bucket = fileWriter.getActiveBuckets().get("test1"); - assertNull( - "The in-progress part of test1 should be rolled", test1Bucket.getInProgressPart()); - assertEquals(1, test1Bucket.getPendingFiles().size()); + + assertThat(test1Bucket.getInProgressPart()) + .as("The in-progress part of test1 should be rolled") + .isNull(); + assertThat(test1Bucket.getPendingFiles().size()).isEqualTo(1); FileWriterBucket<String> test2Bucket = fileWriter.getActiveBuckets().get("test2"); - assertNotNull( - "The in-progress part of test2 should not be rolled", - test2Bucket.getInProgressPart()); - assertEquals(0, test2Bucket.getPendingFiles().size()); + + assertThat(test2Bucket.getInProgressPart()) + .as("The in-progress part of test2 should not be rolled") + .isNotNull(); + assertThat(test2Bucket.getPendingFiles().size()).isEqualTo(0); // Close, pre-commit & clear all the pending records. processingTimeService.advanceTo(30); @@ -258,36 +261,43 @@ public class FileWriterTest { processingTimeService.advanceTo(40); test1Bucket = fileWriter.getActiveBuckets().get("test1"); - assertNull( - "The in-progress part of test1 should be rolled", test1Bucket.getInProgressPart()); - assertEquals(1, test1Bucket.getPendingFiles().size()); + + assertThat(test1Bucket.getInProgressPart()) + .as("The in-progress part of test1 should be rolled") + .isNull(); + assertThat(test1Bucket.getPendingFiles().size()).isEqualTo(1); test2Bucket = fileWriter.getActiveBuckets().get("test2"); - assertNotNull( - "The in-progress part of test2 should not be rolled", - test2Bucket.getInProgressPart()); - assertEquals(0, test2Bucket.getPendingFiles().size()); + + assertThat(test2Bucket.getInProgressPart()) + .as("The in-progress part of test2 should not be rolled") + .isNotNull(); + assertThat(test2Bucket.getPendingFiles().size()).isEqualTo(0); } @Test - public void testContextPassingNormalExecution() throws Exception { - testCorrectTimestampPassingInContext(1L, 2L, 3L); + public void testContextPassingNormalExecution(@TempDir java.nio.file.Path tempDir) + throws Exception { + testCorrectTimestampPassingInContext(1L, 2L, 3L, tempDir); } @Test - public void testContextPassingNullTimestamp() throws Exception { - testCorrectTimestampPassingInContext(null, 4L, 5L); + public void testContextPassingNullTimestamp(@TempDir java.nio.file.Path tempDir) + throws Exception { + testCorrectTimestampPassingInContext(null, 4L, 5L, tempDir); } @Test - public void testNumberRecordsOutCounter() throws IOException, InterruptedException { + public void testNumberRecordsOutCounter(@TempDir java.nio.file.Path tempDir) + throws IOException, InterruptedException { + Path path = new Path(tempDir.toUri()); + final OperatorIOMetricGroup operatorIOMetricGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); final SinkWriterMetricGroup sinkWriterMetricGroup = InternalSinkWriterMetricGroup.mock( metricListener.getMetricGroup(), operatorIOMetricGroup); - File outDir = TEMP_FOLDER.newFolder(); - Path path = new Path(outDir.toURI()); + Counter recordsCounter = sinkWriterMetricGroup.getNumRecordsSendCounter(); SinkWriter.Context context = new ContextImpl(); FileWriter<String> fileWriter = @@ -297,18 +307,22 @@ public class FileWriterTest { new OutputFileConfig("part-", ""), sinkWriterMetricGroup); - assertEquals(0, recordsCounter.getCount()); + assertThat(recordsCounter.getCount()).isEqualTo(0); + fileWriter.write("1", context); - assertEquals(1, recordsCounter.getCount()); + + assertThat(recordsCounter.getCount()).isEqualTo(1); + fileWriter.write("2", context); fileWriter.write("3", context); - assertEquals(3, recordsCounter.getCount()); + + assertThat(recordsCounter.getCount()).isEqualTo(3); } private void testCorrectTimestampPassingInContext( - Long timestamp, long watermark, long processingTime) throws Exception { - final File outDir = TEMP_FOLDER.newFolder(); - final Path path = new Path(outDir.toURI()); + Long timestamp, long watermark, long processingTime, java.nio.file.Path tempDir) + throws Exception { + Path path = new Path(tempDir.toUri()); // Create the processing timer service starts from 10. ManuallyTriggeredProcessingTimeService processingTimeService = @@ -416,9 +430,9 @@ public class FileWriterTest { long watermark = context.currentWatermark(); long processingTime = context.currentProcessingTime(); - Assert.assertEquals(expectedTimestamp, elementTimestamp); - Assert.assertEquals(expectedProcessingTime, processingTime); - Assert.assertEquals(expectedWatermark, watermark); + assertThat(elementTimestamp).isEqualTo(expectedTimestamp); + assertThat(processingTime).isEqualTo(expectedProcessingTime); + assertThat(watermark).isEqualTo(expectedWatermark); return element; }
