This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d8d69005033 [FLINK-27885][tests][JUnit5 migration] Module: flink-csv
d8d69005033 is described below

commit d8d69005033063566d7e160900f6370df18ebb1e
Author: Ryan Skraba <[email protected]>
AuthorDate: Fri Aug 30 16:59:23 2024 +0200

    [FLINK-27885][tests][JUnit5 migration] Module: flink-csv
---
 .../architecture/TestCodeArchitectureTest.java     |  2 +-
 .../flink/formats/csv/CsvFormatFactoryTest.java    | 96 +++++++++++-----------
 .../CsvFormatFilesystemStatisticsReportTest.java   |  6 +-
 .../formats/csv/CsvFormatStatisticsReportTest.java | 19 +++--
 .../flink/formats/csv/CsvReaderFormatTest.java     |  2 +-
 .../formats/csv/CsvRowDataSerDeSchemaTest.java     | 57 +++++++------
 .../csv/CsvRowDeSerializationSchemaTest.java       | 48 +++++++----
 .../flink/formats/csv/DataStreamCsvITCase.java     | 13 ++-
 .../formats/csv/RowCsvInputFormatSplitTest.java    | 28 +++----
 .../flink/formats/csv/RowCsvInputFormatTest.java   | 32 ++++----
 .../flink/formats/csv/TableCsvFormatITCase.java    | 77 ++++++++---------
 .../flink/orc/OrcFormatStatisticsReportTest.java   |  4 +-
 12 files changed, 205 insertions(+), 179 deletions(-)

diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
index 081d6e0fd9a..b3dc4d6d97f 100644
--- 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
+++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
@@ -33,7 +33,7 @@ import com.tngtech.archunit.junit.ArchTests;
             ImportOptions.ExcludeScalaImportOption.class,
             ImportOptions.ExcludeShadedImportOption.class
         })
-public class TestCodeArchitectureTest {
+class TestCodeArchitectureTest {
 
     @ArchTest
     public static final ArchTests COMMON_TESTS = 
ArchTests.in(TestCodeArchitectureTestBase.class);
diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
index 21974e14864..b31148781e8 100644
--- 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
+++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
@@ -33,11 +33,8 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.TestDynamicTableFactory;
 import 
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -49,7 +46,6 @@ import java.util.Map;
 import java.util.function.Consumer;
 
 import static 
org.apache.flink.connector.testutils.formats.SchemaTestUtils.open;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 import static org.apache.flink.table.data.DecimalData.fromBigDecimal;
 import static org.apache.flink.table.data.StringData.fromString;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_DATA_TYPE;
@@ -58,13 +54,13 @@ import static 
org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link CsvFormatFactory}. */
-public class CsvFormatFactoryTest extends TestLogger {
-    @Rule public ExpectedException thrown = ExpectedException.none();
+class CsvFormatFactoryTest {
 
     @Test
-    public void testSeDeSchema() {
+    void testSeDeSchema() {
         final CsvRowDataDeserializationSchema expectedDeser =
                 new CsvRowDataDeserializationSchema.Builder(
                                 PHYSICAL_TYPE, 
InternalTypeInfo.of(PHYSICAL_TYPE))
@@ -94,7 +90,7 @@ public class CsvFormatFactoryTest extends TestLogger {
     }
 
     @Test
-    public void testDisableQuoteCharacter() {
+    void testDisableQuoteCharacter() {
         final Map<String, String> options =
                 getModifiedOptions(
                         opts -> {
@@ -131,35 +127,41 @@ public class CsvFormatFactoryTest extends TestLogger {
     }
 
     @Test
-    public void testDisableQuoteCharacterException() {
-        thrown.expect(ValidationException.class);
-        thrown.expect(
-                containsCause(
+    void testDisableQuoteCharacterException() {
+        assertThatThrownBy(
+                        () -> {
+                            final Map<String, String> options =
+                                    getModifiedOptions(
+                                            opts ->
+                                                    opts.put(
+                                                            
"csv.disable-quote-character", "true"));
+
+                            createTableSink(SCHEMA, options);
+                        })
+                .isInstanceOf(ValidationException.class)
+                .hasRootCause(
                         new ValidationException(
-                                "Format cannot define a quote character and 
disabled quote character at the same time.")));
-
-        final Map<String, String> options =
-                getModifiedOptions(opts -> 
opts.put("csv.disable-quote-character", "true"));
-
-        createTableSink(SCHEMA, options);
+                                "Format cannot define a quote character and 
disabled quote character at the same time."));
     }
 
     @Test
-    public void testInvalidCharacterOption() {
-        thrown.expect(ValidationException.class);
-        thrown.expect(
-                containsCause(
+    void testInvalidCharacterOption() {
+        assertThatThrownBy(
+                        () -> {
+                            final Map<String, String> options =
+                                    getModifiedOptions(
+                                            opts -> 
opts.put("csv.quote-character", "abc"));
+
+                            createTableSink(SCHEMA, options);
+                        })
+                .isInstanceOf(ValidationException.class)
+                .hasRootCause(
                         new ValidationException(
-                                "Option 'csv.quote-character' must be a string 
with single character, but was: abc")));
-
-        final Map<String, String> options =
-                getModifiedOptions(opts -> opts.put("csv.quote-character", 
"abc"));
-
-        createTableSink(SCHEMA, options);
+                                "Option 'csv.quote-character' must be a string 
with single character, but was: abc"));
     }
 
     @Test
-    public void testEscapedFieldDelimiter() throws IOException {
+    void testEscapedFieldDelimiter() throws IOException {
         final CsvRowDataSerializationSchema expectedSer =
                 new CsvRowDataSerializationSchema.Builder(PHYSICAL_TYPE)
                         .setFieldDelimiter('\t')
@@ -206,7 +208,7 @@ public class CsvFormatFactoryTest extends TestLogger {
     }
 
     @Test
-    public void testDeserializeWithEscapedFieldDelimiter() throws IOException {
+    void testDeserializeWithEscapedFieldDelimiter() throws IOException {
         // test deserialization schema
         final Map<String, String> options =
                 getModifiedOptions(opts -> opts.put("csv.field-delimiter", 
"\t"));
@@ -224,22 +226,24 @@ public class CsvFormatFactoryTest extends TestLogger {
     }
 
     @Test
-    public void testInvalidIgnoreParseError() {
-        thrown.expect(ValidationException.class);
-        thrown.expect(
-                containsCause(
+    void testInvalidIgnoreParseError() {
+        assertThatThrownBy(
+                        () -> {
+                            final Map<String, String> options =
+                                    getModifiedOptions(
+                                            opts -> 
opts.put("csv.ignore-parse-errors", "abc"));
+
+                            createTableSink(SCHEMA, options);
+                        })
+                .isInstanceOf(ValidationException.class)
+                .hasRootCause(
                         new IllegalArgumentException(
                                 "Unrecognized option for boolean: abc. "
-                                        + "Expected either true or false(case 
insensitive)")));
-
-        final Map<String, String> options =
-                getModifiedOptions(opts -> opts.put("csv.ignore-parse-errors", 
"abc"));
-
-        createTableSink(SCHEMA, options);
+                                        + "Expected either true or false(case 
insensitive)"));
     }
 
     @Test
-    public void testSerializationWithWriteBigDecimalInScientificNotation() {
+    void testSerializationWithWriteBigDecimalInScientificNotation() {
         final Map<String, String> options =
                 getModifiedOptions(
                         opts -> 
opts.put("csv.write-bigdecimal-in-scientific-notation", "true"));
@@ -266,7 +270,7 @@ public class CsvFormatFactoryTest extends TestLogger {
     }
 
     @Test
-    public void testSerializationWithNotWriteBigDecimalInScientificNotation() {
+    void testSerializationWithNotWriteBigDecimalInScientificNotation() {
         final Map<String, String> options =
                 getModifiedOptions(
                         opts -> 
opts.put("csv.write-bigdecimal-in-scientific-notation", "false"));
@@ -293,7 +297,7 @@ public class CsvFormatFactoryTest extends TestLogger {
     }
 
     @Test
-    public void testProjectionPushdown() throws IOException {
+    void testProjectionPushdown() throws IOException {
         final Map<String, String> options = getAllOptions();
 
         final Projection projection =
@@ -311,7 +315,7 @@ public class CsvFormatFactoryTest extends TestLogger {
     }
 
     @Test
-    public void testProjectionPushdownNoOpProjection() throws IOException {
+    void testProjectionPushdownNoOpProjection() throws IOException {
         final Map<String, String> options = getAllOptions();
 
         List<String> fields = Arrays.asList("a", "b", "c");
@@ -329,7 +333,7 @@ public class CsvFormatFactoryTest extends TestLogger {
     }
 
     @Test
-    public void testProjectionPushdownEmptyProjection() throws IOException {
+    void testProjectionPushdownEmptyProjection() throws IOException {
         final Map<String, String> options = getAllOptions();
 
         final int[][] projectionMatrix = new int[][] {};
diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java
 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java
index 72ecbeeabbc..5a6f3c00551 100644
--- 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java
+++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java
@@ -34,16 +34,16 @@ import static org.assertj.core.api.Assertions.assertThat;
 /**
  * Test for statistics functionality in {@link CsvFormatFactory} in the case 
of file system source.
  */
-public class CsvFormatFilesystemStatisticsReportTest extends 
CsvFormatStatisticsReportTest {
+class CsvFormatFilesystemStatisticsReportTest extends 
CsvFormatStatisticsReportTest {
 
+    @Override
     @BeforeEach
     public void setup(@TempDir File file) throws Exception {
         super.setup(file);
     }
 
     @Test
-    public void testCsvFileSystemStatisticsReport()
-            throws ExecutionException, InterruptedException {
+    void testCsvFileSystemStatisticsReport() throws ExecutionException, 
InterruptedException {
         // insert data and get statistics by get plan.
         DataType dataType = 
tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
         tEnv.fromValues(dataType, 
getData()).executeInsert("sourceTable").await();
diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java
 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java
index a96f938dc7f..ec0332e5caf 100644
--- 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java
+++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatStatisticsReportTest.java
@@ -41,10 +41,11 @@ import java.util.Map;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for statistics functionality in {@link CsvFormatFactory}. */
-public class CsvFormatStatisticsReportTest extends StatisticsReportTestBase {
+class CsvFormatStatisticsReportTest extends StatisticsReportTestBase {
 
     private static CsvFileFormatFactory.CsvBulkDecodingFormat 
csvBulkDecodingFormat;
 
+    @Override
     @BeforeEach
     public void setup(@TempDir File file) throws Exception {
         super.setup(file);
@@ -61,13 +62,13 @@ public class CsvFormatStatisticsReportTest extends 
StatisticsReportTestBase {
     }
 
     @Test
-    public void testCsvFormatStatsReportWithSingleFile() throws Exception {
+    void testCsvFormatStatsReportWithSingleFile() throws Exception {
         // insert data and get statistics.
         DataType dataType = 
tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
         tEnv.fromValues(dataType, 
getData()).executeInsert("sourceTable").await();
-        assertThat(folder.listFiles()).isNotNull().hasSize(1);
+        assertThat(folder.listFiles()).hasSize(1);
         File[] files = folder.listFiles();
-        assert files != null;
+        assertThat(files).isNotNull();
         TableStats tableStats =
                 csvBulkDecodingFormat.reportStatistics(
                         Collections.singletonList(new 
Path(files[0].toURI().toString())), null);
@@ -75,15 +76,15 @@ public class CsvFormatStatisticsReportTest extends 
StatisticsReportTestBase {
     }
 
     @Test
-    public void testCsvFormatStatsReportWithMultiFile() throws Exception {
+    void testCsvFormatStatsReportWithMultiFile() throws Exception {
         // insert data and get statistics.
         DataType dataType = 
tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
         tEnv.fromValues(dataType, 
getData()).executeInsert("sourceTable").await();
         tEnv.fromValues(dataType, 
getData()).executeInsert("sourceTable").await();
-        assertThat(folder.listFiles()).isNotNull().hasSize(2);
+        assertThat(folder.listFiles()).hasSize(2);
         File[] files = folder.listFiles();
         List<Path> paths = new ArrayList<>();
-        assert files != null;
+        assertThat(files).isNotNull();
         paths.add(new Path(files[0].toURI().toString()));
         paths.add(new Path(files[1].toURI().toString()));
         TableStats tableStats = csvBulkDecodingFormat.reportStatistics(paths, 
null);
@@ -91,7 +92,7 @@ public class CsvFormatStatisticsReportTest extends 
StatisticsReportTestBase {
     }
 
     @Test
-    public void testRowSizeBiggerThanTotalSampleLineCnt() throws IOException {
+    void testRowSizeBiggerThanTotalSampleLineCnt() throws IOException {
         StringBuilder builder = new StringBuilder();
         int lineCnt = 1000;
         for (int i = 0; i < lineCnt; i++) {
@@ -104,7 +105,7 @@ public class CsvFormatStatisticsReportTest extends 
StatisticsReportTestBase {
     }
 
     @Test
-    public void testCsvFormatStatsReportWithEmptyFile() {
+    void testCsvFormatStatsReportWithEmptyFile() {
         TableStats tableStats = csvBulkDecodingFormat.reportStatistics(null, 
null);
         assertThat(tableStats).isEqualTo(TableStats.UNKNOWN);
     }
diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvReaderFormatTest.java
 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvReaderFormatTest.java
index f6b04748f98..2531b6dcce1 100644
--- 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvReaderFormatTest.java
+++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvReaderFormatTest.java
@@ -89,7 +89,7 @@ class CsvReaderFormatTest {
     }
 
     @Test
-    void testCreatedMapperPassedToSchemaFunction() throws IOException, 
ClassNotFoundException {
+    void testCreatedMapperPassedToSchemaFunction() throws IOException {
         final CsvMapper csvMapper = new CsvMapper();
 
         AtomicReference<CsvMapper> passedMapper = new AtomicReference<>();
diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
index 6427c4b0d7a..402168a3d8d 100644
--- 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
+++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.InstantiationUtil;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -66,10 +66,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link CsvRowDataDeserializationSchema} and {@link 
CsvRowDataSerializationSchema}. */
-public class CsvRowDataSerDeSchemaTest {
+class CsvRowDataSerDeSchemaTest {
 
     @Test
-    public void testSerializeDeserialize() throws Exception {
+    void testSerializeDeserialize() throws Exception {
         testNullableField(BIGINT(), "null", null);
         testNullableField(STRING(), "null", null);
         testNullableField(STRING(), "\"This is a test.\"", "This is a test.");
@@ -117,7 +117,7 @@ public class CsvRowDataSerDeSchemaTest {
     }
 
     @Test
-    public void testSerializeDeserializeCustomizedProperties() throws 
Exception {
+    void testSerializeDeserializeCustomizedProperties() throws Exception {
 
         Consumer<CsvRowDataSerializationSchema.Builder> serConfig =
                 (serSchemaBuilder) ->
@@ -204,51 +204,51 @@ public class CsvRowDataSerDeSchemaTest {
     }
 
     @Test
-    public void testDeserializeParseError() {
+    void testDeserializeParseError() {
         assertThatThrownBy(() -> testDeserialization(false, false, 
"Test,null,Test"))
                 .isInstanceOf(IOException.class);
     }
 
     @Test
-    public void testDeserializeUnsupportedNull() throws Exception {
+    void testDeserializeUnsupportedNull() throws Exception {
         // unsupported null for integer
         assertThat(testDeserialization(true, false, "Test,null,Test"))
                 .isEqualTo(Row.of("Test", null, "Test"));
     }
 
     @Test
-    public void testDeserializeNullRow() throws Exception {
+    void testDeserializeNullRow() throws Exception {
         // return null for null input
         assertThat(testDeserialization(false, false, null)).isNull();
     }
 
     @Test
-    public void testDeserializeIncompleteRow() throws Exception {
+    void testDeserializeIncompleteRow() throws Exception {
         // last two columns are missing
         assertThat(testDeserialization(true, false, 
"Test")).isEqualTo(Row.of("Test", null, null));
     }
 
     @Test
-    public void testDeserializeMoreColumnsThanExpected() throws Exception {
+    void testDeserializeMoreColumnsThanExpected() throws Exception {
         // one additional string column
         assertThat(testDeserialization(true, false, 
"Test,12,Test,Test")).isNull();
     }
 
     @Test
-    public void testDeserializeIgnoreComment() throws Exception {
+    void testDeserializeIgnoreComment() throws Exception {
         // # is part of the string
         assertThat(testDeserialization(false, false, "#Test,12,Test"))
                 .isEqualTo(Row.of("#Test", 12, "Test"));
     }
 
     @Test
-    public void testDeserializeAllowComment() throws Exception {
+    void testDeserializeAllowComment() throws Exception {
         // entire row is ignored
         assertThat(testDeserialization(true, true, "#Test,12,Test")).isNull();
     }
 
     @Test
-    public void testSerializationProperties() throws Exception {
+    void testSerializationProperties() throws Exception {
         DataType dataType = ROW(FIELD("f0", STRING()), FIELD("f1", INT()), 
FIELD("f2", STRING()));
         RowType rowType = (RowType) dataType.getLogicalType();
         CsvRowDataSerializationSchema.Builder serSchemaBuilder =
@@ -268,19 +268,30 @@ public class CsvRowDataSerDeSchemaTest {
                 .isEqualTo("Test,12,2019-12-26 12:12:12".getBytes());
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidNesting() throws Exception {
-        testNullableField(
-                ROW(FIELD("f0", ROW(FIELD("f0", STRING())))), "FAIL", 
Row.of(Row.of("FAIL")));
+    @Test
+    void testInvalidNesting() {
+        assertThatThrownBy(
+                        () ->
+                                testNullableField(
+                                        ROW(FIELD("f0", ROW(FIELD("f0", 
STRING())))),
+                                        "FAIL",
+                                        Row.of(Row.of("FAIL"))))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidType() throws Exception {
-        testNullableField(RAW(Void.class, VoidSerializer.INSTANCE), "FAIL", 
new java.util.Date());
+    @Test
+    void testInvalidType() {
+        assertThatThrownBy(
+                        () ->
+                                testNullableField(
+                                        RAW(Void.class, 
VoidSerializer.INSTANCE),
+                                        "FAIL",
+                                        new java.util.Date()))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testSerializeDeserializeNestedTypes() throws Exception {
+    void testSerializeDeserializeNestedTypes() throws Exception {
         DataType subDataType0 =
                 ROW(
                         FIELD("f0c0", STRING()),
@@ -334,7 +345,7 @@ public class CsvRowDataSerDeSchemaTest {
     }
 
     @Test
-    public void testDeserializationWithDisableQuoteCharacter() throws 
Exception {
+    void testDeserializationWithDisableQuoteCharacter() throws Exception {
         Consumer<CsvRowDataDeserializationSchema.Builder> deserConfig =
                 (deserSchemaBuilder) ->
                         
deserSchemaBuilder.disableQuoteCharacter().setFieldDelimiter(',');
@@ -343,7 +354,7 @@ public class CsvRowDataSerDeSchemaTest {
     }
 
     @Test
-    public void testSerializationWithTypesMismatch() {
+    void testSerializationWithTypesMismatch() {
         DataType dataType = ROW(FIELD("f0", STRING()), FIELD("f1", INT()), 
FIELD("f2", INT()));
         RowType rowType = (RowType) dataType.getLogicalType();
         CsvRowDataSerializationSchema.Builder serSchemaBuilder =
@@ -355,7 +366,7 @@ public class CsvRowDataSerDeSchemaTest {
     }
 
     @Test
-    public void testDeserializationWithTypesMismatch() {
+    void testDeserializationWithTypesMismatch() {
         DataType dataType = ROW(FIELD("f0", STRING()), FIELD("f1", INT()), 
FIELD("f2", INT()));
         RowType rowType = (RowType) dataType.getLogicalType();
         CsvRowDataDeserializationSchema.Builder deserSchemaBuilder =
diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
index 3f7b04f5723..b26732b371e 100644
--- 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
+++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.InstantiationUtil;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -40,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link CsvRowSerializationSchema} and {@link 
CsvRowDeserializationSchema}. */
-public class CsvRowDeSerializationSchemaTest {
+class CsvRowDeSerializationSchemaTest {
 
     @Test
     @SuppressWarnings("unchecked")
@@ -97,7 +97,7 @@ public class CsvRowDeSerializationSchemaTest {
     }
 
     @Test
-    public void testSerializeDeserializeCustomizedProperties() throws 
Exception {
+    void testSerializeDeserializeCustomizedProperties() throws Exception {
 
         final Consumer<CsvRowSerializationSchema.Builder> serConfig =
                 (serSchemaBuilder) ->
@@ -157,45 +157,45 @@ public class CsvRowDeSerializationSchemaTest {
     }
 
     @Test
-    public void testDeserializeParseError() throws Exception {
+    void testDeserializeParseError() {
         assertThatThrownBy(() -> testDeserialization(false, false, 
"Test,null,Test"))
                 .isInstanceOf(IOException.class);
     }
 
     @Test
-    public void testDeserializeUnsupportedNull() throws Exception {
+    void testDeserializeUnsupportedNull() throws Exception {
         // unsupported null for integer
         assertThat(testDeserialization(true, false, "Test,null,Test"))
                 .isEqualTo(Row.of("Test", null, "Test"));
     }
 
     @Test
-    public void testDeserializeIncompleteRow() throws Exception {
+    void testDeserializeIncompleteRow() throws Exception {
         // last two columns are missing
         assertThat(testDeserialization(true, false, 
"Test")).isEqualTo(Row.of("Test", null, null));
     }
 
     @Test
-    public void testDeserializeMoreColumnsThanExpected() throws Exception {
+    void testDeserializeMoreColumnsThanExpected() throws Exception {
         // one additional string column
         assertThat(testDeserialization(true, false, 
"Test,12,Test,Test")).isNull();
     }
 
     @Test
-    public void testDeserializeIgnoreComment() throws Exception {
+    void testDeserializeIgnoreComment() throws Exception {
         // # is part of the string
         assertThat(testDeserialization(false, false, "#Test,12,Test"))
                 .isEqualTo(Row.of("#Test", 12, "Test"));
     }
 
     @Test
-    public void testDeserializeAllowComment() throws Exception {
+    void testDeserializeAllowComment() throws Exception {
         // entire row is ignored
         assertThat(testDeserialization(true, true, "#Test,12,Test")).isNull();
     }
 
     @Test
-    public void testSerializationProperties() throws Exception {
+    void testSerializationProperties() throws Exception {
         final TypeInformation<Row> rowInfo = Types.ROW(Types.STRING, 
Types.INT, Types.STRING);
         final CsvRowSerializationSchema.Builder serSchemaBuilder =
                 new 
CsvRowSerializationSchema.Builder(rowInfo).setLineDelimiter("\r");
@@ -215,7 +215,7 @@ public class CsvRowDeSerializationSchemaTest {
     }
 
     @Test
-    public void testEmptyLineDelimiter() throws Exception {
+    void testEmptyLineDelimiter() throws Exception {
         final TypeInformation<Row> rowInfo = Types.ROW(Types.STRING, 
Types.INT, Types.STRING);
         final CsvRowSerializationSchema.Builder serSchemaBuilder =
                 new 
CsvRowSerializationSchema.Builder(rowInfo).setLineDelimiter("");
@@ -224,18 +224,30 @@ public class CsvRowDeSerializationSchemaTest {
                 .isEqualTo("Test,12,Hello".getBytes());
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidNesting() throws Exception {
-        testNullableField(Types.ROW(Types.ROW(Types.STRING)), "FAIL", 
Row.of(Row.of("FAIL")));
+    @Test
+    void testInvalidNesting() {
+        assertThatThrownBy(
+                        () ->
+                                testNullableField(
+                                        Types.ROW(Types.ROW(Types.STRING)),
+                                        "FAIL",
+                                        Row.of(Row.of("FAIL"))))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidType() throws Exception {
-        testNullableField(Types.GENERIC(java.util.Date.class), "FAIL", new 
java.util.Date());
+    @Test
+    void testInvalidType() {
+        assertThatThrownBy(
+                        () ->
+                                testNullableField(
+                                        Types.GENERIC(java.util.Date.class),
+                                        "FAIL",
+                                        new java.util.Date()))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testSerializeDeserializeNestedTypes() throws Exception {
+    void testSerializeDeserializeNestedTypes() throws Exception {
         final TypeInformation<Row> subDataType0 =
                 Types.ROW(
                         Types.STRING,
diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
index fc1364bb5e9..d6b50badb07 100644
--- 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
+++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
@@ -35,7 +35,6 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
 import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
 import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
@@ -45,7 +44,6 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.Csv
 
 import org.apache.commons.io.FileUtils;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -73,8 +71,7 @@ import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** MiniCluster-based integration tests CSV data format. */
-@ExtendWith({TestLoggerExtension.class})
-public class DataStreamCsvITCase {
+class DataStreamCsvITCase {
 
     private static final CsvMapper CSV_MAPPER = 
JacksonMapperFactory.createCsvMapper();
 
@@ -152,7 +149,7 @@ public class DataStreamCsvITCase {
     //  test cases
     // ------------------------------------------------------------------------
     @Test
-    public void testCsvReaderFormatFromPojo() throws Exception {
+    void testCsvReaderFormatFromPojo() throws Exception {
         writeFile(outDir, "data.csv", CSV_LINES);
 
         final CsvReaderFormat<CityPojo> csvFormat = 
CsvReaderFormat.forPojo(CityPojo.class);
@@ -162,7 +159,7 @@ public class DataStreamCsvITCase {
     }
 
     @Test
-    public void testCsvReaderFormatFromSchema() throws Exception {
+    void testCsvReaderFormatFromSchema() throws Exception {
         writeFile(outDir, "data.csv", CSV_LINES_PIPE_SEPARATED);
 
         final CsvReaderFormat<CityPojo> csvFormat =
@@ -179,7 +176,7 @@ public class DataStreamCsvITCase {
     }
 
     @Test
-    public void testCsvReaderFormatMalformed() throws Exception {
+    void testCsvReaderFormatMalformed() throws Exception {
         writeFile(outDir, "data.csv", CSV_LINES_MALFORMED);
 
         final CsvReaderFormat<CityPojo> csvFormat =
@@ -194,7 +191,7 @@ public class DataStreamCsvITCase {
     }
 
     @Test
-    public void testCustomBulkWriter() throws Exception {
+    void testCustomBulkWriter() throws Exception {
 
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(PARALLELISM);
diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatSplitTest.java
 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatSplitTest.java
index 455be848e5e..cb197e2f5a3 100644
--- 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatSplitTest.java
+++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatSplitTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -37,10 +37,10 @@ import static 
org.apache.flink.formats.csv.RowCsvInputFormatTest.createTempFile;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test split logic for {@link RowCsvInputFormat}. */
-public class RowCsvInputFormatSplitTest {
+class RowCsvInputFormatSplitTest {
 
     @Test
-    public void readAll() throws Exception {
+    void readAll() throws Exception {
         test(
                 "11$\n1,222\n" + "22$2,333\n",
                 0,
@@ -50,17 +50,17 @@ public class RowCsvInputFormatSplitTest {
     }
 
     @Test
-    public void readStartOffset() throws Exception {
+    void readStartOffset() throws Exception {
         test("11$\n1,222\n" + "22$2,333\n", 1, -1, '$', 
singletonList(Row.of("222", "333")));
     }
 
     @Test
-    public void readStartOffsetWithSeparator() throws Exception {
+    void readStartOffsetWithSeparator() throws Exception {
         test("11$\n1,222\n" + "22$2,333\n", 3, -1, '$', 
singletonList(Row.of("222", "333")));
     }
 
     @Test
-    public void readLengthWithSeparator() throws Exception {
+    void readLengthWithSeparator() throws Exception {
         test(
                 "11$\n1,222\n" + "22$\n2,333\n",
                 0,
@@ -70,7 +70,7 @@ public class RowCsvInputFormatSplitTest {
     }
 
     @Test
-    public void readLengthWithMultiBytesEscapeChar() throws Exception {
+    void readLengthWithMultiBytesEscapeChar() throws Exception {
         test(
                 "11好\n1,222\n" + "22好\n2,333\n",
                 0,
@@ -80,7 +80,7 @@ public class RowCsvInputFormatSplitTest {
     }
 
     @Test
-    public void readLengthWithMultiBytesEscapeChar2() throws Exception {
+    void readLengthWithMultiBytesEscapeChar2() throws Exception {
         test(
                 "11好\n1,222\n" + "22好\n2,333\n",
                 0,
@@ -90,7 +90,7 @@ public class RowCsvInputFormatSplitTest {
     }
 
     @Test
-    public void readLengthWithMultiBytesEscapeChar3() throws Exception {
+    void readLengthWithMultiBytesEscapeChar3() throws Exception {
         test(
                 "11好\n1,222\n" + "22好\n2,333\n",
                 0,
@@ -100,22 +100,22 @@ public class RowCsvInputFormatSplitTest {
     }
 
     @Test
-    public void readStartOffsetAndLength() throws Exception {
+    void readStartOffsetAndLength() throws Exception {
         test("11好\n1,222\n" + "22好\n2,333\n", 3, 18, '好', 
singletonList(Row.of("22\n2", "333")));
     }
 
     @Test
-    public void readMultiLineSeparator() throws Exception {
+    void readMultiLineSeparator() throws Exception {
         test("111,222\r\n" + "222,333\r\n", 3, 18, '好', 
singletonList(Row.of("222", "333")));
     }
 
     @Test
-    public void readRLineSeparator() throws Exception {
+    void readRLineSeparator() throws Exception {
         test("111,222\r" + "222,333\r", 3, 18, '好', 
singletonList(Row.of("222", "333")));
     }
 
     @Test
-    public void testQuotationMark() throws Exception {
+    void testQuotationMark() throws Exception {
         test(
                 "\"111\",222\r" + "222,333\r",
                 0,
@@ -132,7 +132,7 @@ public class RowCsvInputFormatSplitTest {
     }
 
     @Test
-    public void testSurroundEscapedDelimiter() throws Exception {
+    void testSurroundEscapedDelimiter() throws Exception {
         test(
                 "$11$1,222\r" + "222,333\r",
                 0,
diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatTest.java
 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatTest.java
index 4942eb5a6fc..2056d56f168 100644
--- 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatTest.java
+++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -42,7 +42,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link RowCsvInputFormat}. */
-public class RowCsvInputFormatTest {
+class RowCsvInputFormatTest {
 
     static final Path PATH = new Path("an/ignored/file/");
 
@@ -51,7 +51,7 @@ public class RowCsvInputFormatTest {
     private static final String SECOND_PART = "That is the second part";
 
     @Test
-    public void ignoreInvalidLines() throws Exception {
+    void ignoreInvalidLines() throws Exception {
         String fileContent =
                 "#description of the data\n"
                         + "header1|header2|header3|\n"
@@ -163,7 +163,7 @@ public class RowCsvInputFormatTest {
     }
 
     @Test
-    public void ignorePrefixComments() throws Exception {
+    void ignorePrefixComments() throws Exception {
         String fileContent =
                 "#description of the data\n"
                         + "#successive commented line\n"
@@ -208,7 +208,7 @@ public class RowCsvInputFormatTest {
     }
 
     @Test
-    public void readStringFields() throws Exception {
+    void readStringFields() throws Exception {
         String fileContent = "abc|def|ghijk\nabc||hhg\n|||\n||";
 
         FileInputSplit split = createTempFile(fileContent);
@@ -259,7 +259,7 @@ public class RowCsvInputFormatTest {
     }
 
     @Test
-    public void readMixedQuotedStringFields() throws Exception {
+    void readMixedQuotedStringFields() throws Exception {
         String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||\n";
 
         FileInputSplit split = createTempFile(fileContent);
@@ -306,7 +306,7 @@ public class RowCsvInputFormatTest {
     }
 
     @Test
-    public void testTailingEmptyFields() throws Exception {
+    void testTailingEmptyFields() throws Exception {
         String fileContent =
                 "abc|def|ghijk\n" + "abc|def|\n" + "abc||\n" + "|||\n" + 
"||\n" + "abc|def\n";
 
@@ -365,7 +365,7 @@ public class RowCsvInputFormatTest {
     }
 
     @Test
-    public void testIntegerFields() throws Exception {
+    void testIntegerFields() throws Exception {
         String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
 
         FileInputSplit split = createTempFile(fileContent);
@@ -410,7 +410,7 @@ public class RowCsvInputFormatTest {
     }
 
     @Test
-    public void testEmptyFields() throws Exception {
+    void testEmptyFields() throws Exception {
         String fileContent =
                 ",,,,,,,,\n"
                         + ",,,,,,,\n"
@@ -458,7 +458,7 @@ public class RowCsvInputFormatTest {
     }
 
     @Test
-    public void testDoubleFields() throws Exception {
+    void testDoubleFields() throws Exception {
         String fileContent = 
"11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n";
 
         FileInputSplit split = createTempFile(fileContent);
@@ -503,7 +503,7 @@ public class RowCsvInputFormatTest {
     }
 
     @Test
-    public void testReadSparseWithPositionSetter() throws Exception {
+    void testReadSparseWithPositionSetter() throws Exception {
         String fileContent =
                 "111|222|333|444|555|666|777|888|999|000|\n"
                         + "000|999|888|777|666|555|444|333|222|111|";
@@ -553,7 +553,7 @@ public class RowCsvInputFormatTest {
     }
 
     @Test
-    public void testWindowsLineEndRemoval() throws Exception {
+    void testWindowsLineEndRemoval() throws Exception {
 
         // check typical use case -- linux file is correct and it is set up to 
linux(\n)
         testRemovingTrailingCR("\n");
@@ -573,7 +573,7 @@ public class RowCsvInputFormatTest {
     }
 
     @Test
-    public void testQuotedStringParsingWithIncludeFields() throws Exception {
+    void testQuotedStringParsingWithIncludeFields() throws Exception {
         String fileContent =
                 "\"20:41:52-1-3-2015\"|\"Re: Taskmanager memory error in 
Eclipse\"|"
                         + "\"Blahblah 
<[email protected]>\"|\"blaaa\"|\"blubb\"";
@@ -613,7 +613,7 @@ public class RowCsvInputFormatTest {
     }
 
     @Test
-    public void testQuotedStringParsingWithEscapedQuotes() throws Exception {
+    void testQuotedStringParsingWithEscapedQuotes() throws Exception {
         String fileContent = "\"\\\"Hello\\\" World\"|\"We are\\\" young\"";
         File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp");
         tempFile.deleteOnExit();
@@ -647,7 +647,7 @@ public class RowCsvInputFormatTest {
     }
 
     @Test
-    public void testSqlTimeFields() throws Exception {
+    void testSqlTimeFields() throws Exception {
         String fileContent =
                 "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5\n"
                         + "1990-10-14|02:42:25|1990-10-14 
02:42:25.123|1990-1-4 2:2:5.3\n";
@@ -691,7 +691,7 @@ public class RowCsvInputFormatTest {
     }
 
     @Test
-    public void testScanOrder() throws Exception {
+    void testScanOrder() throws Exception {
         String fileContent =
                 // first row
                 "111|222|333|444|555|666|777|888|999|000|\n"
diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/TableCsvFormatITCase.java
 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/TableCsvFormatITCase.java
index 628f44e6d4c..c1c1a720c84 100644
--- 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/TableCsvFormatITCase.java
+++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/TableCsvFormatITCase.java
@@ -28,20 +28,21 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.test.util.AbstractTestBaseJUnit4;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -58,24 +59,23 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the CSV file format. */
-public class TableCsvFormatITCase extends AbstractTestBaseJUnit4 {
-
-    @Rule public ExpectedException exception = ExpectedException.none();
+@ExtendWith(MiniClusterExtension.class)
+class TableCsvFormatITCase {
 
     private TableEnvironment tableEnv;
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    void setup() {
         tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
     }
 
-    @After
-    public void after() {
+    @AfterEach
+    void after() {
         TestValuesTableFactory.clearAllData();
     }
 
     @Test
-    public void testProjectPushDown() throws Exception {
+    void testProjectPushDown(@TempDir Path sourcePath, @TempDir Path sinkPath) 
throws Exception {
         List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello 
world");
 
         Schema sourceSchema =
@@ -85,11 +85,11 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
                         .column("c", STRING())
                         .build();
 
-        createSourceTable("MyTable", data, sourceSchema);
+        createSourceTable(sourcePath.resolve("table.csv"), "MyTable", data, 
sourceSchema);
 
         Schema sinkSchema = Schema.newBuilder().column("a", 
BIGINT()).column("c", STRING()).build();
 
-        File sinkPath = createSinkTable("MySink", sinkSchema);
+        createSinkTable(sinkPath, "MySink", sinkSchema);
 
         tableEnv.executeSql("insert into MySink select a, c from 
MyTable").await();
 
@@ -97,7 +97,7 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
     }
 
     @Test
-    public void testReadingMetadata() throws Exception {
+    void testReadingMetadata(@TempDir Path sinkPath) throws Exception {
 
         Schema sourceSchema =
                 Schema.newBuilder()
@@ -118,7 +118,7 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
 
         Schema sinkSchema = Schema.newBuilder().column("a", 
BIGINT()).column("m", STRING()).build();
 
-        File sinkPath = createSinkTable("MySink", sinkSchema);
+        createSinkTable(sinkPath, "MySink", sinkSchema);
 
         tableEnv.executeSql("insert into MySink select a, m from 
MyTable").await();
 
@@ -126,7 +126,7 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
     }
 
     @Test
-    public void testFilterPushDown() throws Exception {
+    void testFilterPushDown(@TempDir Path sourcePath, @TempDir Path sinkPath) 
throws Exception {
         List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello 
world");
 
         Schema sourceSchema =
@@ -136,7 +136,7 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
                         .column("c", STRING())
                         .build();
 
-        createSourceTable("MyTable", data, sourceSchema);
+        createSourceTable(sourcePath.resolve("table.csv"), "MyTable", data, 
sourceSchema);
 
         Schema sinkSchema =
                 Schema.newBuilder()
@@ -145,7 +145,7 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
                         .column("c", STRING())
                         .build();
 
-        File sinkPath = createSinkTable("MySink", sinkSchema);
+        createSinkTable(sinkPath, "MySink", sinkSchema);
 
         tableEnv.executeSql("insert into MySink select * from MyTable where a 
> 1").await();
 
@@ -153,7 +153,7 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
     }
 
     @Test
-    public void testPartitionPushDown() throws Exception {
+    void testPartitionPushDown(@TempDir Path sinkPath) throws Exception {
         Schema sourceSchema =
                 Schema.newBuilder()
                         .column("a", INT())
@@ -179,7 +179,7 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
                         .column("c", STRING())
                         .build();
 
-        File sinkPath = createSinkTable("MySink", sinkSchema);
+        createSinkTable(sinkPath, "MySink", sinkSchema);
 
         tableEnv.executeSql("insert into MySink select * from MyTable where p 
= 2").await();
 
@@ -187,7 +187,7 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
     }
 
     @Test
-    public void testWatermarkPushDown() throws Exception {
+    void testWatermarkPushDown(@TempDir Path sinkPath) throws Exception {
 
         Schema sourceSchema =
                 Schema.newBuilder()
@@ -215,7 +215,7 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
                         .column("ts", TIMESTAMP(3))
                         .build();
 
-        File sinkPath = createSinkTable("MySink", sinkSchema);
+        createSinkTable(sinkPath, "MySink", sinkSchema);
 
         tableEnv.executeSql("insert into MySink select a, b, ts from MyTable 
where b = 3").await();
 
@@ -228,7 +228,7 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
     }
 
     @Test
-    public void testPushDowns() throws Exception {
+    void testPushDowns(@TempDir Path sinkPath) throws Exception {
         Schema sourceSchema =
                 Schema.newBuilder()
                         .column("a", INT())
@@ -255,7 +255,7 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
         Schema sinkSchema =
                 Schema.newBuilder().column("a", INT()).column("ts", 
TIMESTAMP(3)).build();
 
-        File sinkPath = createSinkTable("MySink", sinkSchema);
+        createSinkTable(sinkPath, "MySink", sinkSchema);
 
         tableEnv.executeSql("insert into MySink select a, ts from MyTable 
where b = 3 and a > 4")
                 .await();
@@ -269,35 +269,36 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
         return 
TimeFormats.SQL_TIMESTAMP_FORMAT.format(toLocalDateTime(timestamp));
     }
 
-    private void createSourceTable(String tableName, List<String> data, Schema 
schema)
+    private void createSourceTable(
+            Path sourceFile, String tableName, List<String> data, Schema 
schema)
             throws IOException {
 
-        File sourceFile = TEMPORARY_FOLDER.newFile();
         Collections.shuffle(data);
-        Files.write(sourceFile.toPath(), String.join("\n", data).getBytes());
+        Files.write(sourceFile, String.join("\n", data).getBytes());
 
         tableEnv.createTemporaryTable(
                 tableName,
                 TableDescriptor.forConnector(FileSystemTableFactory.IDENTIFIER)
-                        .option(FileSystemConnectorOptions.PATH, 
sourceFile.getAbsolutePath())
+                        .option(
+                                FileSystemConnectorOptions.PATH,
+                                sourceFile.toAbsolutePath().toString())
                         .format(CsvCommons.IDENTIFIER)
                         .schema(schema)
                         .build());
     }
 
-    private File createSinkTable(String tableName, Schema schema) throws 
IOException {
-        File sinkPath = TEMPORARY_FOLDER.newFolder();
+    private void createSinkTable(Path sinkPath, String tableName, Schema 
schema) {
 
         tableEnv.createTemporaryTable(
                 tableName,
                 TableDescriptor.forConnector(FileSystemTableFactory.IDENTIFIER)
-                        .option(FileSystemConnectorOptions.PATH, 
sinkPath.getAbsolutePath())
+                        .option(
+                                FileSystemConnectorOptions.PATH,
+                                sinkPath.toAbsolutePath().toString())
                         .option("csv.disable-quote-character", "true")
                         .format(CsvCommons.IDENTIFIER)
                         .schema(schema)
                         .build());
-
-        return sinkPath;
     }
 
     private void createTestValuesSourceTable(
@@ -326,8 +327,8 @@ public class TableCsvFormatITCase extends 
AbstractTestBaseJUnit4 {
         tableEnv.createTemporaryTable(tableName, descriptor.build());
     }
 
-    private void assertResult(List<String> expected, File resultFile) throws 
IOException {
-        List<String> actual = readLines(resultFile);
+    private void assertResult(List<String> expected, Path resultFile) throws 
IOException {
+        List<String> actual = readLines(resultFile.toFile());
         assertThat(actual).hasSameElementsAs(expected);
     }
 
diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java
 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java
index 6262f22e714..f2d2bdaf65a 100644
--- 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java
+++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java
@@ -69,9 +69,9 @@ public class OrcFormatStatisticsReportTest extends 
StatisticsReportTestBase {
         // insert data and get statistics.
         DataType dataType = 
tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
         tEnv.fromValues(dataType, 
getData()).executeInsert("sourceTable").await();
-        assertThat(folder.listFiles()).isNotNull().hasSize(1);
+        assertThat(folder.listFiles()).hasSize(1);
         File[] files = folder.listFiles();
-        assert files != null;
+        assertThat(files).isNotNull();
         TableStats tableStats =
                 orcBulkDecodingFormat.reportStatistics(
                         Collections.singletonList(new 
Path(files[0].toURI().toString())), dataType);

Reply via email to