This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a3dff68  [FLINK-26232][avro] Migrate tests to JUnit5
a3dff68 is described below

commit a3dff681d511a56f5d2a557e1804bbd5a10757ea
Author: Ryan Skraba <[email protected]>
AuthorDate: Wed Mar 9 12:33:38 2022 +0100

    [FLINK-26232][avro] Migrate tests to JUnit5
---
 flink-formats/flink-avro/pom.xml                   |   5 +
 .../flink/formats/avro/AvroBulkFormatITCase.java   |   2 +-
 .../flink/formats/avro/AvroBulkFormatTest.java     |  10 +-
 .../avro/AvroDeserializationSchemaTest.java        |  27 ++-
 .../formats/avro/AvroExternalJarProgramITCase.java |  15 +-
 .../flink/formats/avro/AvroFormatFactoryTest.java  |  13 +-
 .../avro/AvroInputFormatTypeExtractionTest.java    |  20 ++-
 .../formats/avro/AvroKryoClassloadingTest.java     |  13 +-
 .../avro/AvroKryoSerializerRegistrationsTest.java  |  12 +-
 .../flink/formats/avro/AvroOutputFormatITCase.java |  18 +-
 .../flink/formats/avro/AvroOutputFormatTest.java   |  35 ++--
 .../formats/avro/AvroRecordInputFormatTest.java    | 193 +++++++++------------
 .../avro/AvroRowDataDeSerializationSchemaTest.java |  63 ++++---
 .../avro/AvroRowDeSerializationSchemaTest.java     |  46 ++---
 .../formats/avro/AvroSerializationSchemaTest.java  |  18 +-
 .../avro/AvroSplittableInputFormatTest.java        |  71 +++-----
 .../formats/avro/AvroStreamingFileSinkITCase.java  |  14 +-
 .../flink/formats/avro/EncoderDecoderTest.java     |  49 +++---
 .../RegistryAvroDeserializationSchemaTest.java     |  21 ++-
 .../avro/typeutils/AvroSchemaConverterTest.java    | 188 ++++++++++----------
 ...roSerializerConcurrencyCheckInactiveITCase.java |  26 ++-
 .../typeutils/AvroSerializerConcurrencyTest.java   |   8 +-
 .../typeutils/AvroSerializerEmptyArrayTest.java    |  10 +-
 .../avro/typeutils/AvroSerializerSnapshotTest.java |  95 +++++-----
 .../avro/typeutils/AvroTypeExtractionTest.java     | 129 ++++++++------
 .../formats/avro/typeutils/AvroTypeInfoTest.java   |   8 +-
 .../flink/table/runtime/batch/AvroTypesITCase.java |   4 +-
 .../org.junit.jupiter.api.extension.Extension      |  16 ++
 28 files changed, 552 insertions(+), 577 deletions(-)

diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
index 765c250..1f60cee 100644
--- a/flink-formats/flink-avro/pom.xml
+++ b/flink-formats/flink-avro/pom.xml
@@ -262,6 +262,11 @@ under the License.
                                                </goals>
                                        </execution>
                                </executions>
+                               <configuration>
+                                       <excludes>
+                                               <exclude>META-INF/services/org.junit.jupiter.api.extension.Extension</exclude>
+                                       </excludes>
+                               </configuration>
                        </plugin>
                </plugins>
        </build>
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java
index 8ccf344..964b464 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java
@@ -62,7 +62,7 @@ import java.util.Random;
 import static org.apache.flink.formats.avro.AvroBulkFormatTestUtils.ROW_TYPE;
 
 /** IT cases for {@link AbstractAvroBulkFormat}. */
-public class AvroBulkFormatITCase extends SourceTestSuiteBase<RowData> {
+class AvroBulkFormatITCase extends SourceTestSuiteBase<RowData> {
 
     private static final RowDataSerializer SERIALIZER = new 
RowDataSerializer(ROW_TYPE);
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java
index 9080efa..28798bb 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatTest.java
@@ -55,7 +55,7 @@ import static org.apache.flink.formats.avro.AvroBulkFormatTestUtils.ROW_TYPE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link AbstractAvroBulkFormat}. */
-public class AvroBulkFormatTest {
+class AvroBulkFormatTest {
 
     private static final List<RowData> TEST_DATA =
             Arrays.asList(
@@ -116,7 +116,7 @@ public class AvroBulkFormatTest {
     }
 
     @Test
-    public void testReadWholeFileWithOneSplit() throws IOException {
+    void testReadWholeFileWithOneSplit() throws IOException {
         AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
                 new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
         assertSplit(
@@ -132,7 +132,7 @@ public class AvroBulkFormatTest {
     }
 
     @Test
-    public void testReadWholeFileWithMultipleSplits() throws IOException {
+    void testReadWholeFileWithMultipleSplits() throws IOException {
         AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
                 new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
         long splitLength = tmpFile.length() / 3;
@@ -149,7 +149,7 @@ public class AvroBulkFormatTest {
     }
 
     @Test
-    public void testSplitsAtCriticalLocations() throws IOException {
+    void testSplitsAtCriticalLocations() throws IOException {
         AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
                 new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
         assertSplit(
@@ -168,7 +168,7 @@ public class AvroBulkFormatTest {
     }
 
     @Test
-    public void testRestoreReader() throws IOException {
+    void testRestoreReader() throws IOException {
         AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
                 new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
         long splitLength = tmpFile.length() / 3;
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
index 4f4289a..36ee6ce 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
@@ -24,53 +24,52 @@ import org.apache.flink.formats.avro.generated.UnionLogicalType;
 import org.apache.flink.formats.avro.utils.TestDataGenerator;
 
 import org.apache.avro.generic.GenericRecord;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.time.Instant;
 import java.util.Random;
 
 import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link AvroDeserializationSchema}. */
-public class AvroDeserializationSchemaTest {
+class AvroDeserializationSchemaTest {
 
     private static final Address address = 
TestDataGenerator.generateRandomAddress(new Random());
 
     @Test
-    public void testNullRecord() throws Exception {
+    void testNullRecord() throws Exception {
         DeserializationSchema<Address> deserializer =
                 AvroDeserializationSchema.forSpecific(Address.class);
 
         Address deserializedAddress = deserializer.deserialize(null);
-        assertNull(deserializedAddress);
+        assertThat(deserializedAddress).isNull();
     }
 
     @Test
-    public void testGenericRecord() throws Exception {
+    void testGenericRecord() throws Exception {
         DeserializationSchema<GenericRecord> deserializationSchema =
                 AvroDeserializationSchema.forGeneric(address.getSchema());
 
         byte[] encodedAddress = writeRecord(address, Address.getClassSchema());
         GenericRecord genericRecord = 
deserializationSchema.deserialize(encodedAddress);
-        assertEquals(address.getCity(), genericRecord.get("city").toString());
-        assertEquals(address.getNum(), genericRecord.get("num"));
-        assertEquals(address.getState(), genericRecord.get("state").toString());
+        assertThat(genericRecord.get("city").toString()).isEqualTo(address.getCity());
+        assertThat(genericRecord.get("num")).isEqualTo(address.getNum());
+        assertThat(genericRecord.get("state").toString()).isEqualTo(address.getState());
     }
 
     @Test
-    public void testSpecificRecord() throws Exception {
+    void testSpecificRecord() throws Exception {
         DeserializationSchema<Address> deserializer =
                 AvroDeserializationSchema.forSpecific(Address.class);
 
         byte[] encodedAddress = writeRecord(address);
         Address deserializedAddress = deserializer.deserialize(encodedAddress);
-        assertEquals(address, deserializedAddress);
+        assertThat(deserializedAddress).isEqualTo(address);
     }
 
     @Test
-    public void testSpecificRecordWithUnionLogicalType() throws Exception {
+    void testSpecificRecordWithUnionLogicalType() throws Exception {
         Random rnd = new Random();
         UnionLogicalType data = new 
UnionLogicalType(Instant.ofEpochMilli(rnd.nextLong()));
         DeserializationSchema<UnionLogicalType> deserializer =
@@ -78,6 +77,6 @@ public class AvroDeserializationSchemaTest {
 
         byte[] encodedData = writeRecord(data);
         UnionLogicalType deserializedData = 
deserializer.deserialize(encodedData);
-        assertEquals(data, deserializedData);
+        assertThat(deserializedData).isEqualTo(data);
     }
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
index 58e73e6..39a44e7 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
@@ -25,18 +25,17 @@ import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.JarUtils;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 
 /** IT case for the {@link AvroExternalJarProgram}. */
-public class AvroExternalJarProgramITCase extends TestLogger {
+class AvroExternalJarProgramITCase {
 
     private static final String JAR_FILE = "maven-test-jar.jar";
 
@@ -51,19 +50,19 @@ public class AvroExternalJarProgramITCase extends TestLogger {
                             .setNumSlotsPerTaskManager(PARALLELISM)
                             .build());
 
-    @BeforeClass
+    @BeforeAll
     public static void setUp() throws Exception {
         MINI_CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void tearDown() {
         TestEnvironment.unsetAsContext();
         MINI_CLUSTER.closeAsync();
     }
 
     @Test
-    public void testExternalProgram() throws Exception {
+    void testExternalProgram() throws Exception {
 
         String jarFile = JAR_FILE;
         try {
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
index ef56257..a64166a 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
@@ -31,17 +31,16 @@ import org.apache.flink.table.factories.utils.FactoryMocks;
 import 
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link AvroFormatFactory}. */
-public class AvroFormatFactoryTest extends TestLogger {
+class AvroFormatFactoryTest {
 
     private static final ResolvedSchema SCHEMA =
             ResolvedSchema.of(
@@ -53,7 +52,7 @@ public class AvroFormatFactoryTest extends TestLogger {
             (RowType) SCHEMA.toPhysicalRowDataType().getLogicalType();
 
     @Test
-    public void testSeDeSchema() {
+    void testSeDeSchema() {
         final AvroRowDataDeserializationSchema expectedDeser =
                 new AvroRowDataDeserializationSchema(ROW_TYPE, 
InternalTypeInfo.of(ROW_TYPE));
 
@@ -68,7 +67,7 @@ public class AvroFormatFactoryTest extends TestLogger {
                 scanSourceMock.valueFormat.createRuntimeDecoder(
                         ScanRuntimeProviderContext.INSTANCE, 
SCHEMA.toPhysicalRowDataType());
 
-        assertEquals(expectedDeser, actualDeser);
+        assertThat(actualDeser).isEqualTo(expectedDeser);
 
         final AvroRowDataSerializationSchema expectedSer =
                 new AvroRowDataSerializationSchema(ROW_TYPE);
@@ -81,7 +80,7 @@ public class AvroFormatFactoryTest extends TestLogger {
         SerializationSchema<RowData> actualSer =
                 sinkMock.valueFormat.createRuntimeEncoder(null, 
SCHEMA.toPhysicalRowDataType());
 
-        assertEquals(expectedSer, actualSer);
+        assertThat(actualSer).isEqualTo(expectedSer);
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
index aaf3634..59f5bb8 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
@@ -26,14 +26,16 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.Path;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for the type extraction of the {@link AvroInputFormat}. */
-public class AvroInputFormatTypeExtractionTest {
+class AvroInputFormatTypeExtractionTest {
 
     @Test
-    public void testTypeExtraction() {
+    void testTypeExtraction() {
         try {
             InputFormat<MyAvroType, ?> format =
                     new AvroInputFormat<MyAvroType>(
@@ -45,14 +47,14 @@ public class AvroInputFormatTypeExtractionTest {
             DataSet<MyAvroType> input = env.createInput(format);
             TypeInformation<?> typeInfoDataSet = input.getType();
 
-            Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
-            Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);
+            assertThat(typeInfoDirect).isInstanceOf(PojoTypeInfo.class);
+            assertThat(typeInfoDataSet).isInstanceOf(PojoTypeInfo.class);
 
-            Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass());
-            Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass());
+            assertThat(typeInfoDirect.getTypeClass()).isEqualTo(MyAvroType.class);
+            assertThat(typeInfoDataSet.getTypeClass()).isEqualTo(MyAvroType.class);
         } catch (Exception e) {
             e.printStackTrace();
-            Assert.fail(e.getMessage());
+            fail(e.getMessage());
         }
     }
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
index f3eee32..feb5142 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
@@ -24,15 +24,14 @@ import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 
 import com.esotericsoftware.kryo.Kryo;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.LinkedHashMap;
 
 import static 
org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * This test makes sure that reversed classloading works for the Avro/Kryo 
integration when Kryo is
@@ -59,10 +58,10 @@ import static org.junit.Assert.assertNotEquals;
  *     0x0000020: 57b1
  * </pre>
  */
-public class AvroKryoClassloadingTest {
+class AvroKryoClassloadingTest {
 
     @Test
-    public void testKryoInChildClasspath() throws Exception {
+    void testKryoInChildClasspath() throws Exception {
         final Class<?> avroClass = AvroKryoSerializerUtils.class;
 
         final URL avroLocation = 
avroClass.getProtectionDomain().getCodeSource().getLocation();
@@ -84,7 +83,7 @@ public class AvroKryoClassloadingTest {
 
         final Class<?> userLoadedAvroClass =
                 Class.forName(avroClass.getName(), false, userAppClassLoader);
-        assertNotEquals(avroClass, userLoadedAvroClass);
+        assertThat(userLoadedAvroClass).isNotEqualTo(avroClass);
 
         // call the 'addAvroGenericDataArrayRegistration(...)' method
         final Method m =
@@ -94,6 +93,6 @@ public class AvroKryoClassloadingTest {
         final LinkedHashMap<String, ?> map = new LinkedHashMap<>();
         m.invoke(userLoadedAvroClass.newInstance(), map);
 
-        assertEquals(1, map.size());
+        assertThat(map).hasSize(1);
     }
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
index a45d1ec..14ee125 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Registration;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -32,8 +32,8 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * Tests that the set of Kryo registrations is the same across compatible 
Flink versions.
@@ -42,7 +42,7 @@ import static org.junit.Assert.fail;
  * verifies that we correctly register Avro types at the {@link 
KryoSerializer} when Avro is
  * present.
  */
-public class AvroKryoSerializerRegistrationsTest {
+class AvroKryoSerializerRegistrationsTest {
 
     /**
      * Tests that the registered classes in Kryo did not change.
@@ -51,7 +51,7 @@ public class AvroKryoSerializerRegistrationsTest {
      * change in the serializers can break savepoint backwards compatibility 
between Flink versions.
      */
     @Test
-    public void testDefaultKryoRegisteredClassesDidNotChange() throws Exception {
+    void testDefaultKryoRegisteredClassesDidNotChange() throws Exception {
         final Kryo kryo = new KryoSerializer<>(Integer.class, new 
ExecutionConfig()).getKryo();
 
         try (BufferedReader reader =
@@ -100,7 +100,7 @@ public class AvroKryoSerializerRegistrationsTest {
     private void writeDefaultKryoRegistrations(String filePath) throws 
IOException {
         final File file = new File(filePath);
         if (file.exists()) {
-            assertTrue(file.delete());
+            assertThat(file.delete()).isTrue();
         }
 
         final Kryo kryo = new KryoSerializer<>(Integer.class, new 
ExecutionConfig()).getKryo();
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
index 3858294..7cd6414 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
@@ -32,7 +32,6 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
-import org.junit.Assert;
 
 import java.io.File;
 import java.math.BigDecimal;
@@ -46,6 +45,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** IT cases for the {@link AvroOutputFormat}. */
 @SuppressWarnings("serial")
 public class AvroOutputFormatITCase extends JavaProgramTestBase {
@@ -99,8 +100,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
             output1 = file1.listFiles();
             // check for avro ext in dir.
             for (File avroOutput : Objects.requireNonNull(output1)) {
-                Assert.assertTrue(
-                        "Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
+                assertThat(avroOutput.toString()).endsWith(".avro");
             }
         } else {
             output1 = new File[] {file1};
@@ -121,11 +121,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
                                 + user.getFavoriteColor());
             }
         }
-        for (String expectedResult : userData.split("\n")) {
-            Assert.assertTrue(
-                    "expected user " + expectedResult + " not found.",
-                    result1.contains(expectedResult));
-        }
+        assertThat(result1).contains(userData.split("\n"));
 
         // compare result for reflect user type
         File[] output2;
@@ -151,11 +147,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
                                 + user.getFavoriteColor());
             }
         }
-        for (String expectedResult : userData.split("\n")) {
-            Assert.assertTrue(
-                    "expected user " + expectedResult + " not found.",
-                    result2.contains(expectedResult));
-        }
+        assertThat(result2).contains(userData.split("\n"));
     }
 
     private static final class ConvertToUser
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
index de6b686..1ce568c 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
@@ -31,7 +31,7 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -47,15 +47,14 @@ import java.time.LocalTime;
 import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for {@link AvroOutputFormat}. */
-public class AvroOutputFormatTest {
+class AvroOutputFormatTest {
 
     @Test
-    public void testSetCodec() {
+    void testSetCodec() {
         // given
         final AvroOutputFormat<User> outputFormat = new 
AvroOutputFormat<>(User.class);
 
@@ -69,7 +68,7 @@ public class AvroOutputFormatTest {
     }
 
     @Test
-    public void testSetCodecError() {
+    void testSetCodecError() {
         // given
         boolean error = false;
         final AvroOutputFormat<User> outputFormat = new 
AvroOutputFormat<>(User.class);
@@ -82,11 +81,11 @@ public class AvroOutputFormatTest {
         }
 
         // then
-        assertTrue(error);
+        assertThat(error).isTrue();
     }
 
     @Test
-    public void testSerialization() throws Exception {
+    void testSerialization() throws Exception {
 
         serializeAndDeserialize(null, null);
         serializeAndDeserialize(null, User.SCHEMA$);
@@ -117,7 +116,7 @@ public class AvroOutputFormatTest {
                 new ObjectInputStream(new 
ByteArrayInputStream(bos.toByteArray()))) {
             // then
             Object o = ois.readObject();
-            assertTrue(o instanceof AvroOutputFormat);
+            assertThat(o).isInstanceOf(AvroOutputFormat.class);
             @SuppressWarnings("unchecked")
             final AvroOutputFormat<User> restored = (AvroOutputFormat<User>) o;
             final AvroOutputFormat.Codec restoredCodec =
@@ -125,13 +124,13 @@ public class AvroOutputFormatTest {
             final Schema restoredSchema =
                     (Schema) Whitebox.getInternalState(restored, 
"userDefinedSchema");
 
-            assertTrue(codec != null ? restoredCodec == codec : restoredCodec == null);
-            assertTrue(schema != null ? restoredSchema.equals(schema) : restoredSchema == null);
+            assertThat(codec).isSameAs(restoredCodec);
+            assertThat(schema).isEqualTo(restoredSchema);
         }
     }
 
     @Test
-    public void testCompression() throws Exception {
+    void testCompression() throws Exception {
         // given
         final Path outputPath =
                 new Path(File.createTempFile("avro-output-file", 
"avro").getAbsolutePath());
@@ -152,7 +151,7 @@ public class AvroOutputFormatTest {
         output(compressedOutputFormat);
 
         // then
-        assertTrue(fileSize(outputPath) > fileSize(compressedOutputPath));
+        assertThat(fileSize(outputPath)).isGreaterThan(fileSize(compressedOutputPath));
 
         // cleanup
         FileSystem fs = FileSystem.getLocalFileSystem();
@@ -197,7 +196,7 @@ public class AvroOutputFormatTest {
     }
 
     @Test
-    public void testGenericRecord() throws IOException {
+    void testGenericRecord() throws IOException {
         final Path outputPath =
                 new Path(File.createTempFile("avro-output-file", 
"generic.avro").getAbsolutePath());
         final AvroOutputFormat<GenericRecord> outputFormat =
@@ -216,9 +215,9 @@ public class AvroOutputFormatTest {
 
         while (dataFileReader.hasNext()) {
             GenericRecord record = dataFileReader.next();
-            assertEquals(record.get("user_name").toString(), "testUser");
-            assertEquals(record.get("favorite_number"), 1);
-            assertEquals(record.get("favorite_color").toString(), "blue");
+            assertThat(record.get("user_name").toString()).isEqualTo("testUser");
+            assertThat(record.get("favorite_number")).isEqualTo(1);
+            assertThat(record.get("favorite_color").toString()).isEqualTo("blue");
         }
 
         // cleanup
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
index 505ba24..d0fc6ac 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
@@ -48,9 +48,9 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -68,11 +68,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.data.MapEntry.entry;
 
 /**
  * Test the avro input format. (The testcase is mostly the getting started 
tutorial of avro)
@@ -103,7 +100,7 @@ public class AvroRecordInputFormatTest {
     static final String TEST_STATE = "London";
     static final String TEST_ZIP = "NW1 6XE";
 
-    private Schema userSchema = new User().getSchema();
+    private final Schema userSchema = new User().getSchema();
 
     public static void writeTestFile(File testFile) throws IOException {
         ArrayList<CharSequence> stringArray = new ArrayList<>();
@@ -199,7 +196,7 @@ public class AvroRecordInputFormatTest {
         dataFileWriter.close();
     }
 
-    @Before
+    @BeforeEach
     public void createFiles() throws IOException {
         testFile = File.createTempFile("AvroInputFormatTest", null);
         writeTestFile(testFile);
@@ -207,7 +204,7 @@ public class AvroRecordInputFormatTest {
 
     /** Test if the AvroInputFormat is able to properly read data from an Avro 
file. */
     @Test
-    public void testDeserialization() throws IOException {
+    void testDeserialization() throws IOException {
         Configuration parameters = new Configuration();
 
         AvroInputFormat<User> format =
@@ -215,52 +212,46 @@ public class AvroRecordInputFormatTest {
 
         format.configure(parameters);
         FileInputSplit[] splits = format.createInputSplits(1);
-        assertEquals(splits.length, 1);
+        assertThat(splits).hasSize(1);
         format.open(splits[0]);
 
         User u = format.nextRecord(null);
-        assertNotNull(u);
+        assertThat(u).isNotNull();
 
         String name = u.getName().toString();
-        assertNotNull("empty record", name);
-        assertEquals("name not equal", TEST_NAME, name);
+        assertThat(name).isEqualTo(TEST_NAME);
 
         // check arrays
         List<CharSequence> sl = u.getTypeArrayString();
-        assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
-        assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+        assertThat(sl.get(0).toString()).isEqualTo(TEST_ARRAY_STRING_1);
+        assertThat(sl.get(1).toString()).isEqualTo(TEST_ARRAY_STRING_2);
 
         List<Boolean> bl = u.getTypeArrayBoolean();
-        assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
-        assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+        assertThat(bl).containsExactly(TEST_ARRAY_BOOLEAN_1, TEST_ARRAY_BOOLEAN_2);
 
         // check enums
         Colors enumValue = u.getTypeEnum();
-        assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+        assertThat(enumValue).isEqualTo(TEST_ENUM_COLOR);
 
         // check maps
         Map<CharSequence, Long> lm = u.getTypeMap();
-        assertEquals(
-                "map value of key 1 not equal",
-                TEST_MAP_VALUE1,
-                lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
-        assertEquals(
-                "map value of key 2 not equal",
-                TEST_MAP_VALUE2,
-                lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+        assertThat(lm)
+                .containsOnly(
+                        entry(new Utf8(TEST_MAP_KEY1), TEST_MAP_VALUE1),
+                        entry(new Utf8(TEST_MAP_KEY2), TEST_MAP_VALUE2));
 
-        assertFalse("expecting second element", format.reachedEnd());
-        assertNotNull("expecting second element", format.nextRecord(u));
+        assertThat(format.reachedEnd()).as("expecting second element").isFalse();
+        assertThat(format.nextRecord(u)).as("expecting second element").isNotNull();
 
-        assertNull(format.nextRecord(u));
-        assertTrue(format.reachedEnd());
+        assertThat(format.nextRecord(u)).isNull();
+        assertThat(format.reachedEnd()).isTrue();
 
         format.close();
     }
 
     /** Test if the AvroInputFormat is able to properly read data from an Avro file. */
     @Test
-    public void testDeserializationReuseAvroRecordFalse() throws IOException {
+    void testDeserializationReuseAvroRecordFalse() throws IOException {
         Configuration parameters = new Configuration();
 
         AvroInputFormat<User> format =
@@ -269,45 +260,39 @@ public class AvroRecordInputFormatTest {
 
         format.configure(parameters);
         FileInputSplit[] splits = format.createInputSplits(1);
-        assertEquals(splits.length, 1);
+        assertThat(splits).hasSize(1);
         format.open(splits[0]);
 
         User u = format.nextRecord(null);
-        assertNotNull(u);
+        assertThat(u).isNotNull();
 
         String name = u.getName().toString();
-        assertNotNull("empty record", name);
-        assertEquals("name not equal", TEST_NAME, name);
+        assertThat(name).isEqualTo(TEST_NAME);
 
         // check arrays
         List<CharSequence> sl = u.getTypeArrayString();
-        assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
-        assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+        assertThat(sl.get(0).toString()).isEqualTo(TEST_ARRAY_STRING_1);
+        assertThat(sl.get(1).toString()).isEqualTo(TEST_ARRAY_STRING_2);
 
         List<Boolean> bl = u.getTypeArrayBoolean();
-        assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
-        assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+        assertThat(bl).containsExactly(TEST_ARRAY_BOOLEAN_1, TEST_ARRAY_BOOLEAN_2);
 
         // check enums
         Colors enumValue = u.getTypeEnum();
-        assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+        assertThat(enumValue).isEqualTo(TEST_ENUM_COLOR);
 
         // check maps
         Map<CharSequence, Long> lm = u.getTypeMap();
-        assertEquals(
-                "map value of key 1 not equal",
-                TEST_MAP_VALUE1,
-                lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
-        assertEquals(
-                "map value of key 2 not equal",
-                TEST_MAP_VALUE2,
-                lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+        assertThat(lm)
+                .containsOnly(
+                        entry(new Utf8(TEST_MAP_KEY1), TEST_MAP_VALUE1),
+                        entry(new Utf8(TEST_MAP_KEY2), TEST_MAP_VALUE2));
 
-        assertFalse("expecting second element", format.reachedEnd());
-        assertNotNull("expecting second element", format.nextRecord(u));
+        assertThat(format.reachedEnd()).as("expecting second element").isFalse();
+        assertThat(format.nextRecord(u)).as("expecting second element").isNotNull();
 
-        assertNull(format.nextRecord(u));
-        assertTrue(format.reachedEnd());
+        assertThat(format.nextRecord(u)).isNull();
+        assertThat(format.reachedEnd()).isTrue();
 
         format.close();
     }
@@ -321,7 +306,7 @@ public class AvroRecordInputFormatTest {
      * <p>It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
      */
     @Test
-    public void testDeserializeToGenericType() throws IOException {
+    void testDeserializeToGenericType() throws IOException {
         DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema);
 
         try (FileReader<GenericData.Record> dataFileReader =
@@ -331,28 +316,25 @@ public class AvroRecordInputFormatTest {
             dataFileReader.next(rec);
 
             // check if record has been read correctly
-            assertNotNull(rec);
-            assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
-            assertEquals(
-                    "enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
-            assertEquals(null, rec.get("type_long_test")); // it is null for the first record.
+            assertThat(rec).isNotNull();
+            assertThat(rec.get("name").toString()).isEqualTo(TEST_NAME);
+            assertThat(rec.get("type_enum").toString()).isEqualTo(TEST_ENUM_COLOR.toString());
+            assertThat(rec.get("type_long_test")).isNull(); // it is null for the first record.
 
             // now serialize it with our framework:
             TypeInformation<GenericData.Record> te =
                     TypeExtractor.createTypeInfo(GenericData.Record.class);
 
             ExecutionConfig ec = new ExecutionConfig();
-            assertEquals(GenericTypeInfo.class, te.getClass());
+            assertThat(te).isExactlyInstanceOf(GenericTypeInfo.class);
 
             Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<>());
 
             TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
-            assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
-            assertTrue(
-                    ec.getDefaultKryoSerializerClasses().containsKey(Schema.class)
-                            && ec.getDefaultKryoSerializerClasses()
-                                    .get(Schema.class)
-                                    .equals(AvroKryoSerializerUtils.AvroSchemaSerializer.class));
+            assertThat(ec.getDefaultKryoSerializerClasses())
+                    .hasSize(1)
+                    .containsEntry(
+                            Schema.class, AvroKryoSerializerUtils.AvroSchemaSerializer.class);
 
             ByteArrayOutputStream out = new ByteArrayOutputStream();
             try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
@@ -366,19 +348,16 @@ public class AvroRecordInputFormatTest {
             }
 
             // check if it is still the same
-            assertNotNull(newRec);
-            assertEquals(
-                    "enum not equal",
-                    TEST_ENUM_COLOR.toString(),
-                    newRec.get("type_enum").toString());
-            assertEquals("name not equal", TEST_NAME, newRec.get("name").toString());
-            assertEquals(null, newRec.get("type_long_test"));
+            assertThat(newRec).isNotNull();
+            assertThat(newRec.get("name").toString()).isEqualTo(TEST_NAME);
+            assertThat(newRec.get("type_enum").toString()).isEqualTo(TEST_ENUM_COLOR.toString());
+            assertThat(newRec.get("type_long_test")).isNull();
         }
     }
 
     /** This test validates proper serialization with specific (generated POJO) types. */
     @Test
-    public void testDeserializeToSpecificType() throws IOException {
+    void testDeserializeToSpecificType() throws IOException {
 
         DatumReader<User> datumReader = new SpecificDatumReader<>(userSchema);
 
@@ -386,16 +365,15 @@ public class AvroRecordInputFormatTest {
             User rec = dataFileReader.next();
 
             // check if record has been read correctly
-            assertNotNull(rec);
-            assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
-            assertEquals(
-                    "enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
+            assertThat(rec).isNotNull();
+            assertThat(rec.get("name").toString()).isEqualTo(TEST_NAME);
+            assertThat(rec.get("type_enum").toString()).isEqualTo(TEST_ENUM_COLOR.toString());
 
             // now serialize it with our framework:
             ExecutionConfig ec = new ExecutionConfig();
             TypeInformation<User> te = TypeExtractor.createTypeInfo(User.class);
 
-            assertEquals(AvroTypeInfo.class, te.getClass());
+            assertThat(te).isExactlyInstanceOf(AvroTypeInfo.class);
             TypeSerializer<User> tser = te.createSerializer(ec);
 
             ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -410,10 +388,9 @@ public class AvroRecordInputFormatTest {
             }
 
             // check if it is still the same
-            assertNotNull(newRec);
-            assertEquals("name not equal", TEST_NAME, newRec.getName().toString());
-            assertEquals(
-                    "enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString());
+            assertThat(newRec).isNotNull();
+            assertThat(newRec.get("name").toString()).isEqualTo(TEST_NAME);
+            assertThat(newRec.get("type_enum").toString()).isEqualTo(TEST_ENUM_COLOR.toString());
         }
     }
 
@@ -422,7 +399,7 @@ public class AvroRecordInputFormatTest {
      * GenericRecord.
      */
     @Test
-    public void testDeserializationGenericRecord() throws IOException {
+    void testDeserializationGenericRecord() throws IOException {
         Configuration parameters = new Configuration();
 
         AvroInputFormat<GenericRecord> format =
@@ -445,46 +422,40 @@ public class AvroRecordInputFormatTest {
         try {
             format.configure(parameters);
             FileInputSplit[] splits = format.createInputSplits(1);
-            assertEquals(splits.length, 1);
+            assertThat(splits).hasSize(1);
             format.open(splits[0]);
 
             GenericRecord u = format.nextRecord(null);
-            assertNotNull(u);
-            assertEquals("The schemas should be equal", userSchema, u.getSchema());
+            assertThat(u).isNotNull();
+            assertThat(u.getSchema()).isEqualTo(userSchema);
 
             String name = u.get("name").toString();
-            assertNotNull("empty record", name);
-            assertEquals("name not equal", TEST_NAME, name);
+            assertThat(name).isEqualTo(TEST_NAME);
 
             // check arrays
             List<CharSequence> sl = (List<CharSequence>) u.get("type_array_string");
-            assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
-            assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+            assertThat(sl.get(0).toString()).isEqualTo(TEST_ARRAY_STRING_1);
+            assertThat(sl.get(1).toString()).isEqualTo(TEST_ARRAY_STRING_2);
 
             List<Boolean> bl = (List<Boolean>) u.get("type_array_boolean");
-            assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
-            assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+            assertThat(bl).containsExactly(TEST_ARRAY_BOOLEAN_1, TEST_ARRAY_BOOLEAN_2);
 
             // check enums
             GenericData.EnumSymbol enumValue = (GenericData.EnumSymbol) u.get("type_enum");
-            assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), enumValue.toString());
+            assertThat(enumValue).isEqualTo(TEST_ENUM_COLOR);
 
             // check maps
             Map<CharSequence, Long> lm = (Map<CharSequence, Long>) u.get("type_map");
-            assertEquals(
-                    "map value of key 1 not equal",
-                    TEST_MAP_VALUE1,
-                    lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
-            assertEquals(
-                    "map value of key 2 not equal",
-                    TEST_MAP_VALUE2,
-                    lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
-
-            assertFalse("expecting second element", format.reachedEnd());
-            assertNotNull("expecting second element", format.nextRecord(u));
-
-            assertNull(format.nextRecord(u));
-            assertTrue(format.reachedEnd());
+            assertThat(lm)
+                    .containsOnly(
+                            entry(new Utf8(TEST_MAP_KEY1), TEST_MAP_VALUE1),
+                            entry(new Utf8(TEST_MAP_KEY2), TEST_MAP_VALUE2));
+
+            assertThat(format.reachedEnd()).as("expecting second element").isFalse();
+            assertThat(format.nextRecord(u)).as("expecting second element").isNotNull();
+
+            assertThat(format.nextRecord(u)).isNull();
+            assertThat(format.reachedEnd()).isTrue();
         } finally {
             format.close();
         }
@@ -497,7 +468,7 @@ public class AvroRecordInputFormatTest {
      * @throws IOException if there is an error
      */
     @Test
-    public void testDeserializationGenericRecordReuseAvroValueFalse() throws IOException {
+    void testDeserializationGenericRecordReuseAvroValueFalse() throws IOException {
         Configuration parameters = new Configuration();
 
         AvroInputFormat<GenericRecord> format =
@@ -509,7 +480,7 @@ public class AvroRecordInputFormatTest {
     }
 
     @SuppressWarnings("ResultOfMethodCallIgnored")
-    @After
+    @AfterEach
     public void deleteFiles() {
         testFile.delete();
     }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
index 80b85cc..2168a25 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.formats.avro.generated.LogicalTimeRecord;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.data.GenericRowData;
@@ -37,8 +36,7 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.math.BigDecimal;
@@ -69,25 +67,23 @@ import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.api.DataTypes.TIME;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static org.apache.flink.table.api.DataTypes.TINYINT;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for the Avro serialization and deserialization schema. */
-public class AvroRowDataDeSerializationSchemaTest {
+class AvroRowDataDeSerializationSchemaTest {
 
     @Test
-    public void testDeserializeNullRow() throws Exception {
+    void testDeserializeNullRow() throws Exception {
         final DataType dataType = ROW(FIELD("bool", BOOLEAN())).nullable();
         AvroRowDataDeserializationSchema deserializationSchema =
                 createDeserializationSchema(dataType);
 
-        assertNull(deserializationSchema.deserialize(null));
+        assertThat(deserializationSchema.deserialize(null)).isNull();
     }
 
     @Test
-    public void testSerializeDeserialize() throws Exception {
+    void testSerializeDeserialize() throws Exception {
         final DataType dataType =
                 ROW(
                                 FIELD("bool", BOOLEAN()),
@@ -175,11 +171,11 @@ public class AvroRowDataDeSerializationSchemaTest {
         RowData rowData = deserializationSchema.deserialize(input);
         byte[] output = serializationSchema.serialize(rowData);
 
-        assertArrayEquals(input, output);
+        assertThat(output).isEqualTo(input);
     }
 
     @Test
-    public void testSpecificType() throws Exception {
+    void testSpecificType() throws Exception {
         LogicalTimeRecord record = new LogicalTimeRecord();
         Instant timestamp = Instant.parse("2010-06-30T01:20:20Z");
         record.setTypeTimestampMillis(timestamp);
@@ -206,34 +202,33 @@ public class AvroRowDataDeSerializationSchemaTest {
         RowData rowData = deserializationSchema.deserialize(input);
         byte[] output = serializationSchema.serialize(rowData);
         RowData rowData2 = deserializationSchema.deserialize(output);
-        Assert.assertEquals(rowData, rowData2);
-        Assert.assertEquals(timestamp, rowData.getTimestamp(0, 3).toInstant());
-        Assert.assertEquals(
-                "2014-03-01",
-                DataFormatConverters.LocalDateConverter.INSTANCE
-                        .toExternal(rowData.getInt(1))
-                        .toString());
-        Assert.assertEquals(
-                "12:12:12",
-                DataFormatConverters.LocalTimeConverter.INSTANCE
-                        .toExternal(rowData.getInt(2))
-                        .toString());
+        assertThat(rowData2).isEqualTo(rowData);
+        assertThat(rowData.getTimestamp(0, 3).toInstant()).isEqualTo(timestamp);
+
+        assertThat(
+                        DataFormatConverters.LocalDateConverter.INSTANCE
+                                .toExternal(rowData.getInt(1))
+                                .toString())
+                .isEqualTo("2014-03-01");
+        assertThat(
+                        DataFormatConverters.LocalTimeConverter.INSTANCE
+                                .toExternal(rowData.getInt(2))
+                                .toString())
+                .isEqualTo("12:12:12");
     }
 
     @Test
-    public void testSerializationWithTypesMismatch() throws Exception {
+    void testSerializationWithTypesMismatch() throws Exception {
         AvroRowDataSerializationSchema serializationSchema =
                 createSerializationSchema(ROW(FIELD("f0", INT()), FIELD("f1", STRING())).notNull());
         GenericRowData rowData = new GenericRowData(2);
         rowData.setField(0, 1);
-        rowData.setField(0, 2);
-        String errorMessage = "Fail to serialize at field: f1.";
-        try {
-            serializationSchema.serialize(rowData);
-            fail("expecting exception message: " + errorMessage);
-        } catch (Throwable t) {
-            assertThat(t, FlinkMatchers.containsMessage(errorMessage));
-        }
+        rowData.setField(1, 2); // This should be a STRING
+
+        assertThatThrownBy(() -> serializationSchema.serialize(rowData))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("Failed to serialize row.")
+                .hasStackTraceContaining("Fail to serialize at field: f1");
     }
 
     private AvroRowDataSerializationSchema createSerializationSchema(DataType dataType)
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
index 816b986..c7bb6c5 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
@@ -26,17 +26,17 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificRecord;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for the Avro serialization and deserialization schema. */
-public class AvroRowDeSerializationSchemaTest {
+class AvroRowDeSerializationSchemaTest {
 
     @Test
-    public void testSpecificSerializeDeserializeFromClass() throws IOException {
+    void testSpecificSerializeDeserializeFromClass() throws IOException {
         final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData =
                 AvroTestUtils.getSpecificTestData();
 
@@ -48,11 +48,11 @@ public class AvroRowDeSerializationSchemaTest {
         final byte[] bytes = serializationSchema.serialize(testData.f2);
         final Row actual = deserializationSchema.deserialize(bytes);
 
-        assertEquals(testData.f2, actual);
+        assertThat(actual).isEqualTo(testData.f2);
     }
 
     @Test
-    public void testSpecificSerializeDeserializeFromSchema() throws IOException {
+    void testSpecificSerializeDeserializeFromSchema() throws IOException {
         final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData =
                 AvroTestUtils.getSpecificTestData();
         final String schemaString = testData.f1.getSchema().toString();
@@ -65,11 +65,11 @@ public class AvroRowDeSerializationSchemaTest {
         final byte[] bytes = serializationSchema.serialize(testData.f2);
         final Row actual = deserializationSchema.deserialize(bytes);
 
-        assertEquals(testData.f2, actual);
+        assertThat(actual).isEqualTo(testData.f2);
     }
 
     @Test
-    public void testGenericSerializeDeserialize() throws IOException {
+    void testGenericSerializeDeserialize() throws IOException {
         final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData();
 
         final AvroRowSerializationSchema serializationSchema =
@@ -80,11 +80,11 @@ public class AvroRowDeSerializationSchemaTest {
         final byte[] bytes = serializationSchema.serialize(testData.f1);
         final Row actual = deserializationSchema.deserialize(bytes);
 
-        assertEquals(testData.f1, actual);
+        assertThat(actual).isEqualTo(testData.f1);
     }
 
     @Test
-    public void testSpecificSerializeFromClassSeveralTimes() throws IOException {
+    void testSpecificSerializeFromClassSeveralTimes() throws IOException {
         final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData =
                 AvroTestUtils.getSpecificTestData();
 
@@ -98,11 +98,11 @@ public class AvroRowDeSerializationSchemaTest {
         final byte[] bytes = serializationSchema.serialize(testData.f2);
         final Row actual = deserializationSchema.deserialize(bytes);
 
-        assertEquals(testData.f2, actual);
+        assertThat(actual).isEqualTo(testData.f2);
     }
 
     @Test
-    public void testSpecificSerializeFromSchemaSeveralTimes() throws IOException {
+    void testSpecificSerializeFromSchemaSeveralTimes() throws IOException {
         final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData =
                 AvroTestUtils.getSpecificTestData();
         final String schemaString = testData.f1.getSchema().toString();
@@ -117,11 +117,11 @@ public class AvroRowDeSerializationSchemaTest {
         final byte[] bytes = serializationSchema.serialize(testData.f2);
         final Row actual = deserializationSchema.deserialize(bytes);
 
-        assertEquals(testData.f2, actual);
+        assertThat(actual).isEqualTo(testData.f2);
     }
 
     @Test
-    public void testGenericSerializeSeveralTimes() throws IOException {
+    void testGenericSerializeSeveralTimes() throws IOException {
         final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData();
 
         final AvroRowSerializationSchema serializationSchema =
@@ -134,11 +134,11 @@ public class AvroRowDeSerializationSchemaTest {
         final byte[] bytes = serializationSchema.serialize(testData.f1);
         final Row actual = deserializationSchema.deserialize(bytes);
 
-        assertEquals(testData.f1, actual);
+        assertThat(actual).isEqualTo(testData.f1);
     }
 
     @Test
-    public void testSpecificDeserializeFromClassSeveralTimes() throws IOException {
+    void testSpecificDeserializeFromClassSeveralTimes() throws IOException {
         final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData =
                 AvroTestUtils.getSpecificTestData();
 
@@ -152,11 +152,11 @@ public class AvroRowDeSerializationSchemaTest {
         deserializationSchema.deserialize(bytes);
         final Row actual = deserializationSchema.deserialize(bytes);
 
-        assertEquals(testData.f2, actual);
+        assertThat(actual).isEqualTo(testData.f2);
     }
 
     @Test
-    public void testSpecificDeserializeFromSchemaSeveralTimes() throws IOException {
+    void testSpecificDeserializeFromSchemaSeveralTimes() throws IOException {
         final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData =
                 AvroTestUtils.getSpecificTestData();
         final String schemaString = testData.f1.getSchema().toString();
@@ -171,11 +171,11 @@ public class AvroRowDeSerializationSchemaTest {
         deserializationSchema.deserialize(bytes);
         final Row actual = deserializationSchema.deserialize(bytes);
 
-        assertEquals(testData.f2, actual);
+        assertThat(actual).isEqualTo(testData.f2);
     }
 
     @Test
-    public void testGenericDeserializeSeveralTimes() throws IOException {
+    void testGenericDeserializeSeveralTimes() throws IOException {
         final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData();
 
         final AvroRowSerializationSchema serializationSchema =
@@ -188,11 +188,11 @@ public class AvroRowDeSerializationSchemaTest {
         deserializationSchema.deserialize(bytes);
         final Row actual = deserializationSchema.deserialize(bytes);
 
-        assertEquals(testData.f1, actual);
+        assertThat(actual).isEqualTo(testData.f1);
     }
 
     @Test
-    public void testSerializability() throws Exception {
+    void testSerializability() throws Exception {
         final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData =
                 AvroTestUtils.getSpecificTestData();
         final String schemaString = testData.f1.getSchema().toString();
@@ -228,6 +228,6 @@ public class AvroRowDeSerializationSchemaTest {
         deserCopy.deserialize(bytes);
         final Row actual = deserCopy.deserialize(bytes);
 
-        assertEquals(data, actual);
+        assertThat(actual).isEqualTo(data);
     }
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSerializationSchemaTest.java
index 37f53df..2eb265a 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSerializationSchemaTest.java
@@ -24,41 +24,41 @@ import org.apache.flink.formats.avro.generated.UnionLogicalType;
 import org.apache.flink.formats.avro.utils.TestDataGenerator;
 
 import org.apache.avro.generic.GenericRecord;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.time.Instant;
 import java.util.Random;
 
 import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
-import static org.junit.Assert.assertArrayEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link AvroDeserializationSchema}. */
-public class AvroSerializationSchemaTest {
+class AvroSerializationSchemaTest {
 
     private static final Address address = TestDataGenerator.generateRandomAddress(new Random());
 
     @Test
-    public void testGenericRecord() throws Exception {
+    void testGenericRecord() throws Exception {
         SerializationSchema<GenericRecord> serializationSchema =
                 AvroSerializationSchema.forGeneric(address.getSchema());
 
         byte[] encodedAddress = writeRecord(address, Address.getClassSchema());
         byte[] dataSerialized = serializationSchema.serialize(address);
-        assertArrayEquals(encodedAddress, dataSerialized);
+        assertThat(dataSerialized).isEqualTo(encodedAddress);
     }
 
     @Test
-    public void testSpecificRecord() throws Exception {
+    void testSpecificRecord() throws Exception {
         SerializationSchema<Address> serializer =
                 AvroSerializationSchema.forSpecific(Address.class);
 
         byte[] encodedAddress = writeRecord(address, Address.getClassSchema());
         byte[] serializedAddress = serializer.serialize(address);
-        assertArrayEquals(encodedAddress, serializedAddress);
+        assertThat(serializedAddress).isEqualTo(encodedAddress);
     }
 
     @Test
-    public void testSpecificRecordWithUnionLogicalType() throws Exception {
+    void testSpecificRecordWithUnionLogicalType() throws Exception {
         Random rnd = new Random();
         UnionLogicalType data = new UnionLogicalType(Instant.ofEpochMilli(rnd.nextLong()));
         AvroSerializationSchema<UnionLogicalType> serializer =
@@ -66,6 +66,6 @@ public class AvroSerializationSchemaTest {
 
         byte[] encodedData = writeRecord(data);
         byte[] serializedData = serializer.serialize(data);
-        assertArrayEquals(encodedData, serializedData);
+        assertThat(serializedData).isEqualTo(encodedData);
     }
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
index 4bd34c1..f6fe063 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
@@ -31,10 +31,9 @@ import org.apache.flink.formats.avro.generated.User;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
@@ -48,13 +47,13 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Random;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Test the avro input format. (The testcase is mostly the getting started tutorial of avro)
  * http://avro.apache.org/docs/current/gettingstartedjava.html
  */
-public class AvroSplittableInputFormatTest {
+class AvroSplittableInputFormatTest {
 
     private File testFile;
 
@@ -81,9 +80,9 @@ public class AvroSplittableInputFormatTest {
 
     static final int NUM_RECORDS = 5000;
 
-    @Before
-    public void createFiles() throws IOException {
-        testFile = File.createTempFile("AvroSplittableInputFormatTest", null);
+    @BeforeEach
+    void createFiles(@TempDir java.nio.file.Path tempDir) throws IOException {
+        testFile = tempDir.resolve("AvroSplittableInputFormatTest").toFile();
 
         ArrayList<CharSequence> stringArray = new ArrayList<>();
         stringArray.add(TEST_ARRAY_STRING_1);
@@ -212,38 +211,36 @@ public class AvroSplittableInputFormatTest {
     }
 
     @Test
-    public void testSplittedIF() throws IOException {
+    void testSplittedIF() throws IOException {
         Configuration parameters = new Configuration();
 
         AvroInputFormat<User> format =
                 new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
-
         format.configure(parameters);
+
         FileInputSplit[] splits = format.createInputSplits(4);
-        assertEquals(splits.length, 4);
+        assertThat(splits).hasSize(4);
+
         int elements = 0;
         int[] elementsPerSplit = new int[4];
         for (int i = 0; i < splits.length; i++) {
             format.open(splits[i]);
             while (!format.reachedEnd()) {
                 User u = format.nextRecord(null);
-                Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+                assertThat(u.getName().toString()).startsWith(TEST_NAME);
                 elements++;
                 elementsPerSplit[i]++;
             }
             format.close();
         }
 
-        Assert.assertEquals(1604, elementsPerSplit[0]);
-        Assert.assertEquals(1203, elementsPerSplit[1]);
-        Assert.assertEquals(1203, elementsPerSplit[2]);
-        Assert.assertEquals(990, elementsPerSplit[3]);
-        Assert.assertEquals(NUM_RECORDS, elements);
+        assertThat(elementsPerSplit).containsExactly(1604, 1203, 1203, 990);
+        assertThat(elements).isEqualTo(NUM_RECORDS);
         format.close();
     }
 
     @Test
-    public void testAvroRecoveryWithFailureAtStart() throws Exception {
+    void testAvroRecoveryWithFailureAtStart() throws Exception {
         final int recordsUntilCheckpoint = 132;
 
         Configuration parameters = new Configuration();
@@ -253,7 +250,7 @@ public class AvroSplittableInputFormatTest {
         format.configure(parameters);
 
         FileInputSplit[] splits = format.createInputSplits(4);
-        assertEquals(splits.length, 4);
+        assertThat(splits).hasSize(4);
 
         int elements = 0;
         int[] elementsPerSplit = new int[4];
@@ -261,7 +258,7 @@ public class AvroSplittableInputFormatTest {
             format.reopen(splits[i], format.getCurrentState());
             while (!format.reachedEnd()) {
                 User u = format.nextRecord(null);
-                Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+                assertThat(u.getName().toString()).startsWith(TEST_NAME);
                 elements++;
 
                 if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
@@ -276,23 +273,20 @@ public class AvroSplittableInputFormatTest {
                             new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
 
                     format.reopen(splits[i], state);
-                    assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
+                    assertThat(format.getRecordsReadFromBlock()).isEqualTo(recordsUntilCheckpoint);
                 }
                 elementsPerSplit[i]++;
             }
             format.close();
         }
 
-        Assert.assertEquals(1604, elementsPerSplit[0]);
-        Assert.assertEquals(1203, elementsPerSplit[1]);
-        Assert.assertEquals(1203, elementsPerSplit[2]);
-        Assert.assertEquals(990, elementsPerSplit[3]);
-        Assert.assertEquals(NUM_RECORDS, elements);
+        assertThat(elementsPerSplit).containsExactly(1604, 1203, 1203, 990);
+        assertThat(elements).isEqualTo(NUM_RECORDS);
         format.close();
     }
 
     @Test
-    public void testAvroRecovery() throws Exception {
+    void testAvroRecovery() throws Exception {
         final int recordsUntilCheckpoint = 132;
 
         Configuration parameters = new Configuration();
@@ -302,7 +296,7 @@ public class AvroSplittableInputFormatTest {
         format.configure(parameters);
 
         FileInputSplit[] splits = format.createInputSplits(4);
-        assertEquals(splits.length, 4);
+        assertThat(splits).hasSize(4);
 
         int elements = 0;
         int[] elementsPerSplit = new int[4];
@@ -310,7 +304,7 @@ public class AvroSplittableInputFormatTest {
             format.open(splits[i]);
             while (!format.reachedEnd()) {
                 User u = format.nextRecord(null);
-                Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+                assertThat(u.getName().toString()).startsWith(TEST_NAME);
                 elements++;
 
                 if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
@@ -325,18 +319,15 @@ public class AvroSplittableInputFormatTest {
                             new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
 
                     format.reopen(splits[i], state);
-                    assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
+                    assertThat(recordsUntilCheckpoint).isEqualTo(format.getRecordsReadFromBlock());
                 }
                 elementsPerSplit[i]++;
             }
             format.close();
         }
 
-        Assert.assertEquals(1604, elementsPerSplit[0]);
-        Assert.assertEquals(1203, elementsPerSplit[1]);
-        Assert.assertEquals(1203, elementsPerSplit[2]);
-        Assert.assertEquals(990, elementsPerSplit[3]);
-        Assert.assertEquals(NUM_RECORDS, elements);
+        assertThat(elementsPerSplit).containsExactly(1604, 1203, 1203, 990);
+        assertThat(elements).isEqualTo(NUM_RECORDS);
         format.close();
     }
 
@@ -364,7 +355,7 @@ public class AvroSplittableInputFormatTest {
        </dependency>
 
     @Test
-    public void testHadoop() throws Exception {
+    void testHadoop() throws Exception {
        JobConf jf = new JobConf();
        FileInputFormat.addInputPath(jf, new 
org.apache.hadoop.fs.Path(testFile.toURI()));
        
jf.setBoolean(org.apache.avro.mapred.AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY,
 false);
@@ -386,10 +377,4 @@ public class AvroSplittableInputFormatTest {
        }
        System.out.println("Status " + Arrays.toString(elementsPerSplit));
     } */
-
-    @After
-    @SuppressWarnings("ResultOfMethodCallIgnored")
-    public void deleteFiles() {
-        testFile.delete();
-    }
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java
index c05e381..e2752aa 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java
@@ -51,9 +51,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Simple integration test case for writing bulk encoded files with the {@link StreamingFileSink}
@@ -139,18 +137,16 @@ public class AvroStreamingFileSinkITCase extends AbstractTestBase {
     private static <T> void validateResults(
             File folder, DatumReader<T> datumReader, List<T> expected) throws Exception {
         File[] buckets = folder.listFiles();
-        assertNotNull(buckets);
-        assertEquals(1, buckets.length);
+        assertThat(buckets).hasSize(1);
 
         File[] partFiles = buckets[0].listFiles();
-        assertNotNull(partFiles);
-        assertEquals(2, partFiles.length);
+        assertThat(partFiles).hasSize(2);
 
         for (File partFile : partFiles) {
-            assertTrue(partFile.length() > 0);
+            assertThat(partFile).isNotEmpty();
 
             final List<T> fileContent = readAvroFile(partFile, datumReader);
-            assertEquals(expected, fileContent);
+            assertThat(fileContent).isEqualTo(expected);
         }
     }
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
index 10f1533..65b470f 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.util.StringUtils;
 
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -48,17 +48,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
  */
-public class EncoderDecoderTest {
+class EncoderDecoderTest {
 
     @Test
-    public void testComplexStringsDirecty() {
+    void testComplexStringsDirectly() {
         try {
             Random rnd = new Random(349712539451944123L);
 
@@ -87,7 +86,7 @@ public class EncoderDecoderTest {
 
                     String deserialized = decoder.readString();
 
-                    assertEquals(testString, deserialized);
+                    assertThat(deserialized).isEqualTo(testString);
                 }
             }
         } catch (Exception e) {
@@ -98,7 +97,7 @@ public class EncoderDecoderTest {
     }
 
     @Test
-    public void testPrimitiveTypes() {
+    void testPrimitiveTypes() {
 
         testObjectSerialization(Boolean.TRUE);
         testObjectSerialization(Boolean.FALSE);
@@ -174,7 +173,7 @@ public class EncoderDecoderTest {
     }
 
     @Test
-    public void testArrayTypes() {
+    void testArrayTypes() {
         {
             int[] array = new int[] {1, 2, 3, 4, 5};
             testObjectSerialization(array);
@@ -198,7 +197,7 @@ public class EncoderDecoderTest {
     }
 
     @Test
-    public void testEmptyArray() {
+    void testEmptyArray() {
         {
             int[] array = new int[0];
             testObjectSerialization(array);
@@ -222,7 +221,7 @@ public class EncoderDecoderTest {
     }
 
     @Test
-    public void testObjects() {
+    void testObjects() {
         // simple object containing only primitives
         {
             testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42));
@@ -248,12 +247,12 @@ public class EncoderDecoderTest {
     }
 
     @Test
-    public void testNestedObjectsWithCollections() {
+    void testNestedObjectsWithCollections() {
         testObjectSerialization(new ComplexNestedObject2(true));
     }
 
     @Test
-    public void testGeneratedObjectWithNullableFields() {
+    void testGeneratedObjectWithNullableFields() {
         List<CharSequence> strings =
                 Arrays.asList(
                         new CharSequence[] {
@@ -311,7 +310,7 @@ public class EncoderDecoderTest {
     }
 
     @Test
-    public void testVarLenCountEncoding() {
+    void testVarLenCountEncoding() {
         try {
             long[] values =
                     new long[] {
@@ -351,7 +350,9 @@ public class EncoderDecoderTest {
 
                 for (long val : values) {
                     long read = DataInputDecoder.readVarLongCount(dataIn);
-                    assertEquals("Wrong var-len encoded value read.", val, read);
+                    assertThat(read)
+                            .withFailMessage("Wrong var-len encoded value read.")
+                            .isEqualTo(val);
                 }
             }
         } catch (Exception e) {
@@ -414,24 +415,24 @@ public class EncoderDecoderTest {
             if (obj.getClass().isArray()) {
                 Class<?> clazz = obj.getClass();
                 if (clazz == byte[].class) {
-                    assertArrayEquals(message, (byte[]) obj, (byte[]) result);
+                    assertThat((byte[]) result).containsExactly((byte[]) obj);
                 } else if (clazz == short[].class) {
-                    assertArrayEquals(message, (short[]) obj, (short[]) result);
+                    assertThat((short[]) result).containsExactly((short[]) obj);
                 } else if (clazz == int[].class) {
-                    assertArrayEquals(message, (int[]) obj, (int[]) result);
+                    assertThat((int[]) result).containsExactly((int[]) obj);
                 } else if (clazz == long[].class) {
-                    assertArrayEquals(message, (long[]) obj, (long[]) result);
+                    assertThat((long[]) result).containsExactly((long[]) obj);
                 } else if (clazz == char[].class) {
-                    assertArrayEquals(message, (char[]) obj, (char[]) result);
+                    assertThat((char[]) result).containsExactly((char[]) obj);
                 } else if (clazz == float[].class) {
-                    assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f);
+                    assertThat((float[]) result).containsExactly((float[]) obj);
                 } else if (clazz == double[].class) {
-                    assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0);
+                    assertThat((double[]) result).containsExactly((double[]) obj);
                 } else {
-                    assertArrayEquals(message, (Object[]) obj, (Object[]) result);
+                    assertThat((Object[]) result).containsExactly((Object[]) obj);
                 }
             } else {
-                assertEquals(message, obj, result);
+                assertThat(result).withFailMessage(message).isEqualTo(obj);
             }
         } catch (Exception e) {
             System.err.println(e.getMessage());
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
index 13a9188..c11c4bf 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
@@ -27,7 +27,7 @@ import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -35,16 +35,15 @@ import java.io.OutputStream;
 import java.util.Random;
 
 import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link RegistryAvroDeserializationSchema}. */
-public class RegistryAvroDeserializationSchemaTest {
+class RegistryAvroDeserializationSchemaTest {
 
     private static final Address address = TestDataGenerator.generateRandomAddress(new Random());
 
     @Test
-    public void testGenericRecordReadWithCompatibleSchema() throws IOException {
+    void testGenericRecordReadWithCompatibleSchema() throws IOException {
         RegistryAvroDeserializationSchema<GenericRecord> deserializer =
                 new RegistryAvroDeserializationSchema<>(
                         GenericRecord.class,
@@ -70,13 +69,13 @@ public class RegistryAvroDeserializationSchemaTest {
 
         GenericRecord genericRecord =
                 deserializer.deserialize(writeRecord(address, Address.getClassSchema()));
-        assertEquals(address.getNum(), genericRecord.get("num"));
-        assertEquals(address.getStreet(), genericRecord.get("street").toString());
-        assertNull(genericRecord.get("country"));
+        assertThat(genericRecord.get("num")).isEqualTo(address.getNum());
+        assertThat(genericRecord.get("street").toString()).isEqualTo(address.getStreet());
+        assertThat(genericRecord.get("country")).isNull();
     }
 
     @Test
-    public void testSpecificRecordReadMoreFieldsThanWereWritten() throws IOException {
+    void testSpecificRecordReadMoreFieldsThanWereWritten() throws IOException {
         Schema smallerUserSchema =
                 new Schema.Parser()
                         .parse(
@@ -111,7 +110,7 @@ public class RegistryAvroDeserializationSchemaTest {
         SimpleRecord simpleRecord =
                 deserializer.deserialize(writeRecord(smallUser, smallerUserSchema));
 
-        assertEquals("someName", simpleRecord.getName().toString());
-        assertNull(simpleRecord.getOptionalField());
+        assertThat(simpleRecord.getName().toString()).isEqualTo("someName");
+        assertThat(simpleRecord.getOptionalField()).isNull();
     }
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index 0fc451f..5640eac 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -38,41 +38,35 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.io.DecoderFactory;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link AvroSchemaConverter}. */
-public class AvroSchemaConverterTest {
-
-    @Rule public ExpectedException thrown = ExpectedException.none();
+class AvroSchemaConverterTest {
 
     @Test
-    public void testAvroClassConversion() {
+    void testAvroClassConversion() {
         validateUserSchema(AvroSchemaConverter.convertToTypeInfo(User.class));
     }
 
     @Test
-    public void testAvroSchemaConversion() {
+    void testAvroSchemaConversion() {
         final String schema = User.getClassSchema().toString(true);
         validateUserSchema(AvroSchemaConverter.convertToTypeInfo(schema));
     }
 
     @Test
-    public void testConvertAvroSchemaToDataType() {
+    void testConvertAvroSchemaToDataType() {
         final String schema = User.getClassSchema().toString(true);
         validateUserSchema(AvroSchemaConverter.convertToDataType(schema));
     }
 
     @Test
-    public void testAddingOptionalField() throws IOException {
+    void testAddingOptionalField() throws IOException {
         Schema oldSchema =
                 SchemaBuilder.record("record")
                         .fields()
@@ -104,18 +98,17 @@ public class AvroSchemaConverterTest {
                         null,
                         DecoderFactory.get()
                                 .binaryDecoder(serializedRecord, 0, serializedRecord.length, null));
-        assertThat(
-                newRecord,
-                equalTo(
+        assertThat(newRecord)
+                .isEqualTo(
                         new GenericRecordBuilder(newSchema)
                                 .set("category_id", 1L)
                                 .set("name", "test")
                                 .set("description", null)
-                                .build()));
+                                .build());
     }
 
     @Test
-    public void testInvalidRawTypeAvroSchemaConversion() {
+    void testInvalidRawTypeAvroSchemaConversion() {
         RowType rowType =
                 (RowType)
                         TableSchema.builder()
@@ -124,13 +117,14 @@ public class AvroSchemaConverterTest {
                                 .build()
                                 .toRowDataType()
                                 .getLogicalType();
-        thrown.expect(UnsupportedOperationException.class);
-        thrown.expectMessage("Unsupported to derive Schema for type: RAW");
-        AvroSchemaConverter.convertToSchema(rowType);
+
+        assertThatThrownBy(() -> AvroSchemaConverter.convertToSchema(rowType))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageStartingWith("Unsupported to derive Schema for type: RAW");
     }
 
     @Test
-    public void testInvalidTimestampTypeAvroSchemaConversion() {
+    void testInvalidTimestampTypeAvroSchemaConversion() {
         RowType rowType =
                 (RowType)
                         TableSchema.builder()
@@ -139,15 +133,16 @@ public class AvroSchemaConverterTest {
                                 .build()
                                 .toRowDataType()
                                 .getLogicalType();
-        thrown.expect(IllegalArgumentException.class);
-        thrown.expectMessage(
-                "Avro does not support TIMESTAMP type with precision: 9, "
-                        + "it only supports precision less than 3.");
-        AvroSchemaConverter.convertToSchema(rowType);
+
+        assertThatThrownBy(() -> AvroSchemaConverter.convertToSchema(rowType))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Avro does not support TIMESTAMP type with precision: 9, "
+                                + "it only supports precision less than 3.");
     }
 
     @Test
-    public void testInvalidTimeTypeAvroSchemaConversion() {
+    void testInvalidTimeTypeAvroSchemaConversion() {
         RowType rowType =
                 (RowType)
                         TableSchema.builder()
@@ -156,14 +151,15 @@ public class AvroSchemaConverterTest {
                                 .build()
                                 .toRowDataType()
                                 .getLogicalType();
-        thrown.expect(IllegalArgumentException.class);
-        thrown.expectMessage(
-                "Avro does not support TIME type with precision: 6, it only supports precision less than 3.");
-        AvroSchemaConverter.convertToSchema(rowType);
+
+        assertThatThrownBy(() -> AvroSchemaConverter.convertToSchema(rowType))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Avro does not support TIME type with precision: 6, it only supports precision less than 3.");
     }
 
     @Test
-    public void testRowTypeAvroSchemaConversion() {
+    void testRowTypeAvroSchemaConversion() {
         RowType rowType =
                 (RowType)
                         TableSchema.builder()
@@ -185,62 +181,62 @@ public class AvroSchemaConverterTest {
                                 .toRowDataType()
                                 .getLogicalType();
         Schema schema = AvroSchemaConverter.convertToSchema(rowType);
-        assertEquals(
-                "{\n"
-                        + "  \"type\" : \"record\",\n"
-                        + "  \"name\" : \"record\",\n"
-                        + "  \"fields\" : [ {\n"
-                        + "    \"name\" : \"row1\",\n"
-                        + "    \"type\" : [ \"null\", {\n"
-                        + "      \"type\" : \"record\",\n"
-                        + "      \"name\" : \"record_row1\",\n"
-                        + "      \"fields\" : [ {\n"
-                        + "        \"name\" : \"a\",\n"
-                        + "        \"type\" : [ \"null\", \"string\" ],\n"
-                        + "        \"default\" : null\n"
-                        + "      } ]\n"
-                        + "    } ],\n"
-                        + "    \"default\" : null\n"
-                        + "  }, {\n"
-                        + "    \"name\" : \"row2\",\n"
-                        + "    \"type\" : [ \"null\", {\n"
-                        + "      \"type\" : \"record\",\n"
-                        + "      \"name\" : \"record_row2\",\n"
-                        + "      \"fields\" : [ {\n"
-                        + "        \"name\" : \"b\",\n"
-                        + "        \"type\" : [ \"null\", \"string\" ],\n"
-                        + "        \"default\" : null\n"
-                        + "      } ]\n"
-                        + "    } ],\n"
-                        + "    \"default\" : null\n"
-                        + "  }, {\n"
-                        + "    \"name\" : \"row3\",\n"
-                        + "    \"type\" : [ \"null\", {\n"
-                        + "      \"type\" : \"record\",\n"
-                        + "      \"name\" : \"record_row3\",\n"
-                        + "      \"fields\" : [ {\n"
-                        + "        \"name\" : \"row3\",\n"
-                        + "        \"type\" : [ \"null\", {\n"
-                        + "          \"type\" : \"record\",\n"
-                        + "          \"name\" : \"record_row3_row3\",\n"
-                        + "          \"fields\" : [ {\n"
-                        + "            \"name\" : \"c\",\n"
-                        + "            \"type\" : [ \"null\", \"string\" ],\n"
-                        + "            \"default\" : null\n"
-                        + "          } ]\n"
-                        + "        } ],\n"
-                        + "        \"default\" : null\n"
-                        + "      } ]\n"
-                        + "    } ],\n"
-                        + "    \"default\" : null\n"
-                        + "  } ]\n"
-                        + "}",
-                schema.toString(true));
+        assertThat(schema.toString(true))
+                .isEqualTo(
+                        "{\n"
+                                + "  \"type\" : \"record\",\n"
+                                + "  \"name\" : \"record\",\n"
+                                + "  \"fields\" : [ {\n"
+                                + "    \"name\" : \"row1\",\n"
+                                + "    \"type\" : [ \"null\", {\n"
+                                + "      \"type\" : \"record\",\n"
+                                + "      \"name\" : \"record_row1\",\n"
+                                + "      \"fields\" : [ {\n"
+                                + "        \"name\" : \"a\",\n"
+                                + "        \"type\" : [ \"null\", \"string\" ],\n"
+                                + "        \"default\" : null\n"
+                                + "      } ]\n"
+                                + "    } ],\n"
+                                + "    \"default\" : null\n"
+                                + "  }, {\n"
+                                + "    \"name\" : \"row2\",\n"
+                                + "    \"type\" : [ \"null\", {\n"
+                                + "      \"type\" : \"record\",\n"
+                                + "      \"name\" : \"record_row2\",\n"
+                                + "      \"fields\" : [ {\n"
+                                + "        \"name\" : \"b\",\n"
+                                + "        \"type\" : [ \"null\", \"string\" ],\n"
+                                + "        \"default\" : null\n"
+                                + "      } ]\n"
+                                + "    } ],\n"
+                                + "    \"default\" : null\n"
+                                + "  }, {\n"
+                                + "    \"name\" : \"row3\",\n"
+                                + "    \"type\" : [ \"null\", {\n"
+                                + "      \"type\" : \"record\",\n"
+                                + "      \"name\" : \"record_row3\",\n"
+                                + "      \"fields\" : [ {\n"
+                                + "        \"name\" : \"row3\",\n"
+                                + "        \"type\" : [ \"null\", {\n"
+                                + "          \"type\" : \"record\",\n"
+                                + "          \"name\" : \"record_row3_row3\",\n"
+                                + "          \"fields\" : [ {\n"
+                                + "            \"name\" : \"c\",\n"
+                                + "            \"type\" : [ \"null\", \"string\" ],\n"
+                                + "            \"default\" : null\n"
+                                + "          } ]\n"
+                                + "        } ],\n"
+                                + "        \"default\" : null\n"
+                                + "      } ]\n"
+                                + "    } ],\n"
+                                + "    \"default\" : null\n"
+                                + "  } ]\n"
+                                + "}");
     }
 
     /** Test convert nullable data type to Avro schema then converts back. */
     @Test
-    public void testDataTypeToSchemaToDataTypeNullable() {
+    void testDataTypeToSchemaToDataTypeNullable() {
         DataType dataType =
                 DataTypes.ROW(
                         DataTypes.FIELD("f_null", DataTypes.NULL()),
@@ -271,12 +267,12 @@ public class AvroSchemaConverterTest {
                         DataTypes.FIELD("f_array", DataTypes.ARRAY(DataTypes.INT())));
         Schema schema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
         DataType converted = AvroSchemaConverter.convertToDataType(schema.toString());
-        assertEquals(dataType, converted);
+        assertThat(converted).isEqualTo(dataType);
     }
 
     /** Test convert non-nullable data type to Avro schema then converts back. */
     @Test
-    public void testDataTypeToSchemaToDataTypeNonNullable() {
+    void testDataTypeToSchemaToDataTypeNonNullable() {
         DataType dataType =
                 DataTypes.ROW(
                                 DataTypes.FIELD("f_boolean", DataTypes.BOOLEAN().notNull()),
@@ -316,12 +312,12 @@ public class AvroSchemaConverterTest {
                         .notNull();
         Schema schema = 
AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
         DataType converted = 
AvroSchemaConverter.convertToDataType(schema.toString());
-        assertEquals(dataType, converted);
+        assertThat(converted).isEqualTo(dataType);
     }
 
     /** Test convert nullable Avro schema to data type then converts back. */
     @Test
-    public void testSchemaToDataTypeToSchemaNullable() {
+    void testSchemaToDataTypeToSchemaNullable() {
         String schemaStr =
                 "{\n"
                         + "  \"type\" : \"record\",\n"
@@ -425,12 +421,12 @@ public class AvroSchemaConverterTest {
                         + "}";
         DataType dataType = AvroSchemaConverter.convertToDataType(schemaStr);
         Schema schema = 
AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
-        assertEquals(new Schema.Parser().parse(schemaStr), schema);
+        assertThat(schema).isEqualTo(new Schema.Parser().parse(schemaStr));
     }
 
     /** Test convert non-nullable Avro schema to data type then converts back. */
     @Test
-    public void testSchemaToDataTypeToSchemaNonNullable() {
+    void testSchemaToDataTypeToSchemaNonNullable() {
         String schemaStr =
                 "{\n"
                         + "  \"type\" : \"record\",\n"
@@ -514,7 +510,7 @@ public class AvroSchemaConverterTest {
                         + "}";
         DataType dataType = AvroSchemaConverter.convertToDataType(schemaStr);
         Schema schema = 
AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
-        assertEquals(new Schema.Parser().parse(schemaStr), schema);
+        assertThat(schema).isEqualTo(new Schema.Parser().parse(schemaStr));
     }
 
     private void validateUserSchema(TypeInformation<?> actual) {
@@ -578,10 +574,10 @@ public class AvroSchemaConverterTest {
                         Types.BIG_DEC,
                         Types.BIG_DEC);
 
-        assertEquals(user, actual);
+        assertThat(actual).isEqualTo(user);
 
         final RowTypeInfo userRowInfo = (RowTypeInfo) user;
-        assertTrue(userRowInfo.schemaEquals(actual));
+        assertThat(userRowInfo.schemaEquals(actual)).isTrue();
     }
 
     private void validateUserSchema(DataType actual) {
@@ -640,6 +636,6 @@ public class AvroSchemaConverterTest {
                                         "type_decimal_fixed", 
DataTypes.DECIMAL(4, 2).notNull()))
                         .notNull();
 
-        assertEquals(user, actual);
+        assertThat(actual).isEqualTo(user);
     }
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyCheckInactiveITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyCheckInactiveITCase.java
index ae7deb1..836bb03 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyCheckInactiveITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyCheckInactiveITCase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * A test that validates that the concurrency checks in the Avro Serializer are not hard coded to
@@ -33,7 +33,7 @@ import static org.junit.Assert.assertTrue;
  * <p><b>Important:</b> If you see this test fail and the initial settings are still correct, check
  * the assumptions above (on fresh JVM fork).
  */
-public class AvroSerializerConcurrencyCheckInactiveITCase {
+class AvroSerializerConcurrencyCheckInactiveITCase {
 
     // this sets the debug initialization back to its default, even if
     // by default tests modify it (implicitly via assertion loading)
@@ -46,18 +46,12 @@ public class AvroSerializerConcurrencyCheckInactiveITCase {
      * concurrency checks are off by default.
      */
     @Test
-    public void testWithNoConcurrencyCheck() throws Exception {
-        boolean assertionError;
-        try {
-            new 
AvroSerializerConcurrencyTest().testConcurrentUseOfSerializer();
-            assertionError = false;
-        } catch (AssertionError e) {
-            assertionError = true;
-        }
-
-        assertTrue(
-                "testConcurrentUseOfSerializer() should have failed if "
-                        + "concurrency checks are off by default",
-                assertionError);
+    void testWithNoConcurrencyCheck() throws Exception {
+        assertThatThrownBy(
+                        () -> new 
AvroSerializerConcurrencyTest().testConcurrentUseOfSerializer())
+                .as(
+                        "testConcurrentUseOfSerializer() should fail if "
+                                + "concurrency checks are off by default")
+                .isInstanceOf(AssertionError.class);
     }
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
index 6bacf8a..6feb84a 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
@@ -23,11 +23,11 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.testutils.BlockerSync;
 import org.apache.flink.core.testutils.CheckedThread;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * This tests that the {@link AvroSerializer} properly fails when accessed by two threads
@@ -36,10 +36,10 @@ import static org.junit.Assert.fail;
  * <p><b>Important:</b> This test only works if assertions are activated (-ea) on the JVM when
  * running tests.
  */
-public class AvroSerializerConcurrencyTest {
+class AvroSerializerConcurrencyTest {
 
     @Test
-    public void testConcurrentUseOfSerializer() throws Exception {
+    void testConcurrentUseOfSerializer() throws Exception {
         final AvroSerializer<String> serializer = new 
AvroSerializer<>(String.class);
 
         final BlockerSync sync = new BlockerSync();
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
index d0c8581..c56c8ab 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
@@ -21,18 +21,18 @@ package org.apache.flink.formats.avro.typeutils;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
 
 import org.apache.avro.reflect.Nullable;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for the {@link AvroSerializer}. */
-public class AvroSerializerEmptyArrayTest {
+class AvroSerializerEmptyArrayTest {
 
     @Test
-    public void testBookSerialization() {
+    void testBookSerialization() {
         try {
             Book b = new Book(123, "This is a test book", 26382648);
             AvroSerializer<Book> serializer = new 
AvroSerializer<Book>(Book.class);
@@ -46,7 +46,7 @@ public class AvroSerializerEmptyArrayTest {
     }
 
     @Test
-    public void testSerialization() {
+    void testSerialization() {
         try {
             List<String> titles = new ArrayList<String>();
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
index 2b58b6c..a178295 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
@@ -31,8 +31,8 @@ import org.apache.flink.formats.avro.utils.TestDataGenerator;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericRecord;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -44,11 +44,11 @@ import java.util.Random;
 import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleAfterMigration;
 import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleAsIs;
 import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isIncompatible;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.HamcrestCondition.matching;
 
 /** Test {@link AvroSerializerSnapshot}. */
-public class AvroSerializerSnapshotTest {
+class AvroSerializerSnapshotTest {
 
     private static final int[] PAST_VERSIONS = new int[] {2};
 
@@ -76,78 +76,81 @@ public class AvroSerializerSnapshotTest {
                     .endRecord();
 
     @Test
-    public void sameSchemaShouldBeCompatibleAsIs() {
-        assertThat(
-                AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_NAME, 
FIRST_NAME),
-                isCompatibleAsIs());
+    void sameSchemaShouldBeCompatibleAsIs() {
+        
assertThat(AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_NAME, 
FIRST_NAME))
+                .is(matching(isCompatibleAsIs()));
     }
 
     @Test
-    public void removingAnOptionalFieldsIsCompatibleAsIs() {
+    void removingAnOptionalFieldsIsCompatibleAsIs() {
         assertThat(
-                AvroSerializerSnapshot.resolveSchemaCompatibility(
-                        FIRST_REQUIRED_LAST_OPTIONAL, FIRST_NAME),
-                isCompatibleAfterMigration());
+                        AvroSerializerSnapshot.resolveSchemaCompatibility(
+                                FIRST_REQUIRED_LAST_OPTIONAL, FIRST_NAME))
+                .is(matching(isCompatibleAfterMigration()));
     }
 
     @Test
-    public void addingAnOptionalFieldsIsCompatibleAsIs() {
+    void addingAnOptionalFieldsIsCompatibleAsIs() {
         assertThat(
-                AvroSerializerSnapshot.resolveSchemaCompatibility(
-                        FIRST_NAME, FIRST_REQUIRED_LAST_OPTIONAL),
-                isCompatibleAfterMigration());
+                        AvroSerializerSnapshot.resolveSchemaCompatibility(
+                                FIRST_NAME, FIRST_REQUIRED_LAST_OPTIONAL))
+                .is(matching(isCompatibleAfterMigration()));
     }
 
     @Test
-    public void addingARequiredMakesSerializersIncompatible() {
+    void addingARequiredMakesSerializersIncompatible() {
         assertThat(
-                AvroSerializerSnapshot.resolveSchemaCompatibility(
-                        FIRST_REQUIRED_LAST_OPTIONAL, BOTH_REQUIRED),
-                isIncompatible());
+                        AvroSerializerSnapshot.resolveSchemaCompatibility(
+                                FIRST_REQUIRED_LAST_OPTIONAL, BOTH_REQUIRED))
+                .is(matching(isIncompatible()));
     }
 
     @Test
-    public void anAvroSnapshotIsCompatibleWithItsOriginatingSerializer() {
+    void anAvroSnapshotIsCompatibleWithItsOriginatingSerializer() {
         AvroSerializer<GenericRecord> serializer =
                 new AvroSerializer<>(GenericRecord.class, 
FIRST_REQUIRED_LAST_OPTIONAL);
 
         TypeSerializerSnapshot<GenericRecord> snapshot = 
serializer.snapshotConfiguration();
 
-        assertThat(snapshot.resolveSchemaCompatibility(serializer), 
isCompatibleAsIs());
+        assertThat(snapshot.resolveSchemaCompatibility(serializer))
+                .is(matching(isCompatibleAsIs()));
     }
 
     @Test
-    public void anAvroSnapshotIsCompatibleAfterARoundTrip() throws IOException 
{
+    void anAvroSnapshotIsCompatibleAfterARoundTrip() throws IOException {
         AvroSerializer<GenericRecord> serializer =
                 new AvroSerializer<>(GenericRecord.class, 
FIRST_REQUIRED_LAST_OPTIONAL);
 
         AvroSerializerSnapshot<GenericRecord> restored =
                 roundTrip(serializer.snapshotConfiguration());
 
-        assertThat(restored.resolveSchemaCompatibility(serializer), 
isCompatibleAsIs());
+        assertThat(restored.resolveSchemaCompatibility(serializer))
+                .is(matching(isCompatibleAsIs()));
     }
 
     @Test
-    public void anAvroSpecificRecordIsCompatibleAfterARoundTrip() throws 
IOException {
+    void anAvroSpecificRecordIsCompatibleAfterARoundTrip() throws IOException {
         // user is an avro generated test object.
         AvroSerializer<User> serializer = new AvroSerializer<>(User.class);
 
         AvroSerializerSnapshot<User> restored = 
roundTrip(serializer.snapshotConfiguration());
 
-        assertThat(restored.resolveSchemaCompatibility(serializer), 
isCompatibleAsIs());
+        assertThat(restored.resolveSchemaCompatibility(serializer))
+                .is(matching(isCompatibleAsIs()));
     }
 
     @Test
-    public void aPojoIsCompatibleAfterARoundTrip() throws IOException {
+    void aPojoIsCompatibleAfterARoundTrip() throws IOException {
         AvroSerializer<Pojo> serializer = new AvroSerializer<>(Pojo.class);
 
         AvroSerializerSnapshot<Pojo> restored = 
roundTrip(serializer.snapshotConfiguration());
 
-        assertThat(restored.resolveSchemaCompatibility(serializer), 
isCompatibleAsIs());
+        assertThat(restored.resolveSchemaCompatibility(serializer))
+                .is(matching(isCompatibleAsIs()));
     }
 
     @Test
-    public void aLargeSchemaAvroSnapshotIsCompatibleAfterARoundTrip() throws 
IOException {
+    void aLargeSchemaAvroSnapshotIsCompatibleAfterARoundTrip() throws 
IOException {
         // construct the large schema up to a size of 65535 bytes.
         int thresholdSize = 65535;
         StringBuilder schemaField = new StringBuilder(thresholdSize);
@@ -166,11 +169,12 @@ public class AvroSerializerSnapshotTest {
         AvroSerializerSnapshot<GenericRecord> restored =
                 roundTrip(serializer.snapshotConfiguration());
 
-        assertThat(restored.resolveSchemaCompatibility(serializer), 
isCompatibleAsIs());
+        assertThat(restored.resolveSchemaCompatibility(serializer))
+                .is(matching(isCompatibleAsIs()));
     }
 
     @Test
-    public void recordSerializedShouldBeDeserializeWithTheResortedSerializer() 
throws IOException {
+    void recordSerializedShouldBeDeserializeWithTheResortedSerializer() throws 
IOException {
         // user is an avro generated test object.
         final User user = TestDataGenerator.generateRandomUser(new Random());
         final AvroSerializer<User> originalSerializer = new 
AvroSerializer<>(User.class);
@@ -188,11 +192,11 @@ public class AvroSerializerSnapshotTest {
         //
         User restoredUser = deserialize(restoredSerializer, serializedUser);
 
-        assertThat(user, is(restoredUser));
+        assertThat(restoredUser).isEqualTo(user);
     }
 
     @Test
-    public void validSchemaEvaluationShouldResultInCRequiresMigration() {
+    void validSchemaEvaluationShouldResultInCRequiresMigration() {
         final AvroSerializer<GenericRecord> originalSerializer =
                 new AvroSerializer<>(GenericRecord.class, FIRST_NAME);
         final AvroSerializer<GenericRecord> newSerializer =
@@ -201,13 +205,12 @@ public class AvroSerializerSnapshotTest {
         TypeSerializerSnapshot<GenericRecord> originalSnapshot =
                 originalSerializer.snapshotConfiguration();
 
-        assertThat(
-                originalSnapshot.resolveSchemaCompatibility(newSerializer),
-                isCompatibleAfterMigration());
+        assertThat(originalSnapshot.resolveSchemaCompatibility(newSerializer))
+                .is(matching(isCompatibleAfterMigration()));
     }
 
     @Test
-    public void nonValidSchemaEvaluationShouldResultInCompatibleSerializers() {
+    void nonValidSchemaEvaluationShouldResultInCompatibleSerializers() {
         final AvroSerializer<GenericRecord> originalSerializer =
                 new AvroSerializer<>(GenericRecord.class, 
FIRST_REQUIRED_LAST_OPTIONAL);
         final AvroSerializer<GenericRecord> newSerializer =
@@ -216,7 +219,8 @@ public class AvroSerializerSnapshotTest {
         TypeSerializerSnapshot<GenericRecord> originalSnapshot =
                 originalSerializer.snapshotConfiguration();
 
-        assertThat(originalSnapshot.resolveSchemaCompatibility(newSerializer), 
isIncompatible());
+        assertThat(originalSnapshot.resolveSchemaCompatibility(newSerializer))
+                .is(matching(isIncompatible()));
     }
 
     @Test
@@ -231,12 +235,12 @@ public class AvroSerializerSnapshotTest {
         AvroSerializer<Object> specificSerializer = new 
AvroSerializer(User.class);
         specificSerializer.snapshotConfiguration();
 
-        assertThat(
-                
genericSnapshot.resolveSchemaCompatibility(specificSerializer), 
isCompatibleAsIs());
+        
assertThat(genericSnapshot.resolveSchemaCompatibility(specificSerializer))
+                .is(matching(isCompatibleAsIs()));
     }
 
     @Test
-    public void restorePastSnapshots() throws IOException {
+    void restorePastSnapshots() throws IOException {
         for (int pastVersion : PAST_VERSIONS) {
             AvroSerializer<GenericRecord> currentSerializer =
                     new AvroSerializer<>(GenericRecord.class, 
Address.getClassSchema());
@@ -249,7 +253,8 @@ public class AvroSerializerSnapshotTest {
                     
TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
                             in, AvroSerializer.class.getClassLoader(), null);
 
-            assertThat(restored.resolveSchemaCompatibility(currentSerializer), 
isCompatibleAsIs());
+            assertThat(restored.resolveSchemaCompatibility(currentSerializer))
+                    .is(matching(isCompatibleAsIs()));
         }
     }
 
@@ -257,9 +262,9 @@ public class AvroSerializerSnapshotTest {
      * Creates a new serializer snapshot for the current version. Use this before bumping the
      * snapshot version and also add the version (before bumping) to {@link #PAST_VERSIONS}.
      */
-    @Ignore
+    @Disabled
     @Test
-    public void writeCurrentVersionSnapshot() throws IOException {
+    void writeCurrentVersionSnapshot() throws IOException {
         AvroSerializer<GenericRecord> serializer =
                 new AvroSerializer<>(GenericRecord.class, 
Address.getClassSchema());
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
index 0402f6f9..eefed37 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
@@ -27,55 +27,78 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.formats.avro.AvroInputFormat;
 import org.apache.flink.formats.avro.AvroRecordInputFormatTest;
 import org.apache.flink.formats.avro.generated.Fixed16;
 import org.apache.flink.formats.avro.generated.User;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.CollectionTestEnvironment;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestEnvironment;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Stream;
 
-/** Tests for the {@link AvroInputFormat} reading Pojos. */
-@RunWith(Parameterized.class)
-public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
-    public AvroTypeExtractionTest(TestExecutionMode mode) {
-        super(mode);
+/** Tests for the {@link AvroInputFormat} reading Pojos. */
+class AvroTypeExtractionTest {
+
+    private static final int PARALLELISM = 4;
+
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    @RegisterExtension
+    private static final AllCallbackWrapper<MiniClusterExtension> 
allCallbackWrapper =
+            new AllCallbackWrapper<>(MINI_CLUSTER_RESOURCE);
+
+    /**
+     * The MiniCluster is created and reused for the lifetime of this class, but tests alternate
+     * between the MiniCluster and a CollectionTestEnvironment.
+     */
+    private static Stream<ExecutionEnvironment> getExecutionEnvironment() {
+        return Stream.of(
+                new TestEnvironment(MINI_CLUSTER_RESOURCE.getMiniCluster(), 
PARALLELISM, false),
+                new CollectionTestEnvironment());
     }
 
     private File inFile;
     private String resultPath;
     private String expected;
 
-    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
-    @Before
-    public void before() throws Exception {
-        resultPath = tempFolder.newFile().toURI().toString();
-        inFile = tempFolder.newFile();
+    @BeforeEach
+    public void before(@TempDir java.nio.file.Path tmpDir) throws Exception {
+        resultPath = tmpDir.resolve("out").toUri().toString();
+        inFile = tmpDir.resolve("in.avro").toFile();
         AvroRecordInputFormatTest.writeTestFile(inFile);
     }
 
-    @After
+    @AfterEach
     public void after() throws Exception {
-        compareResultsByLinesInMemory(expected, resultPath);
+        TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);
     }
 
-    @Test
-    public void testSimpleAvroRead() throws Exception {
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+    @ParameterizedTest
+    @MethodSource("getExecutionEnvironment")
+    void testSimpleAvroRead(final ExecutionEnvironment env) throws Exception {
         Path in = new Path(inFile.getAbsoluteFile().toURI());
 
         AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
@@ -112,9 +135,9 @@ public class AvroTypeExtractionTest extends 
MultipleProgramsTestBase {
                         + "\"type_decimal_fixed\": [7, -48]}\n";
     }
 
-    @Test
-    public void testSerializeWithAvro() throws Exception {
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+    @ParameterizedTest
+    @MethodSource("getExecutionEnvironment")
+    void testSerializeWithAvro(final ExecutionEnvironment env) throws 
Exception {
         env.getConfig().enableForceAvro();
         Path in = new Path(inFile.getAbsoluteFile().toURI());
 
@@ -161,9 +184,9 @@ public class AvroTypeExtractionTest extends 
MultipleProgramsTestBase {
                         + "\"type_decimal_fixed\": [7, -48]}\n";
     }
 
-    @Test
-    public void testKeySelection() throws Exception {
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+    @ParameterizedTest
+    @MethodSource("getExecutionEnvironment")
+    void testKeySelection(final ExecutionEnvironment env) throws Exception {
         env.getConfig().enableObjectReuse();
         Path in = new Path(inFile.getAbsoluteFile().toURI());
 
@@ -187,9 +210,9 @@ public class AvroTypeExtractionTest extends 
MultipleProgramsTestBase {
         expected = "(Alyssa,1)\n(Charlie,1)\n";
     }
 
-    @Test
-    public void testWithAvroGenericSer() throws Exception {
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+    @ParameterizedTest
+    @MethodSource("getExecutionEnvironment")
+    void testWithAvroGenericSer(final ExecutionEnvironment env) throws 
Exception {
         env.getConfig().enableForceAvro();
         Path in = new Path(inFile.getAbsoluteFile().toURI());
 
@@ -216,9 +239,9 @@ public class AvroTypeExtractionTest extends 
MultipleProgramsTestBase {
         expected = "(Charlie,1)\n(Alyssa,1)\n";
     }
 
-    @Test
-    public void testWithKryoGenericSer() throws Exception {
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+    @ParameterizedTest
+    @MethodSource("getExecutionEnvironment")
+    void testWithKryoGenericSer(final ExecutionEnvironment env) throws 
Exception {
         env.getConfig().enableForceKryo();
         Path in = new Path(inFile.getAbsoluteFile().toURI());
 
@@ -245,18 +268,20 @@ public class AvroTypeExtractionTest extends 
MultipleProgramsTestBase {
         expected = "(Charlie,1)\n(Alyssa,1)\n";
     }
 
-    /** Test some know fields for grouping on. */
-    @Test
-    public void testAllFields() throws Exception {
-        for (String fieldName : Arrays.asList("name", "type_enum", 
"type_double_test")) {
-            testField(fieldName);
-        }
+    private static Stream<Arguments> testField() {
+        return getExecutionEnvironment()
+                .flatMap(
+                        env ->
+                                Stream.of(
+                                        Arguments.of(env, "name"),
+                                        Arguments.of(env, "type_enum"),
+                                        Arguments.of(env, 
"type_double_test")));
     }
 
-    private void testField(final String fieldName) throws Exception {
-        before();
-
-        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+    /** Test some known fields for grouping on. */
+    @ParameterizedTest
+    @MethodSource("testField")
+    void testField(final ExecutionEnvironment env, final String fieldName) 
throws Exception {
         Path in = new Path(inFile.getAbsoluteFile().toURI());
 
         AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
@@ -277,7 +302,7 @@ public class AvroTypeExtractionTest extends 
MultipleProgramsTestBase {
 
         // test if automatic registration of the Types worked
         ExecutionConfig ec = env.getConfig();
-        Assert.assertTrue(ec.getRegisteredKryoTypes().contains(Fixed16.class));
+        assertThat(ec.getRegisteredKryoTypes()).contains(Fixed16.class);
 
         switch (fieldName) {
             case "name":
@@ -290,10 +315,8 @@ public class AvroTypeExtractionTest extends 
MultipleProgramsTestBase {
                 expected = "123.45\n1.337\n";
                 break;
             default:
-                Assert.fail("Unknown field");
+                fail("Unknown field");
                 break;
         }
-
-        after();
     }
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
index 38e54d1..b5d6fcb 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.User;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link AvroTypeInfo}. */
 public class AvroTypeInfoTest extends TypeInformationTestBase<AvroTypeInfo<?>> 
{
@@ -39,9 +39,9 @@ public class AvroTypeInfoTest extends 
TypeInformationTestBase<AvroTypeInfo<?>> {
     }
 
     @Test
-    public void testAvroByDefault() {
+    void testAvroByDefault() {
         final TypeSerializer<User> serializer =
                 new AvroTypeInfo<>(User.class).createSerializer(new 
ExecutionConfig());
-        assertTrue(serializer instanceof AvroSerializer);
+        assertThat(serializer).isInstanceOf(AvroSerializer.class);
     }
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
index e28d3dc..e04c774 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -53,7 +53,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.Expressions.$;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for interoperability with Avro types. */
 public class AvroTypesITCase extends AbstractTestBase {
@@ -236,7 +236,7 @@ public class AvroTypesITCase extends AbstractTestBase {
                 CollectionUtil.iteratorToList(
                         DataStreamUtils.collect(tEnv.toAppendStream(result, 
User.class)));
         List<User> expected = Arrays.asList(USER_1, USER_2, USER_3);
-        assertEquals(expected, results);
+        assertThat(results).isEqualTo(expected);
     }
 
     private DataStream<User> testData(StreamExecutionEnvironment env) {
diff --git a/flink-formats/flink-avro/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-formats/flink-avro/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 0000000..2899913
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension
\ No newline at end of file

Reply via email to