zentol commented on a change in pull request #18871:
URL: https://github.com/apache/flink/pull/18871#discussion_r811799122



##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
##########
@@ -212,38 +211,38 @@ public void createFiles() throws IOException {
     }
 
     @Test
-    public void testSplittedIF() throws IOException {
+    void testSplittedIF() throws IOException {
         Configuration parameters = new Configuration();
 
         AvroInputFormat<User> format =
                 new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), 
User.class);
 
         format.configure(parameters);
         FileInputSplit[] splits = format.createInputSplits(4);
-        assertEquals(splits.length, 4);
+        assertThat(4).isEqualTo(splits.length);
         int elements = 0;
         int[] elementsPerSplit = new int[4];
         for (int i = 0; i < splits.length; i++) {
             format.open(splits[i]);
             while (!format.reachedEnd()) {
                 User u = format.nextRecord(null);
-                
Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+                
assertThat(u.getName().toString().startsWith(TEST_NAME)).isTrue();
                 elements++;
                 elementsPerSplit[i]++;
             }
             format.close();
         }
 
-        Assert.assertEquals(1604, elementsPerSplit[0]);
-        Assert.assertEquals(1203, elementsPerSplit[1]);
-        Assert.assertEquals(1203, elementsPerSplit[2]);
-        Assert.assertEquals(990, elementsPerSplit[3]);
-        Assert.assertEquals(NUM_RECORDS, elements);
+        assertThat(elementsPerSplit[0]).isEqualTo(1604);
+        assertThat(elementsPerSplit[1]).isEqualTo(1203);
+        assertThat(elementsPerSplit[2]).isEqualTo(1203);
+        assertThat(elementsPerSplit[3]).isEqualTo(990);

Review comment:
       ```suggestion
           assertThat(elementsPerSplit).containsExactly(1604, 1203, 1203, 990);
   ```

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
##########
@@ -253,15 +252,15 @@ public void testAvroRecoveryWithFailureAtStart() throws 
Exception {
         format.configure(parameters);
 
         FileInputSplit[] splits = format.createInputSplits(4);
-        assertEquals(splits.length, 4);
+        assertThat(4).isEqualTo(splits.length);

Review comment:
       ```suggestion
           assertThat(splits).hasSize(4);
   ```

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
##########
@@ -276,23 +275,23 @@ public void testAvroRecoveryWithFailureAtStart() throws 
Exception {
                             new AvroInputFormat<>(new 
Path(testFile.getAbsolutePath()), User.class);
 
                     format.reopen(splits[i], state);
-                    assertEquals(format.getRecordsReadFromBlock(), 
recordsUntilCheckpoint);
+                    
assertThat(recordsUntilCheckpoint).isEqualTo(format.getRecordsReadFromBlock());
                 }
                 elementsPerSplit[i]++;
             }
             format.close();
         }
 
-        Assert.assertEquals(1604, elementsPerSplit[0]);
-        Assert.assertEquals(1203, elementsPerSplit[1]);
-        Assert.assertEquals(1203, elementsPerSplit[2]);
-        Assert.assertEquals(990, elementsPerSplit[3]);
-        Assert.assertEquals(NUM_RECORDS, elements);
+        assertThat(elementsPerSplit[0]).isEqualTo(1604);

Review comment:
       see above

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
##########
@@ -302,15 +301,15 @@ public void testAvroRecovery() throws Exception {
         format.configure(parameters);
 
         FileInputSplit[] splits = format.createInputSplits(4);
-        assertEquals(splits.length, 4);
+        assertThat(4).isEqualTo(splits.length);

Review comment:
       see above

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
##########
@@ -325,18 +324,18 @@ public void testAvroRecovery() throws Exception {
                             new AvroInputFormat<>(new 
Path(testFile.getAbsolutePath()), User.class);
 
                     format.reopen(splits[i], state);
-                    assertEquals(format.getRecordsReadFromBlock(), 
recordsUntilCheckpoint);
+                    
assertThat(recordsUntilCheckpoint).isEqualTo(format.getRecordsReadFromBlock());
                 }
                 elementsPerSplit[i]++;
             }
             format.close();
         }
 
-        Assert.assertEquals(1604, elementsPerSplit[0]);
-        Assert.assertEquals(1203, elementsPerSplit[1]);
-        Assert.assertEquals(1203, elementsPerSplit[2]);
-        Assert.assertEquals(990, elementsPerSplit[3]);
-        Assert.assertEquals(NUM_RECORDS, elements);
+        assertThat(elementsPerSplit[0]).isEqualTo(1604);

Review comment:
       see above

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java
##########
@@ -139,18 +137,18 @@ public void testWriteAvroReflect() throws Exception {
     private static <T> void validateResults(
             File folder, DatumReader<T> datumReader, List<T> expected) throws 
Exception {
         File[] buckets = folder.listFiles();
-        assertNotNull(buckets);
-        assertEquals(1, buckets.length);
+        assertThat(buckets).isNotNull();
+        assertThat(buckets.length).isEqualTo(1);

Review comment:
       ```suggestion
           assertThat(buckets).hasSize(1);
   ```

##########
File path: 
flink-formats/flink-avro/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
##########
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension

Review comment:
       This is problematic if the module produces a test-jar that is used by 
other modules, as is the case for flink-avro. You could exclude it from the 
test-jar via the maven-jar-plugin.

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
##########
@@ -269,45 +260,40 @@ public void testDeserializationReuseAvroRecordFalse() 
throws IOException {
 
         format.configure(parameters);
         FileInputSplit[] splits = format.createInputSplits(1);
-        assertEquals(splits.length, 1);
+        assertThat(splits.length).isEqualTo(1);
         format.open(splits[0]);
 
         User u = format.nextRecord(null);
-        assertNotNull(u);
+        assertThat(u).isNotNull();
 
         String name = u.getName().toString();
-        assertNotNull("empty record", name);
-        assertEquals("name not equal", TEST_NAME, name);
+        assertThat(name).isNotNull();
+        assertThat(name).isEqualTo(TEST_NAME);
 
         // check arrays
         List<CharSequence> sl = u.getTypeArrayString();
-        assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, 
sl.get(0).toString());
-        assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, 
sl.get(1).toString());
+        assertThat(sl.get(0).toString()).isEqualTo(TEST_ARRAY_STRING_1);
+        assertThat(sl.get(1).toString()).isEqualTo(TEST_ARRAY_STRING_2);
 
         List<Boolean> bl = u.getTypeArrayBoolean();
-        assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
-        assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+        assertThat(bl).containsExactly(TEST_ARRAY_BOOLEAN_1, 
TEST_ARRAY_BOOLEAN_2);
 
         // check enums
         Colors enumValue = u.getTypeEnum();
-        assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+        assertThat(enumValue).isEqualTo(TEST_ENUM_COLOR);
 
         // check maps
         Map<CharSequence, Long> lm = u.getTypeMap();
-        assertEquals(
-                "map value of key 1 not equal",
-                TEST_MAP_VALUE1,
-                lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
-        assertEquals(
-                "map value of key 2 not equal",
-                TEST_MAP_VALUE2,
-                lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+        assertThat(lm)
+                .containsEntry(new Utf8(TEST_MAP_KEY1), TEST_MAP_VALUE1)
+                .containsEntry(new Utf8(TEST_MAP_KEY2), TEST_MAP_VALUE2)

Review comment:
       see above

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
##########
@@ -199,68 +195,63 @@ public static void writeTestFile(File testFile) throws 
IOException {
         dataFileWriter.close();
     }
 
-    @Before
+    @BeforeEach
     public void createFiles() throws IOException {
         testFile = File.createTempFile("AvroInputFormatTest", null);
         writeTestFile(testFile);
     }
 
     /** Test if the AvroInputFormat is able to properly read data from an Avro 
file. */
     @Test
-    public void testDeserialization() throws IOException {
+    void testDeserialization() throws IOException {
         Configuration parameters = new Configuration();
 
         AvroInputFormat<User> format =
                 new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), 
User.class);
 
         format.configure(parameters);
         FileInputSplit[] splits = format.createInputSplits(1);
-        assertEquals(splits.length, 1);
+        assertThat(splits.length).isEqualTo(1);
         format.open(splits[0]);
 
         User u = format.nextRecord(null);
-        assertNotNull(u);
+        assertThat(u).isNotNull();
 
         String name = u.getName().toString();
-        assertNotNull("empty record", name);
-        assertEquals("name not equal", TEST_NAME, name);
+        assertThat(name).isNotNull();
+        assertThat(name).isEqualTo(TEST_NAME);
 
         // check arrays
         List<CharSequence> sl = u.getTypeArrayString();
-        assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, 
sl.get(0).toString());
-        assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, 
sl.get(1).toString());
+        assertThat(sl.get(0).toString()).isEqualTo(TEST_ARRAY_STRING_1);
+        assertThat(sl.get(1).toString()).isEqualTo(TEST_ARRAY_STRING_2);
 
         List<Boolean> bl = u.getTypeArrayBoolean();
-        assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
-        assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+        assertThat(bl).containsExactly(TEST_ARRAY_BOOLEAN_1, 
TEST_ARRAY_BOOLEAN_2);
 
         // check enums
         Colors enumValue = u.getTypeEnum();
-        assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+        assertThat(enumValue).isEqualTo(TEST_ENUM_COLOR);
 
         // check maps
         Map<CharSequence, Long> lm = u.getTypeMap();
-        assertEquals(
-                "map value of key 1 not equal",
-                TEST_MAP_VALUE1,
-                lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
-        assertEquals(
-                "map value of key 2 not equal",
-                TEST_MAP_VALUE2,
-                lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+        assertThat(lm)
+                .containsEntry(new Utf8(TEST_MAP_KEY1), TEST_MAP_VALUE1)
+                .containsEntry(new Utf8(TEST_MAP_KEY2), TEST_MAP_VALUE2)
+                .hasSize(2);

Review comment:
       ```suggestion
           assertThat(lm)
                   .containsExactly(
                           entry(new Utf8(TEST_MAP_KEY1), TEST_MAP_VALUE1),
                           entry(new Utf8(TEST_MAP_KEY1), TEST_MAP_VALUE1));
   ```
   ```

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
##########
@@ -117,21 +116,22 @@ private void serializeAndDeserialize(final 
AvroOutputFormat.Codec codec, final S
                 new ObjectInputStream(new 
ByteArrayInputStream(bos.toByteArray()))) {
             // then
             Object o = ois.readObject();
-            assertTrue(o instanceof AvroOutputFormat);
+            assertThat(o instanceof AvroOutputFormat).isTrue();

Review comment:
       ```suggestion
               assertThat(o).isInstanceOf(AvroOutputFormat.class);
   ```

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
##########
@@ -44,11 +44,11 @@
 import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleAfterMigration;
 import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleAsIs;
 import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isIncompatible;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.HamcrestCondition.matching;

Review comment:
       Looks fine to me.

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
##########
@@ -124,13 +117,14 @@ public void testInvalidRawTypeAvroSchemaConversion() {
                                 .build()
                                 .toRowDataType()
                                 .getLogicalType();
-        thrown.expect(UnsupportedOperationException.class);
-        thrown.expectMessage("Unsupported to derive Schema for type: RAW");
-        AvroSchemaConverter.convertToSchema(rowType);
+
+        assertThatExceptionOfType(UnsupportedOperationException.class)
+                .isThrownBy(() -> AvroSchemaConverter.convertToSchema(rowType))
+                .withMessageStartingWith("Unsupported to derive Schema for 
type: RAW");

Review comment:
       Personally I'm more of a fan of `assertThatThrownBy().<assertions>` 
because it keeps the assertions together.

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
##########
@@ -302,15 +301,15 @@ public void testAvroRecovery() throws Exception {
         format.configure(parameters);
 
         FileInputSplit[] splits = format.createInputSplits(4);
-        assertEquals(splits.length, 4);
+        assertThat(4).isEqualTo(splits.length);
 
         int elements = 0;
         int[] elementsPerSplit = new int[4];
         for (int i = 0; i < splits.length; i++) {
             format.open(splits[i]);
             while (!format.reachedEnd()) {
                 User u = format.nextRecord(null);
-                
Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+                
assertThat(u.getName().toString().startsWith(TEST_NAME)).isTrue();

Review comment:
       see above

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java
##########
@@ -139,18 +137,18 @@ public void testWriteAvroReflect() throws Exception {
     private static <T> void validateResults(
             File folder, DatumReader<T> datumReader, List<T> expected) throws 
Exception {
         File[] buckets = folder.listFiles();
-        assertNotNull(buckets);
-        assertEquals(1, buckets.length);
+        assertThat(buckets).isNotNull();
+        assertThat(buckets.length).isEqualTo(1);
 
         File[] partFiles = buckets[0].listFiles();
-        assertNotNull(partFiles);
-        assertEquals(2, partFiles.length);
+        assertThat(partFiles).isNotNull();
+        assertThat(partFiles.length).isEqualTo(2);

Review comment:
       see above

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
##########
@@ -253,15 +252,15 @@ public void testAvroRecoveryWithFailureAtStart() throws 
Exception {
         format.configure(parameters);
 
         FileInputSplit[] splits = format.createInputSplits(4);
-        assertEquals(splits.length, 4);
+        assertThat(4).isEqualTo(splits.length);
 
         int elements = 0;
         int[] elementsPerSplit = new int[4];
         for (int i = 0; i < splits.length; i++) {
             format.reopen(splits[i], format.getCurrentState());
             while (!format.reachedEnd()) {
                 User u = format.nextRecord(null);
-                
Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+                
assertThat(u.getName().toString().startsWith(TEST_NAME)).isTrue();

Review comment:
       ```suggestion
                   assertThat(u.getName().toString()).startsWith(TEST_NAME);
   ```

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
##########
@@ -60,6 +60,7 @@ public void go() throws Exception {
         sync.awaitBlocker();
 
         // this should fail with an exception
+

Review comment:
       revert

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java
##########
@@ -139,18 +137,18 @@ public void testWriteAvroReflect() throws Exception {
     private static <T> void validateResults(
             File folder, DatumReader<T> datumReader, List<T> expected) throws 
Exception {
         File[] buckets = folder.listFiles();
-        assertNotNull(buckets);
-        assertEquals(1, buckets.length);
+        assertThat(buckets).isNotNull();
+        assertThat(buckets.length).isEqualTo(1);
 
         File[] partFiles = buckets[0].listFiles();
-        assertNotNull(partFiles);
-        assertEquals(2, partFiles.length);
+        assertThat(partFiles).isNotNull();
+        assertThat(partFiles.length).isEqualTo(2);
 
         for (File partFile : partFiles) {
-            assertTrue(partFile.length() > 0);
+            assertThat(partFile.length() > 0).isTrue();

Review comment:
       ```suggestion
               assertThat(partFile).isNotEmpty();
   ```

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
##########
@@ -199,68 +195,63 @@ public static void writeTestFile(File testFile) throws 
IOException {
         dataFileWriter.close();
     }
 
-    @Before
+    @BeforeEach
     public void createFiles() throws IOException {
         testFile = File.createTempFile("AvroInputFormatTest", null);
         writeTestFile(testFile);
     }
 
     /** Test if the AvroInputFormat is able to properly read data from an Avro 
file. */
     @Test
-    public void testDeserialization() throws IOException {
+    void testDeserialization() throws IOException {
         Configuration parameters = new Configuration();
 
         AvroInputFormat<User> format =
                 new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), 
User.class);
 
         format.configure(parameters);
         FileInputSplit[] splits = format.createInputSplits(1);
-        assertEquals(splits.length, 1);
+        assertThat(splits.length).isEqualTo(1);

Review comment:
       ```suggestion
           assertThat(splits).hasSize(1);
   ```

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
##########
@@ -199,68 +195,63 @@ public static void writeTestFile(File testFile) throws 
IOException {
         dataFileWriter.close();
     }
 
-    @Before
+    @BeforeEach
     public void createFiles() throws IOException {
         testFile = File.createTempFile("AvroInputFormatTest", null);
         writeTestFile(testFile);
     }
 
     /** Test if the AvroInputFormat is able to properly read data from an Avro 
file. */
     @Test
-    public void testDeserialization() throws IOException {
+    void testDeserialization() throws IOException {
         Configuration parameters = new Configuration();
 
         AvroInputFormat<User> format =
                 new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), 
User.class);
 
         format.configure(parameters);
         FileInputSplit[] splits = format.createInputSplits(1);
-        assertEquals(splits.length, 1);
+        assertThat(splits.length).isEqualTo(1);
         format.open(splits[0]);
 
         User u = format.nextRecord(null);
-        assertNotNull(u);
+        assertThat(u).isNotNull();
 
         String name = u.getName().toString();
-        assertNotNull("empty record", name);
-        assertEquals("name not equal", TEST_NAME, name);
+        assertThat(name).isNotNull();

Review comment:
       should be obsolete?

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
##########
@@ -269,45 +260,40 @@ public void testDeserializationReuseAvroRecordFalse() 
throws IOException {
 
         format.configure(parameters);
         FileInputSplit[] splits = format.createInputSplits(1);
-        assertEquals(splits.length, 1);
+        assertThat(splits.length).isEqualTo(1);

Review comment:
       ```suggestion
           assertThat(splits).hasSize(1);
   ```

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
##########
@@ -445,46 +423,41 @@ private void doTestDeserializationGenericRecord(
         try {
             format.configure(parameters);
             FileInputSplit[] splits = format.createInputSplits(1);
-            assertEquals(splits.length, 1);
+            assertThat(splits.length).isEqualTo(1);
             format.open(splits[0]);
 
             GenericRecord u = format.nextRecord(null);
-            assertNotNull(u);
-            assertEquals("The schemas should be equal", userSchema, 
u.getSchema());
+            assertThat(u).isNotNull();
+            assertThat(u.getSchema()).isEqualTo(userSchema);
 
             String name = u.get("name").toString();
-            assertNotNull("empty record", name);
-            assertEquals("name not equal", TEST_NAME, name);
+            assertThat(name).isNotNull();

Review comment:
       ```suggestion
   ```

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
##########
@@ -445,46 +423,41 @@ private void doTestDeserializationGenericRecord(
         try {
             format.configure(parameters);
             FileInputSplit[] splits = format.createInputSplits(1);
-            assertEquals(splits.length, 1);
+            assertThat(splits.length).isEqualTo(1);
             format.open(splits[0]);
 
             GenericRecord u = format.nextRecord(null);
-            assertNotNull(u);
-            assertEquals("The schemas should be equal", userSchema, 
u.getSchema());
+            assertThat(u).isNotNull();
+            assertThat(u.getSchema()).isEqualTo(userSchema);
 
             String name = u.get("name").toString();
-            assertNotNull("empty record", name);
-            assertEquals("name not equal", TEST_NAME, name);
+            assertThat(name).isNotNull();
+            assertThat(name).isEqualTo(TEST_NAME);
 
             // check arrays
             List<CharSequence> sl = (List<CharSequence>) 
u.get("type_array_string");
-            assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, 
sl.get(0).toString());
-            assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, 
sl.get(1).toString());
+            assertThat(sl.get(0).toString()).isEqualTo(TEST_ARRAY_STRING_1);
+            assertThat(sl.get(1).toString()).isEqualTo(TEST_ARRAY_STRING_2);
 
             List<Boolean> bl = (List<Boolean>) u.get("type_array_boolean");
-            assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, 
bl.get(0));
-            assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, 
bl.get(1));
+            assertThat(bl).containsExactly(TEST_ARRAY_BOOLEAN_1, 
TEST_ARRAY_BOOLEAN_2);
 
             // check enums
             GenericData.EnumSymbol enumValue = (GenericData.EnumSymbol) 
u.get("type_enum");
-            assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), 
enumValue.toString());
+            assertThat(enumValue).isEqualTo(TEST_ENUM_COLOR);
 
             // check maps
             Map<CharSequence, Long> lm = (Map<CharSequence, Long>) 
u.get("type_map");
-            assertEquals(
-                    "map value of key 1 not equal",
-                    TEST_MAP_VALUE1,
-                    lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
-            assertEquals(
-                    "map value of key 2 not equal",
-                    TEST_MAP_VALUE2,
-                    lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
-
-            assertFalse("expecting second element", format.reachedEnd());
-            assertNotNull("expecting second element", format.nextRecord(u));
-
-            assertNull(format.nextRecord(u));
-            assertTrue(format.reachedEnd());
+            assertThat(lm)
+                    .containsEntry(new Utf8(TEST_MAP_KEY1), TEST_MAP_VALUE1)
+                    .containsEntry(new Utf8(TEST_MAP_KEY2), TEST_MAP_VALUE2)

Review comment:
       see above

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java
##########
@@ -139,18 +137,18 @@ public void testWriteAvroReflect() throws Exception {
     private static <T> void validateResults(
             File folder, DatumReader<T> datumReader, List<T> expected) throws 
Exception {
         File[] buckets = folder.listFiles();
-        assertNotNull(buckets);
-        assertEquals(1, buckets.length);
+        assertThat(buckets).isNotNull();
+        assertThat(buckets.length).isEqualTo(1);

Review comment:
       AFAIK this would also make the isNotNull assertion above obsolute

##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
##########
@@ -27,55 +27,79 @@
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.formats.avro.AvroInputFormat;
 import org.apache.flink.formats.avro.AvroRecordInputFormatTest;
 import org.apache.flink.formats.avro.generated.Fixed16;
 import org.apache.flink.formats.avro.generated.User;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.CollectionTestEnvironment;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestEnvironment;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Stream;
 
-/** Tests for the {@link AvroInputFormat} reading Pojos. */
-@RunWith(Parameterized.class)
-public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
-    public AvroTypeExtractionTest(TestExecutionMode mode) {
-        super(mode);
+/** Tests for the {@link AvroInputFormat} reading Pojos. */
+class AvroTypeExtractionTest {
+
+    private static final int PARALLELISM = 4;
+
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .withHaLeadershipControl()

Review comment:
       I doubt that you need this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to