Jiabao-Sun commented on code in PR #23960:
URL: https://github.com/apache/flink/pull/23960#discussion_r1475422173


##########
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java:
##########
@@ -603,55 +587,52 @@ public void testGetStatisticsMultipleOneFileWithCachedVersion() throws IOExcepti
      * split has to start from the beginning.
      */
     @Test
-    public void testFileInputFormatWithCompression() {
-        try {
-            String tempFile =
-                    TestFileUtils.createTempFileDirForProvidedFormats(
-                            temporaryFolder.newFolder(),
-                            FileInputFormat.getSupportedCompressionFormats());
-            final DummyFileInputFormat format = new DummyFileInputFormat();
-            format.setFilePath(tempFile);
-            format.configure(new Configuration());
-            FileInputSplit[] splits = format.createInputSplits(2);
-            final Set<String> supportedCompressionFormats =
-                    FileInputFormat.getSupportedCompressionFormats();
-            Assert.assertEquals(supportedCompressionFormats.size(), splits.length);
-            for (FileInputSplit split : splits) {
-                Assert.assertEquals(
-                        FileInputFormat.READ_WHOLE_SPLIT_FLAG,
-                        split.getLength()); // unsplittable compressed files have this size as a
-                // flag for "read whole file"
-                Assert.assertEquals(0L, split.getStart()); // always read from the beginning.
-            }
+    void testFileInputFormatWithCompression() throws IOException {
 
-            // test if this also works for "mixed" directories
-            TestFileUtils.createTempFileInDirectory(
-                    tempFile.replace("file:", ""),
-                    "this creates a test file with a random extension (at least not .deflate)");
-
-            final DummyFileInputFormat formatMixed = new DummyFileInputFormat();
-            formatMixed.setFilePath(tempFile);
-            formatMixed.configure(new Configuration());
-            FileInputSplit[] splitsMixed = formatMixed.createInputSplits(2);
-            Assert.assertEquals(supportedCompressionFormats.size() + 1, splitsMixed.length);
-            for (FileInputSplit split : splitsMixed) {
-                final String extension =
-                        FileInputFormat.extractFileExtension(split.getPath().getName());
-                if (supportedCompressionFormats.contains(extension)) {
-                    Assert.assertEquals(
-                            FileInputFormat.READ_WHOLE_SPLIT_FLAG,
-                            split.getLength()); // unsplittable compressed files have this size as a
-                    // flag for "read whole file"
-                    Assert.assertEquals(0L, split.getStart()); // always read from the beginning.
-                } else {
-                    Assert.assertEquals(0L, split.getStart());
-                    Assert.assertTrue("split size not correct", split.getLength() > 0);
-                }
-            }
+        String tempFile =
+                TestFileUtils.createTempFileDirForProvidedFormats(
+                        TempDirUtils.newFolder(temporaryFolder),
+                        FileInputFormat.getSupportedCompressionFormats());
+        final DummyFileInputFormat format = new DummyFileInputFormat();
+        format.setFilePath(tempFile);
+        format.configure(new Configuration());
+        FileInputSplit[] splits = format.createInputSplits(2);
+        final Set<String> supportedCompressionFormats =
+                FileInputFormat.getSupportedCompressionFormats();
+        assertThat(splits).hasSize(supportedCompressionFormats.size());
+        for (FileInputSplit split : splits) {
+            assertThat(split.getLength())
+                    .isEqualTo(
+                            FileInputFormat.READ_WHOLE_SPLIT_FLAG); // unsplittable compressed files
+            // have this size as a
+            // flag for "read whole file"
+            assertThat(split.getStart()).isEqualTo(0L); // always read from the beginning.

Review Comment:
   ```suggestion
            assertThat(split.getStart()).isZero(); // always read from the beginning.
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java:
##########
@@ -662,35 +643,32 @@ public void testFileInputFormatWithCompression() {
      * split is not the compressed file size and that the compression decorator is called.
      */
     @Test
-    public void testFileInputFormatWithCompressionFromFileSource() {
-        try {
-            String tempFile =
-                    TestFileUtils.createTempFileDirForProvidedFormats(
-                            temporaryFolder.newFolder(),
-                            FileInputFormat.getSupportedCompressionFormats());
-            DummyFileInputFormat format = new DummyFileInputFormat();
-            format.setFilePath(tempFile);
-            format.configure(new Configuration());
-
-            // manually create a FileInputSplit per file as FileSource would do
-            // see org.apache.flink.connector.file.table.DeserializationSchemaAdapter.Reader()
-            List<FileInputSplit> splits = manuallyCreateSplits(tempFile);
-            final Set<String> supportedCompressionFormats =
-                    FileInputFormat.getSupportedCompressionFormats();
-            // one file per compression format, one split per file
-            Assert.assertEquals(supportedCompressionFormats.size(), splits.size());
-            for (FileInputSplit split : splits) {
-                Assert.assertEquals(0L, split.getStart()); // always read from the beginning.
-                format.open(split);
-                Assert.assertTrue(format.compressedRead);
-                Assert.assertEquals(
-                        FileInputFormat.READ_WHOLE_SPLIT_FLAG,
-                        format.getSplitLength()); // unsplittable compressed files have this size
-                // as flag for "read whole file"
-            }
-        } catch (Exception ex) {
-            ex.printStackTrace();
-            Assert.fail(ex.getMessage());
+    void testFileInputFormatWithCompressionFromFileSource() throws IOException {
+
+        String tempFile =
+                TestFileUtils.createTempFileDirForProvidedFormats(
+                        TempDirUtils.newFolder(temporaryFolder),
+                        FileInputFormat.getSupportedCompressionFormats());
+        DummyFileInputFormat format = new DummyFileInputFormat();
+        format.setFilePath(tempFile);
+        format.configure(new Configuration());
+
+        // manually create a FileInputSplit per file as FileSource would do
+        // see org.apache.flink.connector.file.table.DeserializationSchemaAdapter.Reader()
+        List<FileInputSplit> splits = manuallyCreateSplits(tempFile);
+        final Set<String> supportedCompressionFormats =
+                FileInputFormat.getSupportedCompressionFormats();
+        // one file per compression format, one split per file
+        assertThat(splits).hasSize(supportedCompressionFormats.size());
+        for (FileInputSplit split : splits) {
+            assertThat(split.getStart()).isEqualTo(0L); // always read from the beginning.

Review Comment:
   ```suggestion
            assertThat(split.getStart()).isZero(); // always read from the beginning.
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java:
##########
@@ -662,35 +643,32 @@ public void testFileInputFormatWithCompression() {
      * split is not the compressed file size and that the compression decorator is called.
      */
     @Test
-    public void testFileInputFormatWithCompressionFromFileSource() {
-        try {
-            String tempFile =
-                    TestFileUtils.createTempFileDirForProvidedFormats(
-                            temporaryFolder.newFolder(),
-                            FileInputFormat.getSupportedCompressionFormats());
-            DummyFileInputFormat format = new DummyFileInputFormat();
-            format.setFilePath(tempFile);
-            format.configure(new Configuration());
-
-            // manually create a FileInputSplit per file as FileSource would do
-            // see org.apache.flink.connector.file.table.DeserializationSchemaAdapter.Reader()
-            List<FileInputSplit> splits = manuallyCreateSplits(tempFile);
-            final Set<String> supportedCompressionFormats =
-                    FileInputFormat.getSupportedCompressionFormats();
-            // one file per compression format, one split per file
-            Assert.assertEquals(supportedCompressionFormats.size(), splits.size());
-            for (FileInputSplit split : splits) {
-                Assert.assertEquals(0L, split.getStart()); // always read from the beginning.
-                format.open(split);
-                Assert.assertTrue(format.compressedRead);
-                Assert.assertEquals(
-                        FileInputFormat.READ_WHOLE_SPLIT_FLAG,
-                        format.getSplitLength()); // unsplittable compressed files have this size
-                // as flag for "read whole file"
-            }
-        } catch (Exception ex) {
-            ex.printStackTrace();
-            Assert.fail(ex.getMessage());
+    void testFileInputFormatWithCompressionFromFileSource() throws IOException {
+
+        String tempFile =
+                TestFileUtils.createTempFileDirForProvidedFormats(
+                        TempDirUtils.newFolder(temporaryFolder),
+                        FileInputFormat.getSupportedCompressionFormats());
+        DummyFileInputFormat format = new DummyFileInputFormat();
+        format.setFilePath(tempFile);
+        format.configure(new Configuration());
+
+        // manually create a FileInputSplit per file as FileSource would do
+        // see org.apache.flink.connector.file.table.DeserializationSchemaAdapter.Reader()
+        List<FileInputSplit> splits = manuallyCreateSplits(tempFile);
+        final Set<String> supportedCompressionFormats =
+                FileInputFormat.getSupportedCompressionFormats();
+        // one file per compression format, one split per file
+        assertThat(splits).hasSize(supportedCompressionFormats.size());

Review Comment:
   ```suggestion
           assertThat(splits).hasSameSizeAs(supportedCompressionFormats);
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java:
##########
@@ -21,38 +21,40 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.TaskInfoImpl;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.types.Value;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
-import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests runtime context access from inside an RichOutputFormat class. */
-public class RichOutputFormatTest {
+class RichOutputFormatTest {
 
     @Test
-    public void testCheckRuntimeContextAccess() {
-        final SerializedOutputFormat<Value> inputFormat = new SerializedOutputFormat<Value>();
+    void testCheckRuntimeContextAccess() {
+        final SerializedOutputFormat<Value> inputFormat = new SerializedOutputFormat<>();
         final TaskInfo taskInfo = new TaskInfoImpl("test name", 3, 1, 3, 0);
 
         inputFormat.setRuntimeContext(
                 new RuntimeUDFContext(
                         taskInfo,
                         getClass().getClassLoader(),
                         new ExecutionConfig(),
-                        new HashMap<String, Future<Path>>(),
-                        new HashMap<String, Accumulator<?, ?>>(),
+                        new HashMap<>(),
+                        new HashMap<>(),
                         UnregisteredMetricsGroup.createOperatorMetricGroup()));
 
-        assertEquals(inputFormat.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), 1);
-        assertEquals(
-                inputFormat.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(), 3);
+        assertThat(1)
+                .isEqualTo(inputFormat.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
+        assertThat(3)
+                .isEqualTo(
+                        inputFormat
+                                .getRuntimeContext()
+                                .getTaskInfo()
+                                .getNumberOfParallelSubtasks());

Review Comment:
   1 and 3 should be the expected values, right?
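   For illustration, the flipped form would read roughly like this (a sketch of the intent, not a committable suggestion):
   ```java
   assertThat(inputFormat.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask())
           .isEqualTo(1);
   assertThat(inputFormat.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks())
           .isEqualTo(3);
   ```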



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java:
##########
@@ -179,12 +178,12 @@ public void testGetSourceField() {
         sp.addForwardedField(1, 2, 3);
         sp.addForwardedField(1, 3, 2);
 
-        assertEquals(0, sp.getForwardingSourceField(1, 1));
-        assertEquals(1, sp.getForwardingSourceField(1, 4));
-        assertEquals(2, sp.getForwardingSourceField(1, 3));
-        assertEquals(3, sp.getForwardingSourceField(1, 2));
-        assertTrue(sp.getForwardingSourceField(1, 0) < 0);
-        assertTrue(sp.getForwardingSourceField(1, 5) < 0);
+        assertThat(sp.getForwardingSourceField(1, 1)).isZero();
+        assertThat(sp.getForwardingSourceField(1, 4)).isEqualTo(1);

Review Comment:
   isOne
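   i.e. roughly (sketch):
   ```java
   assertThat(sp.getForwardingSourceField(1, 4)).isOne();
   ```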



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/SlotSharingGroupTest.java:
##########
@@ -46,39 +44,48 @@ public void testBuildSlotSharingGroupWithSpecificResource() {
                         .setExternalResource("gpu", 1)
                         .build();
 
-        assertThat(slotSharingGroup.getName(), is(name));
-        assertThat(slotSharingGroup.getCpuCores().get(), is(1.0));
-        assertThat(slotSharingGroup.getTaskHeapMemory().get(), is(heap));
-        assertThat(slotSharingGroup.getTaskOffHeapMemory().get(), is(offHeap));
-        assertThat(slotSharingGroup.getManagedMemory().get(), is(managed));
-        assertThat(
-                slotSharingGroup.getExternalResources(), is(Collections.singletonMap("gpu", 1.0)));
+        assertThat(slotSharingGroup.getName()).isEqualTo(name);
+        assertThat(slotSharingGroup.getCpuCores()).contains(1.0);
+        assertThat(slotSharingGroup.getTaskHeapMemory()).contains(heap);
+        assertThat(slotSharingGroup.getTaskOffHeapMemory()).contains(offHeap);
+        assertThat(slotSharingGroup.getManagedMemory()).contains(managed);
+        assertThat(slotSharingGroup.getExternalResources())
+                .isEqualTo(Collections.singletonMap("gpu", 1.0));
     }
 
     @Test
-    public void testBuildSlotSharingGroupWithUnknownResource() {
+    void testBuildSlotSharingGroupWithUnknownResource() {
         final String name = "ssg";
         final SlotSharingGroup slotSharingGroup = SlotSharingGroup.newBuilder(name).build();
 
-        assertThat(slotSharingGroup.getName(), is(name));
-        assertFalse(slotSharingGroup.getCpuCores().isPresent());
-        assertFalse(slotSharingGroup.getTaskHeapMemory().isPresent());
-        assertFalse(slotSharingGroup.getManagedMemory().isPresent());
-        assertFalse(slotSharingGroup.getTaskOffHeapMemory().isPresent());
-        assertTrue(slotSharingGroup.getExternalResources().isEmpty());
+        assertThat(slotSharingGroup.getName()).isEqualTo(name);
+        assertThat(slotSharingGroup.getCpuCores().isPresent()).isFalse();
+        assertThat(slotSharingGroup.getTaskHeapMemory().isPresent()).isFalse();
+        assertThat(slotSharingGroup.getManagedMemory().isPresent()).isFalse();
+        assertThat(slotSharingGroup.getTaskOffHeapMemory().isPresent()).isFalse();
+        assertThat(slotSharingGroup.getExternalResources().isEmpty()).isTrue();

Review Comment:
   ```suggestion
           assertThat(slotSharingGroup.getCpuCores()).isNotPresent();
           assertThat(slotSharingGroup.getTaskHeapMemory()).isNotPresent();
           assertThat(slotSharingGroup.getManagedMemory()).isNotPresent();
           assertThat(slotSharingGroup.getTaskOffHeapMemory()).isNotPresent();
           assertThat(slotSharingGroup.getExternalResources()).isEmpty();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java:
##########
@@ -249,11 +252,11 @@ private void testOuterJoin(
                 baseOperator.executeOnCollections(
                         leftInput, rightInput, runtimeContext, executionConfig);
 
-        assertEquals(expected, resultSafe);
-        assertEquals(expected, resultRegular);
+        assertThat(resultSafe).isEqualTo(expected);
+        assertThat(resultRegular).isEqualTo(expected);
 
-        assertTrue(joiner.opened.get());
-        assertTrue(joiner.closed.get());
+        assertThat(joiner.opened.get()).isTrue();
+        assertThat(joiner.closed.get()).isTrue();

Review Comment:
   ```suggestion
           assertThat(joiner.opened).isTrue();
           assertThat(joiner.closed).isTrue();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java:
##########
@@ -19,50 +19,47 @@
 
 import org.apache.flink.core.fs.Path;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import lombok.Getter;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
-import java.util.Collection;
+import java.util.List;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
-@RunWith(Parameterized.class)

Review Comment:
   ```java
   @ExtendWith(ParameterizedTestExtension.class)
   class DefaultFilterTest {
       @Parameters
       private static Collection<Object[]> data() {
           return Arrays.asList(
                   new Object[][] {
                       {"file.txt", false},
                       {".file.txt", true},
                       {"dir/.file.txt", true},
                       {".dir/file.txt", false},
                       {"_file.txt", true},
                       {"dir/_file.txt", true},
                       {"_dir/file.txt", false},
   
                       // Check filtering Hadoop's unfinished files
                       {FilePathFilter.HADOOP_COPYING, true},
                       {"dir/" + FilePathFilter.HADOOP_COPYING, true},
                       {FilePathFilter.HADOOP_COPYING + "/file.txt", false},
                   });
       }
   
       private final boolean shouldFilter;
       private final String filePath;
   
       DefaultFilterTest(String filePath, boolean shouldFilter) {
           this.filePath = filePath;
           this.shouldFilter = shouldFilter;
       }
   
       @TestTemplate
       void test() {
           FilePathFilter defaultFilter = FilePathFilter.createDefaultFilter();
           Path path = new Path(filePath);
        assertThat(defaultFilter.filterPath(path)).as("File: " + filePath).isEqualTo(shouldFilter);
       }
   }
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java:
##########
@@ -603,55 +587,52 @@ public void testGetStatisticsMultipleOneFileWithCachedVersion() throws IOExcepti
      * split has to start from the beginning.
      */
     @Test
-    public void testFileInputFormatWithCompression() {
-        try {
-            String tempFile =
-                    TestFileUtils.createTempFileDirForProvidedFormats(
-                            temporaryFolder.newFolder(),
-                            FileInputFormat.getSupportedCompressionFormats());
-            final DummyFileInputFormat format = new DummyFileInputFormat();
-            format.setFilePath(tempFile);
-            format.configure(new Configuration());
-            FileInputSplit[] splits = format.createInputSplits(2);
-            final Set<String> supportedCompressionFormats =
-                    FileInputFormat.getSupportedCompressionFormats();
-            Assert.assertEquals(supportedCompressionFormats.size(), splits.length);
-            for (FileInputSplit split : splits) {
-                Assert.assertEquals(
-                        FileInputFormat.READ_WHOLE_SPLIT_FLAG,
-                        split.getLength()); // unsplittable compressed files have this size as a
-                // flag for "read whole file"
-                Assert.assertEquals(0L, split.getStart()); // always read from the beginning.
-            }
+    void testFileInputFormatWithCompression() throws IOException {
 
-            // test if this also works for "mixed" directories
-            TestFileUtils.createTempFileInDirectory(
-                    tempFile.replace("file:", ""),
-                    "this creates a test file with a random extension (at least not .deflate)");
-
-            final DummyFileInputFormat formatMixed = new DummyFileInputFormat();
-            formatMixed.setFilePath(tempFile);
-            formatMixed.configure(new Configuration());
-            FileInputSplit[] splitsMixed = formatMixed.createInputSplits(2);
-            Assert.assertEquals(supportedCompressionFormats.size() + 1, splitsMixed.length);
-            for (FileInputSplit split : splitsMixed) {
-                final String extension =
-                        FileInputFormat.extractFileExtension(split.getPath().getName());
-                if (supportedCompressionFormats.contains(extension)) {
-                    Assert.assertEquals(
-                            FileInputFormat.READ_WHOLE_SPLIT_FLAG,
-                            split.getLength()); // unsplittable compressed files have this size as a
-                    // flag for "read whole file"
-                    Assert.assertEquals(0L, split.getStart()); // always read from the beginning.
-                } else {
-                    Assert.assertEquals(0L, split.getStart());
-                    Assert.assertTrue("split size not correct", split.getLength() > 0);
-                }
-            }
+        String tempFile =
+                TestFileUtils.createTempFileDirForProvidedFormats(
+                        TempDirUtils.newFolder(temporaryFolder),
+                        FileInputFormat.getSupportedCompressionFormats());
+        final DummyFileInputFormat format = new DummyFileInputFormat();
+        format.setFilePath(tempFile);
+        format.configure(new Configuration());
+        FileInputSplit[] splits = format.createInputSplits(2);
+        final Set<String> supportedCompressionFormats =
+                FileInputFormat.getSupportedCompressionFormats();
+        assertThat(splits).hasSize(supportedCompressionFormats.size());
+        for (FileInputSplit split : splits) {
+            assertThat(split.getLength())
+                    .isEqualTo(
+                            FileInputFormat.READ_WHOLE_SPLIT_FLAG); // unsplittable compressed files
+            // have this size as a
+            // flag for "read whole file"
+            assertThat(split.getStart()).isEqualTo(0L); // always read from the beginning.
+        }
 
-        } catch (Exception ex) {
-            ex.printStackTrace();
-            Assert.fail(ex.getMessage());
+        // test if this also works for "mixed" directories
+        TestFileUtils.createTempFileInDirectory(
+                tempFile.replace("file:", ""),
+                "this creates a test file with a random extension (at least not .deflate)");
+
+        final DummyFileInputFormat formatMixed = new DummyFileInputFormat();
+        formatMixed.setFilePath(tempFile);
+        formatMixed.configure(new Configuration());
+        FileInputSplit[] splitsMixed = formatMixed.createInputSplits(2);
+        assertThat(splitsMixed).hasSize(supportedCompressionFormats.size() + 1);
+        for (FileInputSplit split : splitsMixed) {
+            final String extension =
+                    FileInputFormat.extractFileExtension(split.getPath().getName());
+            if (supportedCompressionFormats.contains(extension)) {
+                assertThat(split.getLength())
+                        .isEqualTo(
+                                FileInputFormat.READ_WHOLE_SPLIT_FLAG); // unsplittable compressed
+                // files have this size as a
+                // flag for "read whole file"
+                assertThat(split.getStart()).isEqualTo(0L); // always read from the beginning.
+            } else {
+                assertThat(split.getStart()).isEqualTo(0L);
+                assertThat(split.getLength() > 0).as("split size not correct").isTrue();

Review Comment:
   ```suggestion
                assertThat(split.getStart()).isZero(); // always read from the beginning.
            } else {
                assertThat(split.getStart()).isZero();
                assertThat(split.getLength()).as("split size not correct").isPositive();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java:
##########
@@ -24,199 +24,160 @@
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.assertj.core.api.Fail.fail;
 
 /** Test for the {@link RuntimeUDFContext}. */
-public class RuntimeUDFContextTest {
+class RuntimeUDFContextTest {
 
     private final TaskInfo taskInfo = new TaskInfoImpl("test name", 3, 1, 3, 0);
 
     @Test
-    public void testBroadcastVariableNotFound() {
-        try {
-            RuntimeUDFContext ctx =
-                    new RuntimeUDFContext(
-                            taskInfo,
-                            getClass().getClassLoader(),
-                            new ExecutionConfig(),
-                            new HashMap<>(),
-                            new HashMap<>(),
-                            UnregisteredMetricsGroup.createOperatorMetricGroup());
-
-            assertFalse(ctx.hasBroadcastVariable("some name"));
-
-            try {
-                ctx.getBroadcastVariable("some name");
-                fail("should throw an exception");
-            } catch (IllegalArgumentException e) {
-                // expected
-            }
-
-            try {
-                ctx.getBroadcastVariableWithInitializer(
-                        "some name",
-                        new BroadcastVariableInitializer<Object, Object>() {
-                            public Object initializeBroadcastVariable(Iterable<Object> data) {
-                                return null;
-                            }
-                        });
-
-                fail("should throw an exception");
-            } catch (IllegalArgumentException e) {
-                // expected
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+    void testBroadcastVariableNotFound() {
+        RuntimeUDFContext ctx =
+                new RuntimeUDFContext(
+                        taskInfo,
+                        getClass().getClassLoader(),
+                        new ExecutionConfig(),
+                        new HashMap<>(),
+                        new HashMap<>(),
+                        UnregisteredMetricsGroup.createOperatorMetricGroup());
+
+        assertThat(ctx.hasBroadcastVariable("some name")).isFalse();
+
+        assertThatThrownBy(() -> ctx.getBroadcastVariable("some name"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("some name");
+
+        assertThatThrownBy(() -> ctx.getBroadcastVariableWithInitializer("some name", data -> null))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("some name");
     }
 
     @Test
-    public void testBroadcastVariableSimple() {
-        try {
-            RuntimeUDFContext ctx =
-                    new RuntimeUDFContext(
-                            taskInfo,
-                            getClass().getClassLoader(),
-                            new ExecutionConfig(),
-                            new HashMap<>(),
-                            new HashMap<>(),
-                            UnregisteredMetricsGroup.createOperatorMetricGroup());
-
-            ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
-            ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0));
-
-            assertTrue(ctx.hasBroadcastVariable("name1"));
-            assertTrue(ctx.hasBroadcastVariable("name2"));
-
-            List<Integer> list1 = ctx.getBroadcastVariable("name1");
-            List<Double> list2 = ctx.getBroadcastVariable("name2");
-
-            assertEquals(Arrays.asList(1, 2, 3, 4), list1);
-            assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list2);
-
-            // access again
-            List<Integer> list3 = ctx.getBroadcastVariable("name1");
-            List<Double> list4 = ctx.getBroadcastVariable("name2");
-
-            assertEquals(Arrays.asList(1, 2, 3, 4), list3);
-            assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list4);
-
-            // and again ;-)
-            List<Integer> list5 = ctx.getBroadcastVariable("name1");
-            List<Double> list6 = ctx.getBroadcastVariable("name2");
-
-            assertEquals(Arrays.asList(1, 2, 3, 4), list5);
-            assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list6);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+    void testBroadcastVariableSimple() {
+        RuntimeUDFContext ctx =
+                new RuntimeUDFContext(
+                        taskInfo,
+                        getClass().getClassLoader(),
+                        new ExecutionConfig(),
+                        new HashMap<>(),
+                        new HashMap<>(),
+                        UnregisteredMetricsGroup.createOperatorMetricGroup());
+
+        ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
+        ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0));
+
+        assertThat(ctx.hasBroadcastVariable("name1")).isTrue();
+        assertThat(ctx.hasBroadcastVariable("name2")).isTrue();
+
+        List<Integer> list1 = ctx.getBroadcastVariable("name1");
+        List<Double> list2 = ctx.getBroadcastVariable("name2");
+
+        assertThat(list1).isEqualTo(Arrays.asList(1, 2, 3, 4));
+        assertThat(list2).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+
+        // access again
+        List<Integer> list3 = ctx.getBroadcastVariable("name1");
+        List<Double> list4 = ctx.getBroadcastVariable("name2");
+
+        assertThat(list3).isEqualTo(Arrays.asList(1, 2, 3, 4));
+        assertThat(list4).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+
+        // and again ;-)
+        List<Integer> list5 = ctx.getBroadcastVariable("name1");
+        List<Double> list6 = ctx.getBroadcastVariable("name2");
+
+        assertThat(list5).isEqualTo(Arrays.asList(1, 2, 3, 4));
+        assertThat(list6).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
     }
 
     @Test
-    public void testBroadcastVariableWithInitializer() {
-        try {
-            RuntimeUDFContext ctx =
-                    new RuntimeUDFContext(
-                            taskInfo,
-                            getClass().getClassLoader(),
-                            new ExecutionConfig(),
-                            new HashMap<>(),
-                            new HashMap<>(),
-                            UnregisteredMetricsGroup.createOperatorMetricGroup());
-
-            ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
-
-            // access it the first time with an initializer
-            List<Double> list =
-                    ctx.getBroadcastVariableWithInitializer("name", new ConvertingInitializer());
-            assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list);
-
-            // access it the second time with an initializer (which might not get executed)
-            List<Double> list2 =
-                    ctx.getBroadcastVariableWithInitializer("name", new ConvertingInitializer());
-            assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list2);
-
-            // access it the third time without an initializer (should work by "chance", because the
-            // result is a list)
-            List<Double> list3 = ctx.getBroadcastVariable("name");
-            assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list3);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+    void testBroadcastVariableWithInitializer() {
+        RuntimeUDFContext ctx =
+                new RuntimeUDFContext(
+                        taskInfo,
+                        getClass().getClassLoader(),
+                        new ExecutionConfig(),
+                        new HashMap<>(),
+                        new HashMap<>(),
+                        UnregisteredMetricsGroup.createOperatorMetricGroup());
+
+        ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
+
+        // access it the first time with an initializer
+        List<Double> list =
+                ctx.getBroadcastVariableWithInitializer("name", new ConvertingInitializer());
+        assertThat(list).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+
+        // access it the second time with an initializer (which might not get executed)
+        List<Double> list2 =
+                ctx.getBroadcastVariableWithInitializer("name", new ConvertingInitializer());
+        assertThat(list2).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+
+        // access it the third time without an initializer (should work by "chance", because the
+        // result is a list)
+        List<Double> list3 = ctx.getBroadcastVariable("name");
+        assertThat(list3).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
     }
 
     @Test
-    public void testResetBroadcastVariableWithInitializer() {
-        try {
-            RuntimeUDFContext ctx =
-                    new RuntimeUDFContext(
-                            taskInfo,
-                            getClass().getClassLoader(),
-                            new ExecutionConfig(),
-                            new HashMap<>(),
-                            new HashMap<>(),
-                            UnregisteredMetricsGroup.createOperatorMetricGroup());
-
-            ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
-
-            // access it the first time with an initializer
-            List<Double> list =
-                    ctx.getBroadcastVariableWithInitializer("name", new ConvertingInitializer());
-            assertEquals(Arrays.asList(1.0, 2.0, 3.0, 4.0), list);
-
-            // set it again to something different
-            ctx.setBroadcastVariable("name", Arrays.asList(2, 3, 4, 5));
-
-            List<Double> list2 =
-                    ctx.getBroadcastVariableWithInitializer("name", new ConvertingInitializer());
-            assertEquals(Arrays.asList(2.0, 3.0, 4.0, 5.0), list2);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+    void testResetBroadcastVariableWithInitializer() {
+        RuntimeUDFContext ctx =
+                new RuntimeUDFContext(
+                        taskInfo,
+                        getClass().getClassLoader(),
+                        new ExecutionConfig(),
+                        new HashMap<>(),
+                        new HashMap<>(),
+                        UnregisteredMetricsGroup.createOperatorMetricGroup());
+
+        ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
+
+        // access it the first time with an initializer
+        List<Double> list =
+                ctx.getBroadcastVariableWithInitializer("name", new ConvertingInitializer());
+        assertThat(list).isEqualTo(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+
+        // set it again to something different
+        ctx.setBroadcastVariable("name", Arrays.asList(2, 3, 4, 5));
+
+        List<Double> list2 =
+                ctx.getBroadcastVariableWithInitializer("name", new ConvertingInitializer());
+        assertThat(list2).isEqualTo(Arrays.asList(2.0, 3.0, 4.0, 5.0));
     }
 
     @Test
-    public void testBroadcastVariableWithInitializerAndMismatch() {
+    void testBroadcastVariableWithInitializerAndMismatch() {
+        RuntimeUDFContext ctx =
+                new RuntimeUDFContext(
+                        taskInfo,
+                        getClass().getClassLoader(),
+                        new ExecutionConfig(),
+                        new HashMap<>(),
+                        new HashMap<>(),
+                        UnregisteredMetricsGroup.createOperatorMetricGroup());
+
+        ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
+
+        // access it the first time with an initializer
+        int sum = ctx.getBroadcastVariableWithInitializer("name", new SumInitializer());
+        assertThat(sum).isEqualTo(10);
+
+        // access it the second time with no initializer -> should fail due to type mismatch
         try {
-            RuntimeUDFContext ctx =
-                    new RuntimeUDFContext(
-                            taskInfo,
-                            getClass().getClassLoader(),
-                            new ExecutionConfig(),
-                            new HashMap<>(),
-                            new HashMap<>(),
-                            UnregisteredMetricsGroup.createOperatorMetricGroup());
-
-            ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
-
-            // access it the first time with an initializer
-            int sum = ctx.getBroadcastVariableWithInitializer("name", new SumInitializer());
-            assertEquals(10, sum);
-
-            // access it the second time with no initializer -> should fail due to type mismatch
-            try {
-                ctx.getBroadcastVariable("name");
-                fail("should throw an exception");
-            } catch (IllegalStateException e) {
-                // expected
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+            ctx.getBroadcastVariable("name");
+            fail("should throw an exception");
+        } catch (IllegalStateException e) {
+            // expected
         }

Review Comment:
   assertThatThrownBy.isInstanceOf(IllegalStateException.class)
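   Roughly (a sketch using the assertThatThrownBy already imported in this test):
   ```java
   assertThatThrownBy(() -> ctx.getBroadcastVariable("name"))
           .isInstanceOf(IllegalStateException.class);
   ```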



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java:
##########
@@ -193,61 +192,69 @@ public void testGetSourceField() {
         sp.addForwardedField(1, 1, 2);
         sp.addForwardedField(1, 1, 3);
 
-        assertEquals(0, sp.getForwardingSourceField(1, 0));
-        assertEquals(0, sp.getForwardingSourceField(1, 4));
-        assertEquals(1, sp.getForwardingSourceField(1, 1));
-        assertEquals(1, sp.getForwardingSourceField(1, 2));
-        assertEquals(1, sp.getForwardingSourceField(1, 3));
-        assertTrue(sp.getForwardingSourceField(1, 5) < 0);
+        assertThat(sp.getForwardingSourceField(1, 0)).isZero();
+        assertThat(sp.getForwardingSourceField(1, 4)).isZero();
+        assertThat(sp.getForwardingSourceField(1, 1)).isEqualTo(1);
+        assertThat(sp.getForwardingSourceField(1, 2)).isEqualTo(1);
+        assertThat(sp.getForwardingSourceField(1, 3)).isEqualTo(1);
+        assertThat(sp.getForwardingSourceField(1, 5) < 0).isTrue();

Review Comment:
   isOne and isNegative
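   i.e. roughly (sketch):
   ```java
   assertThat(sp.getForwardingSourceField(1, 1)).isOne();
   assertThat(sp.getForwardingSourceField(1, 2)).isOne();
   assertThat(sp.getForwardingSourceField(1, 3)).isOne();
   assertThat(sp.getForwardingSourceField(1, 5)).isNegative();
   ```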



##########
flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java:
##########
@@ -21,37 +21,39 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.TaskInfoImpl;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.types.Value;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
-import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests runtime context access from inside an RichInputFormat class. */
-public class RichInputFormatTest {
+class RichInputFormatTest {
 
     @Test
-    public void testCheckRuntimeContextAccess() {
-        final SerializedInputFormat<Value> inputFormat = new SerializedInputFormat<Value>();
+    void testCheckRuntimeContextAccess() {
+        final SerializedInputFormat<Value> inputFormat = new SerializedInputFormat<>();
         final TaskInfo taskInfo = new TaskInfoImpl("test name", 3, 1, 3, 0);
         inputFormat.setRuntimeContext(
                 new RuntimeUDFContext(
                         taskInfo,
                         getClass().getClassLoader(),
                         new ExecutionConfig(),
-                        new HashMap<String, Future<Path>>(),
-                        new HashMap<String, Accumulator<?, ?>>(),
+                        new HashMap<>(),
+                        new HashMap<>(),
                         UnregisteredMetricsGroup.createOperatorMetricGroup()));
 
-        assertEquals(inputFormat.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), 1);
-        assertEquals(
-                inputFormat.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(), 3);
+        assertThat(1)
+                .isEqualTo(inputFormat.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
+        assertThat(3)
+                .isEqualTo(
+                        inputFormat
+                                .getRuntimeContext()
+                                .getTaskInfo()
+                                .getNumberOfParallelSubtasks());

Review Comment:
   1 and 3 should be the expected values, right?
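   For illustration, the flipped form would read roughly like this (a sketch of the intent, not a committable suggestion):
   ```java
   assertThat(inputFormat.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask())
           .isEqualTo(1);
   assertThat(inputFormat.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks())
           .isEqualTo(3);
   ```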



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java:
##########
@@ -179,12 +178,12 @@ public void testGetSourceField() {
         sp.addForwardedField(1, 2, 3);
         sp.addForwardedField(1, 3, 2);
 
-        assertEquals(0, sp.getForwardingSourceField(1, 1));
-        assertEquals(1, sp.getForwardingSourceField(1, 4));
-        assertEquals(2, sp.getForwardingSourceField(1, 3));
-        assertEquals(3, sp.getForwardingSourceField(1, 2));
-        assertTrue(sp.getForwardingSourceField(1, 0) < 0);
-        assertTrue(sp.getForwardingSourceField(1, 5) < 0);
+        assertThat(sp.getForwardingSourceField(1, 1)).isZero();
+        assertThat(sp.getForwardingSourceField(1, 4)).isEqualTo(1);
+        assertThat(sp.getForwardingSourceField(1, 3)).isEqualTo(2);
+        assertThat(sp.getForwardingSourceField(1, 2)).isEqualTo(3);
+        assertThat(sp.getForwardingSourceField(1, 0) < 0).isTrue();
+        assertThat(sp.getForwardingSourceField(1, 5) < 0).isTrue();

Review Comment:
   isNegative
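   i.e. roughly (sketch):
   ```java
   assertThat(sp.getForwardingSourceField(1, 0)).isNegative();
   assertThat(sp.getForwardingSourceField(1, 5)).isNegative();
   ```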



##########
flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java:
##########
@@ -24,620 +24,617 @@
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.IntValue;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
 
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-public class FileOutputFormatTest {
+class FileOutputFormatTest {
 
     @Test
-    public void testCreateNonParallelLocalFS() throws IOException {
+    void testCreateNonParallelLocalFS() throws IOException {
 
         File tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
         File tmpOutFile = new File(tmpOutPath.getAbsolutePath() + "/1");
 
         String tmpFilePath = tmpOutPath.toURI().toString();
 
         // check fail if file exists
-        DummyFileOutputFormat dfof = new DummyFileOutputFormat();
-        dfof.setOutputFilePath(new Path(tmpFilePath));
-        dfof.setWriteMode(WriteMode.NO_OVERWRITE);
-        dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
-
-        dfof.configure(new Configuration());
-
-        try {
-            dfof.open(0, 1);
-            dfof.close();
-            fail();
-        } catch (Exception e) {
-            // exception expected
-        }
+        assertThatThrownBy(
+                        () -> {
+                            DummyFileOutputFormat dfof = new DummyFileOutputFormat();
+                            dfof.setOutputFilePath(new Path(tmpFilePath));
+                            dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+                            dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+                            dfof.configure(new Configuration());
+
+                            dfof.open(0, 1);
+                            dfof.close();
+                        })
+                .isInstanceOf(Exception.class);
         tmpOutPath.delete();
 
         // check fail if directory exists
-        Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir());
+        assertThat(tmpOutPath.mkdir()).as("Directory could not be created.").isTrue();
 
-        dfof = new DummyFileOutputFormat();
-        dfof.setOutputFilePath(new Path(tmpFilePath));
-        dfof.setWriteMode(WriteMode.NO_OVERWRITE);
-        dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+        assertThatThrownBy(
+                        () -> {
+                            DummyFileOutputFormat dfof = new DummyFileOutputFormat();
+                            dfof.setOutputFilePath(new Path(tmpFilePath));
+                            dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+                            dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
 
-        dfof.configure(new Configuration());
+                            dfof.configure(new Configuration());
 
-        try {
-            dfof.open(0, 1);
-            dfof.close();
-            fail();
-        } catch (Exception e) {
-            // exception expected
-        }
+                            dfof.open(0, 1);
+                            dfof.close();
+                        })
+                .isInstanceOf(Exception.class);
         tmpOutPath.delete();
 
         // check success
-        dfof = new DummyFileOutputFormat();
-        dfof.setOutputFilePath(new Path(tmpFilePath));
-        dfof.setWriteMode(WriteMode.NO_OVERWRITE);
-        dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
-
-        dfof.configure(new Configuration());
-
-        try {
-            dfof.open(0, 1);
-            dfof.close();
-        } catch (Exception e) {
-            fail();
-        }
-        Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
+        assertThatNoException()
+                .isThrownBy(
+                        () -> {
+                            DummyFileOutputFormat dfof = new DummyFileOutputFormat();
+                            dfof.setOutputFilePath(new Path(tmpFilePath));
+                            dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+                            dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+                            dfof.configure(new Configuration());
+
+                            dfof.open(0, 1);
+                            dfof.close();
+                        });
+        assertThat(tmpOutPath.exists() && tmpOutPath.isFile()).isTrue();
         tmpOutPath.delete();
 
         // check fail for path with tailing '/'
-        dfof = new DummyFileOutputFormat();
-        dfof.setOutputFilePath(new Path(tmpFilePath + "/"));
-        dfof.setWriteMode(WriteMode.NO_OVERWRITE);
-        dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
-
-        dfof.configure(new Configuration());
-
-        try {
-            dfof.open(0, 1);
-            dfof.close();
-        } catch (Exception e) {
-            fail();
-        }
-        Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
+        assertThatNoException()
+                .isThrownBy(
+                        () -> {
+                            DummyFileOutputFormat dfof = new DummyFileOutputFormat();
+                            dfof.setOutputFilePath(new Path(tmpFilePath + "/"));
+                            dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+                            dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+                            dfof.configure(new Configuration());
+
+                            dfof.open(0, 1);
+                            dfof.close();
+                        });
+        assertThat(tmpOutPath.exists() && tmpOutPath.isFile()).isTrue();
         tmpOutPath.delete();
 
         // ----------- test again with always directory mode
 
         // check fail if file exists
         tmpOutPath.createNewFile();
 
-        dfof = new DummyFileOutputFormat();
-        dfof.setOutputFilePath(new Path(tmpFilePath));
-        dfof.setWriteMode(WriteMode.NO_OVERWRITE);
-        dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+        assertThatThrownBy(
+                        () -> {
+                            DummyFileOutputFormat dfof = new DummyFileOutputFormat();
+                            dfof.setOutputFilePath(new Path(tmpFilePath));
+                            dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+                            dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
 
-        dfof.configure(new Configuration());
+                            dfof.configure(new Configuration());
 
-        try {
-            dfof.open(0, 1);
-            dfof.close();
-            fail();
-        } catch (Exception e) {
-            // exception expected
-        }
+                            dfof.open(0, 1);
+                            dfof.close();
+                        })
+                .isInstanceOf(Exception.class);
         tmpOutPath.delete();
 
         // check success if directory exists
-        Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir());
-
-        dfof = new DummyFileOutputFormat();
-        dfof.setOutputFilePath(new Path(tmpFilePath));
-        dfof.setWriteMode(WriteMode.NO_OVERWRITE);
-        dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
-
-        dfof.configure(new Configuration());
-
-        try {
-            dfof.open(0, 1);
-            dfof.close();
-        } catch (Exception e) {
-            fail();
-        }
-        Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
-        Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+        assertThat(tmpOutPath.mkdir()).as("Directory could not be created.").isTrue();
+
+        assertThatNoException()
+                .isThrownBy(
+                        () -> {
+                            DummyFileOutputFormat dfof = new DummyFileOutputFormat();
+                            dfof.setOutputFilePath(new Path(tmpFilePath));
+                            dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+                            dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+
+                            dfof.configure(new Configuration());
+
+                            dfof.open(0, 1);
+                            dfof.close();
+                        });
+        assertThat(tmpOutPath.exists() && tmpOutPath.isDirectory()).isTrue();
+        assertThat(tmpOutFile.exists() && tmpOutFile.isFile()).isTrue();
         (new File(tmpOutPath.getAbsoluteFile() + "/1")).delete();
 
         // check custom file name inside directory if directory exists
-        dfof = new DummyFileOutputFormat();
-        dfof.setOutputFilePath(new Path(tmpFilePath));
-        dfof.setWriteMode(WriteMode.NO_OVERWRITE);
-        dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
-        dfof.testFileName = true;
-        Configuration c = new Configuration();
-        dfof.configure(c);
-
-        try {
-            dfof.open(0, 1);
-            dfof.close();
-        } catch (Exception e) {
-            fail();
-        }
+        assertThatNoException()
+                .isThrownBy(
+                        () -> {
+                            DummyFileOutputFormat dfof = new DummyFileOutputFormat();
+                            dfof.setOutputFilePath(new Path(tmpFilePath));
+                            dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+                            dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+                            dfof.testFileName = true;
+                            Configuration c = new Configuration();
+                            dfof.configure(c);
+
+                            dfof.open(0, 1);
+                            dfof.close();
+                        });
         File customOutFile = new File(tmpOutPath.getAbsolutePath() + "/fancy-1-0.avro");
-        Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
-        Assert.assertTrue(customOutFile.exists() && customOutFile.isFile());
+        assertThat(tmpOutPath.exists() && tmpOutPath.isDirectory()).isTrue();
+        assertThat(customOutFile.exists() && customOutFile.isFile()).isTrue();

Review Comment:
   ```suggestion
           assertThat(tmpOutPath).exists().isDirectory();
        assertThat(customOutFile).exists().isFile();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java:
##########
@@ -24,620 +24,617 @@
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.IntValue;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
 
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-public class FileOutputFormatTest {
+class FileOutputFormatTest {
 
     @Test
-    public void testCreateNonParallelLocalFS() throws IOException {
+    void testCreateNonParallelLocalFS() throws IOException {
 
         File tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
         File tmpOutFile = new File(tmpOutPath.getAbsolutePath() + "/1");
 
         String tmpFilePath = tmpOutPath.toURI().toString();
 
         // check fail if file exists
-        DummyFileOutputFormat dfof = new DummyFileOutputFormat();
-        dfof.setOutputFilePath(new Path(tmpFilePath));
-        dfof.setWriteMode(WriteMode.NO_OVERWRITE);
-        dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
-
-        dfof.configure(new Configuration());
-
-        try {
-            dfof.open(0, 1);
-            dfof.close();
-            fail();
-        } catch (Exception e) {
-            // exception expected
-        }
+        assertThatThrownBy(
+                        () -> {
+                            DummyFileOutputFormat dfof = new DummyFileOutputFormat();
+                            dfof.setOutputFilePath(new Path(tmpFilePath));
+                            dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+                            dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+                            dfof.configure(new Configuration());
+
+                            dfof.open(0, 1);
+                            dfof.close();
+                        })
+                .isInstanceOf(Exception.class);
         tmpOutPath.delete();
 
         // check fail if directory exists
-        Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir());
+        assertThat(tmpOutPath.mkdir()).as("Directory could not be created.").isTrue();
 
-        dfof = new DummyFileOutputFormat();
-        dfof.setOutputFilePath(new Path(tmpFilePath));
-        dfof.setWriteMode(WriteMode.NO_OVERWRITE);
-        dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+        assertThatThrownBy(
+                        () -> {
+                            DummyFileOutputFormat dfof = new DummyFileOutputFormat();
+                            dfof.setOutputFilePath(new Path(tmpFilePath));
+                            dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+                            dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
 
-        dfof.configure(new Configuration());
+                            dfof.configure(new Configuration());
 
-        try {
-            dfof.open(0, 1);
-            dfof.close();
-            fail();
-        } catch (Exception e) {
-            // exception expected
-        }
+                            dfof.open(0, 1);
+                            dfof.close();
+                        })
+                .isInstanceOf(Exception.class);
         tmpOutPath.delete();
 
         // check success
-        dfof = new DummyFileOutputFormat();
-        dfof.setOutputFilePath(new Path(tmpFilePath));
-        dfof.setWriteMode(WriteMode.NO_OVERWRITE);
-        dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
-
-        dfof.configure(new Configuration());
-
-        try {
-            dfof.open(0, 1);
-            dfof.close();
-        } catch (Exception e) {
-            fail();
-        }
-        Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
+        assertThatNoException()
+                .isThrownBy(
+                        () -> {
+                            DummyFileOutputFormat dfof = new DummyFileOutputFormat();
+                            dfof.setOutputFilePath(new Path(tmpFilePath));
+                            dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+                            dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+                            dfof.configure(new Configuration());
+
+                            dfof.open(0, 1);
+                            dfof.close();
+                        });
+        assertThat(tmpOutPath.exists() && tmpOutPath.isFile()).isTrue();
         tmpOutPath.delete();
 
         // check fail for path with tailing '/'
-        dfof = new DummyFileOutputFormat();
-        dfof.setOutputFilePath(new Path(tmpFilePath + "/"));
-        dfof.setWriteMode(WriteMode.NO_OVERWRITE);
-        dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
-
-        dfof.configure(new Configuration());
-
-        try {
-            dfof.open(0, 1);
-            dfof.close();
-        } catch (Exception e) {
-            fail();
-        }
-        Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
+        assertThatNoException()
+                .isThrownBy(
+                        () -> {
+                            DummyFileOutputFormat dfof = new DummyFileOutputFormat();
+                            dfof.setOutputFilePath(new Path(tmpFilePath + "/"));
+                            dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+                            dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+                            dfof.configure(new Configuration());
+
+                            dfof.open(0, 1);
+                            dfof.close();
+                        });
+        assertThat(tmpOutPath.exists() && tmpOutPath.isFile()).isTrue();
         tmpOutPath.delete();
 
         // ----------- test again with always directory mode
 
         // check fail if file exists
         tmpOutPath.createNewFile();
 
-        dfof = new DummyFileOutputFormat();
-        dfof.setOutputFilePath(new Path(tmpFilePath));
-        dfof.setWriteMode(WriteMode.NO_OVERWRITE);
-        dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+        assertThatThrownBy(
+                        () -> {
+                            DummyFileOutputFormat dfof = new DummyFileOutputFormat();
+                            dfof.setOutputFilePath(new Path(tmpFilePath));
+                            dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+                            dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
 
-        dfof.configure(new Configuration());
+                            dfof.configure(new Configuration());
 
-        try {
-            dfof.open(0, 1);
-            dfof.close();
-            fail();
-        } catch (Exception e) {
-            // exception expected
-        }
+                            dfof.open(0, 1);
+                            dfof.close();
+                        })
+                .isInstanceOf(Exception.class);
         tmpOutPath.delete();
 
         // check success if directory exists
-        Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir());
-
-        dfof = new DummyFileOutputFormat();
-        dfof.setOutputFilePath(new Path(tmpFilePath));
-        dfof.setWriteMode(WriteMode.NO_OVERWRITE);
-        dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
-
-        dfof.configure(new Configuration());
-
-        try {
-            dfof.open(0, 1);
-            dfof.close();
-        } catch (Exception e) {
-            fail();
-        }
-        Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
-        Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+        assertThat(tmpOutPath.mkdir()).as("Directory could not be created.").isTrue();
+
+        assertThatNoException()
+                .isThrownBy(
+                        () -> {
+                            DummyFileOutputFormat dfof = new DummyFileOutputFormat();
+                            dfof.setOutputFilePath(new Path(tmpFilePath));
+                            dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+                            dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+
+                            dfof.configure(new Configuration());
+
+                            dfof.open(0, 1);
+                            dfof.close();
+                        });
+        assertThat(tmpOutPath.exists() && tmpOutPath.isDirectory()).isTrue();
+        assertThat(tmpOutFile.exists() && tmpOutFile.isFile()).isTrue();

Review Comment:
   ```suggestion
           assertThat(tmpOutPath).exists().isDirectory();
           assertThat(tmpOutFile).exists().isFile();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java:
##########
@@ -41,126 +41,106 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** The test for map operator. */
-@SuppressWarnings("serial")
 public class MapOperatorTest implements java.io.Serializable {
 
     @Test
-    public void testMapPlain() {
-        try {
-            final MapFunction<String, Integer> parser =
-                    new MapFunction<String, Integer>() {
-                        @Override
-                        public Integer map(String value) {
-                            return Integer.parseInt(value);
-                        }
-                    };
-
-            MapOperatorBase<String, Integer, MapFunction<String, Integer>> op =
-                    new MapOperatorBase<String, Integer, MapFunction<String, Integer>>(
-                            parser,
-                            new UnaryOperatorInformation<String, Integer>(
-                                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
-                            "TestMapper");
-
-            List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
-
-            ExecutionConfig executionConfig = new ExecutionConfig();
-            executionConfig.disableObjectReuse();
-            List<Integer> resultMutableSafe = op.executeOnCollections(input, null, executionConfig);
-            executionConfig.enableObjectReuse();
-            List<Integer> resultRegular = op.executeOnCollections(input, null, executionConfig);
-
-            assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
-            assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+    void testMapPlain() throws Exception {
+        final MapFunction<String, Integer> parser = Integer::parseInt;
+
+        MapOperatorBase<String, Integer, MapFunction<String, Integer>> op =
+                new MapOperatorBase<>(
+                        parser,
+                        new UnaryOperatorInformation<>(
+                                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+                        "TestMapper");
+
+        List<String> input = new ArrayList<>(asList("1", "2", "3", "4", "5", "6"));
+
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.disableObjectReuse();
+        List<Integer> resultMutableSafe = op.executeOnCollections(input, null, executionConfig);
+        executionConfig.enableObjectReuse();
+        List<Integer> resultRegular = op.executeOnCollections(input, null, executionConfig);
+
+        assertThat(resultMutableSafe).isEqualTo(asList(1, 2, 3, 4, 5, 6));
+        assertThat(resultRegular).isEqualTo(asList(1, 2, 3, 4, 5, 6));
     }
 
     @Test
-    public void testMapWithRuntimeContext() {
-        try {
-            final String taskName = "Test Task";
-            final AtomicBoolean opened = new AtomicBoolean();
-            final AtomicBoolean closed = new AtomicBoolean();
-
-            final MapFunction<String, Integer> parser =
-                    new RichMapFunction<String, Integer>() {
-
-                        @Override
-                        public void open(OpenContext openContext) throws Exception {
-                            opened.set(true);
-                            RuntimeContext ctx = getRuntimeContext();
-                            assertEquals(0, ctx.getTaskInfo().getIndexOfThisSubtask());
-                            assertEquals(1, ctx.getTaskInfo().getNumberOfParallelSubtasks());
-                            assertEquals(taskName, ctx.getTaskInfo().getTaskName());
-                        }
-
-                        @Override
-                        public Integer map(String value) {
-                            return Integer.parseInt(value);
-                        }
-
-                        @Override
-                        public void close() throws Exception {
-                            closed.set(true);
-                        }
-                    };
-
-            MapOperatorBase<String, Integer, MapFunction<String, Integer>> op =
-                    new MapOperatorBase<String, Integer, MapFunction<String, Integer>>(
-                            parser,
-                            new UnaryOperatorInformation<String, Integer>(
-                                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
-                            taskName);
-
-            List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
-            final HashMap<String, Accumulator<?, ?>> accumulatorMap =
-                    new HashMap<String, Accumulator<?, ?>>();
-            final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
-            final TaskInfo taskInfo = new TaskInfoImpl(taskName, 1, 0, 1, 0);
-            ExecutionConfig executionConfig = new ExecutionConfig();
-            executionConfig.disableObjectReuse();
-
-            List<Integer> resultMutableSafe =
-                    op.executeOnCollections(
-                            input,
-                            new RuntimeUDFContext(
-                                    taskInfo,
-                                    null,
-                                    executionConfig,
-                                    cpTasks,
-                                    accumulatorMap,
-                                    UnregisteredMetricsGroup.createOperatorMetricGroup()),
-                            executionConfig);
-
-            executionConfig.enableObjectReuse();
-            List<Integer> resultRegular =
-                    op.executeOnCollections(
-                            input,
-                            new RuntimeUDFContext(
-                                    taskInfo,
-                                    null,
-                                    executionConfig,
-                                    cpTasks,
-                                    accumulatorMap,
-                                    UnregisteredMetricsGroup.createOperatorMetricGroup()),
-                            executionConfig);
-
-            assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
-            assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
-
-            assertTrue(opened.get());
-            assertTrue(closed.get());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+    void testMapWithRuntimeContext() throws Exception {
+        final String taskName = "Test Task";
+        final AtomicBoolean opened = new AtomicBoolean();
+        final AtomicBoolean closed = new AtomicBoolean();
+
+        final MapFunction<String, Integer> parser =
+                new RichMapFunction<String, Integer>() {
+
+                    @Override
+                    public void open(OpenContext openContext) {
+                        opened.set(true);
+                        RuntimeContext ctx = getRuntimeContext();
+                        assertThat(ctx.getTaskInfo().getIndexOfThisSubtask()).isZero();
+                        assertThat(ctx.getTaskInfo().getNumberOfParallelSubtasks()).isEqualTo(1);
+                        assertThat(ctx.getTaskInfo().getTaskName()).isEqualTo(taskName);
+                    }
+
+                    @Override
+                    public Integer map(String value) {
+                        return Integer.parseInt(value);
+                    }
+
+                    @Override
+                    public void close() {
+                        closed.set(true);
+                    }
+                };
+
+        MapOperatorBase<String, Integer, MapFunction<String, Integer>> op =
+                new MapOperatorBase<>(
+                        parser,
+                        new UnaryOperatorInformation<>(
+                                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+                        taskName);
+
+        List<String> input = new ArrayList<>(asList("1", "2", "3", "4", "5", "6"));
+        final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<>();
+        final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
+        final TaskInfo taskInfo = new TaskInfoImpl(taskName, 1, 0, 1, 0);
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.disableObjectReuse();
+
+        List<Integer> resultMutableSafe =
+                op.executeOnCollections(
+                        input,
+                        new RuntimeUDFContext(
+                                taskInfo,
+                                null,
+                                executionConfig,
+                                cpTasks,
+                                accumulatorMap,
+                                UnregisteredMetricsGroup.createOperatorMetricGroup()),
+                        executionConfig);
+
+        executionConfig.enableObjectReuse();
+        List<Integer> resultRegular =
+                op.executeOnCollections(
+                        input,
+                        new RuntimeUDFContext(
+                                taskInfo,
+                                null,
+                                executionConfig,
+                                cpTasks,
+                                accumulatorMap,
+                                UnregisteredMetricsGroup.createOperatorMetricGroup()),
+                        executionConfig);
+
+        assertThat(resultMutableSafe).isEqualTo(asList(1, 2, 3, 4, 5, 6));
+        assertThat(resultRegular).isEqualTo(asList(1, 2, 3, 4, 5, 6));
+
+        assertThat(opened.get()).isTrue();
+        assertThat(closed.get()).isTrue();

Review Comment:
   ```suggestion
           assertThat(opened).isTrue();
           assertThat(closed).isTrue();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java:
##########
@@ -165,12 +164,12 @@ public void testGetSourceField() {
         sp.addForwardedField(0, 1, 2);
         sp.addForwardedField(0, 1, 3);
 
-        assertEquals(0, sp.getForwardingSourceField(0, 0));
-        assertEquals(0, sp.getForwardingSourceField(0, 4));
-        assertEquals(1, sp.getForwardingSourceField(0, 1));
-        assertEquals(1, sp.getForwardingSourceField(0, 2));
-        assertEquals(1, sp.getForwardingSourceField(0, 3));
-        assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+        assertThat(sp.getForwardingSourceField(0, 0)).isZero();
+        assertThat(sp.getForwardingSourceField(0, 4)).isZero();
+        assertThat(sp.getForwardingSourceField(0, 1)).isEqualTo(1);
+        assertThat(sp.getForwardingSourceField(0, 2)).isEqualTo(1);
+        assertThat(sp.getForwardingSourceField(0, 3)).isEqualTo(1);
+        assertThat(sp.getForwardingSourceField(0, 5) < 0).isTrue();

Review Comment:
   ```suggestion
           assertThat(sp.getForwardingSourceField(0, 1)).isOne();
           assertThat(sp.getForwardingSourceField(0, 2)).isOne();
           assertThat(sp.getForwardingSourceField(0, 3)).isOne();
           assertThat(sp.getForwardingSourceField(0, 5)).isNegative();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java:
##########
@@ -151,12 +150,12 @@ public void testGetSourceField() {
         sp.addForwardedField(0, 2, 3);
         sp.addForwardedField(0, 3, 2);
 
-        assertEquals(0, sp.getForwardingSourceField(0, 1));
-        assertEquals(1, sp.getForwardingSourceField(0, 4));
-        assertEquals(2, sp.getForwardingSourceField(0, 3));
-        assertEquals(3, sp.getForwardingSourceField(0, 2));
-        assertTrue(sp.getForwardingSourceField(0, 0) < 0);
-        assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+        assertThat(sp.getForwardingSourceField(0, 1)).isZero();
+        assertThat(sp.getForwardingSourceField(0, 4)).isEqualTo(1);

Review Comment:
   ```suggestion
           assertThat(sp.getForwardingSourceField(0, 4)).isOne();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/SingleInputSemanticPropertiesTest.java:
##########
@@ -54,32 +55,32 @@ public void testGetTargetFields() {
         sp.addForwardedField(1, 2);
         sp.addForwardedField(1, 3);
 
-        assertEquals(2, sp.getForwardingTargetFields(0, 0).size());
-        assertEquals(3, sp.getForwardingTargetFields(0, 1).size());
-        assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
-        assertTrue(sp.getForwardingTargetFields(0, 0).contains(4));
-        assertTrue(sp.getForwardingTargetFields(0, 1).contains(1));
-        assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
-        assertTrue(sp.getForwardingTargetFields(0, 1).contains(3));
-        assertNotNull(sp.getForwardingTargetFields(0, 2));
-        assertEquals(0, sp.getForwardingTargetFields(0, 2).size());
+        assertThat(sp.getForwardingTargetFields(0, 0)).hasSize(2);
+        assertThat(sp.getForwardingTargetFields(0, 1)).hasSize(3);
+        assertThat(sp.getForwardingTargetFields(0, 0).contains(0)).isTrue();
+        assertThat(sp.getForwardingTargetFields(0, 0).contains(4)).isTrue();
+        assertThat(sp.getForwardingTargetFields(0, 1).contains(1)).isTrue();
+        assertThat(sp.getForwardingTargetFields(0, 1).contains(2)).isTrue();
+        assertThat(sp.getForwardingTargetFields(0, 1).contains(3)).isTrue();
+        assertThat(sp.getForwardingTargetFields(0, 2)).isNotNull();
+        assertThat(sp.getForwardingTargetFields(0, 2)).isEmpty();
     }
 
     @Test
-    public void testGetSourceField() {
+    void testGetSourceField() {
 
         SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
         sp.addForwardedField(0, 1);
         sp.addForwardedField(1, 4);
         sp.addForwardedField(2, 3);
         sp.addForwardedField(3, 2);
 
-        assertEquals(0, sp.getForwardingSourceField(0, 1));
-        assertEquals(1, sp.getForwardingSourceField(0, 4));
-        assertEquals(2, sp.getForwardingSourceField(0, 3));
-        assertEquals(3, sp.getForwardingSourceField(0, 2));
-        assertTrue(sp.getForwardingSourceField(0, 0) < 0);
-        assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+        assertThat(sp.getForwardingSourceField(0, 1)).isZero();
+        assertThat(sp.getForwardingSourceField(0, 4)).isEqualTo(1);
+        assertThat(sp.getForwardingSourceField(0, 3)).isEqualTo(2);
+        assertThat(sp.getForwardingSourceField(0, 2)).isEqualTo(3);
+        assertThat(sp.getForwardingSourceField(0, 0) < 0).isTrue();
+        assertThat(sp.getForwardingSourceField(0, 5) < 0).isTrue();

Review Comment:
   isOne and isNegative



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/DualInputSemanticPropertiesTest.java:
##########
@@ -151,12 +150,12 @@ public void testGetSourceField() {
         sp.addForwardedField(0, 2, 3);
         sp.addForwardedField(0, 3, 2);
 
-        assertEquals(0, sp.getForwardingSourceField(0, 1));
-        assertEquals(1, sp.getForwardingSourceField(0, 4));
-        assertEquals(2, sp.getForwardingSourceField(0, 3));
-        assertEquals(3, sp.getForwardingSourceField(0, 2));
-        assertTrue(sp.getForwardingSourceField(0, 0) < 0);
-        assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+        assertThat(sp.getForwardingSourceField(0, 1)).isZero();
+        assertThat(sp.getForwardingSourceField(0, 4)).isEqualTo(1);
+        assertThat(sp.getForwardingSourceField(0, 3)).isEqualTo(2);
+        assertThat(sp.getForwardingSourceField(0, 2)).isEqualTo(3);
+        assertThat(sp.getForwardingSourceField(0, 0) < 0).isTrue();
+        assertThat(sp.getForwardingSourceField(0, 5) < 0).isTrue();

Review Comment:
   ```suggestion
           assertThat(sp.getForwardingSourceField(0, 0)).isNegative();
           assertThat(sp.getForwardingSourceField(0, 5)).isNegative();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java:
##########
@@ -129,69 +116,58 @@ public void join(String first, String second, Collector<Integer> out)
         InnerJoinOperatorBase<
                         String, String, Integer, RichFlatJoinFunction<String, String, Integer>>
                 base =
-                        new InnerJoinOperatorBase<
-                                String,
-                                String,
-                                Integer,
-                                RichFlatJoinFunction<String, String, Integer>>(
+                        new InnerJoinOperatorBase<>(
                                 joiner,
-                                new BinaryOperatorInformation<String, String, Integer>(
+                                new BinaryOperatorInformation<>(
                                         BasicTypeInfo.STRING_TYPE_INFO,
                                         BasicTypeInfo.STRING_TYPE_INFO,
                                         BasicTypeInfo.INT_TYPE_INFO),
                                 new int[0],
                                 new int[0],
                                 taskName);
 
-        final List<String> inputData1 =
-                new ArrayList<String>(Arrays.asList("foo", "bar", "foobar"));
-        final List<String> inputData2 = new ArrayList<String>(Arrays.asList("foobar", "foo"));
-        final List<Integer> expected = new ArrayList<Integer>(Arrays.asList(3, 3, 6, 6));
-
-        try {
-            final TaskInfo taskInfo = new TaskInfoImpl(taskName, 1, 0, 1, 0);
-            final HashMap<String, Accumulator<?, ?>> accumulatorMap =
-                    new HashMap<String, Accumulator<?, ?>>();
-            final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
-
-            ExecutionConfig executionConfig = new ExecutionConfig();
-
-            executionConfig.disableObjectReuse();
-            List<Integer> resultSafe =
-                    base.executeOnCollections(
-                            inputData1,
-                            inputData2,
-                            new RuntimeUDFContext(
-                                    taskInfo,
-                                    null,
-                                    executionConfig,
-                                    cpTasks,
-                                    accumulatorMap,
-                                    UnregisteredMetricsGroup.createOperatorMetricGroup()),
-                            executionConfig);
-
-            executionConfig.enableObjectReuse();
-            List<Integer> resultRegular =
-                    base.executeOnCollections(
-                            inputData1,
-                            inputData2,
-                            new RuntimeUDFContext(
-                                    taskInfo,
-                                    null,
-                                    executionConfig,
-                                    cpTasks,
-                                    accumulatorMap,
-                                    UnregisteredMetricsGroup.createOperatorMetricGroup()),
-                            executionConfig);
-
-            assertEquals(expected, resultSafe);
-            assertEquals(expected, resultRegular);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
-
-        assertTrue(opened.get());
-        assertTrue(closed.get());
+        final List<String> inputData1 = new ArrayList<>(Arrays.asList("foo", "bar", "foobar"));
+        final List<String> inputData2 = new ArrayList<>(Arrays.asList("foobar", "foo"));
+        final List<Integer> expected = new ArrayList<>(Arrays.asList(3, 3, 6, 6));
+
+        final TaskInfo taskInfo = new TaskInfoImpl(taskName, 1, 0, 1, 0);
+        final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<>();
+        final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
+
+        ExecutionConfig executionConfig = new ExecutionConfig();
+
+        executionConfig.disableObjectReuse();
+        List<Integer> resultSafe =
+                base.executeOnCollections(
+                        inputData1,
+                        inputData2,
+                        new RuntimeUDFContext(
+                                taskInfo,
+                                null,
+                                executionConfig,
+                                cpTasks,
+                                accumulatorMap,
+                                UnregisteredMetricsGroup.createOperatorMetricGroup()),
+                        executionConfig);
+
+        executionConfig.enableObjectReuse();
+        List<Integer> resultRegular =
+                base.executeOnCollections(
+                        inputData1,
+                        inputData2,
+                        new RuntimeUDFContext(
+                                taskInfo,
+                                null,
+                                executionConfig,
+                                cpTasks,
+                                accumulatorMap,
+                                UnregisteredMetricsGroup.createOperatorMetricGroup()),
+                        executionConfig);
+
+        assertThat(resultSafe).isEqualTo(expected);
+        assertThat(resultRegular).isEqualTo(expected);
+
+        assertThat(opened.get()).isTrue();
+        assertThat(closed.get()).isTrue();

Review Comment:
   ```suggestion
           assertThat(opened).isTrue();
           assertThat(closed).isTrue();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/ExpressionKeysTest.java:
##########
@@ -407,49 +397,49 @@ public void testInvalidPojo() throws Throwable {
             } catch (Throwable t) {
                 e = t;
             }
-            Assert.assertNotNull(e);
+            assertThat(e).isNotNull();

Review Comment:
   assertThatThrownBy



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/ExpressionKeysTest.java:
##########
@@ -148,24 +152,25 @@ public void testInvalidTuple() throws Throwable {
             } catch (Throwable t) {
                 e = t;
             }
-            Assert.assertNotNull(e);
+            assertThat(e).isNotNull();

Review Comment:
   assertThatThrownBy



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java:
##########
@@ -262,20 +265,21 @@ private static class MockRichFlatJoinFunction
         final AtomicBoolean closed = new AtomicBoolean(false);
 
         @Override
-        public void open(OpenContext openContext) throws Exception {
+        public void open(OpenContext openContext) {
             opened.compareAndSet(false, true);
-            assertEquals(0, getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
-            assertEquals(1, getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks());
+            assertThat(getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()).isZero();
+            assertThat(getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks())
+                    .isEqualTo(1);

Review Comment:
   ```suggestion
                       .isOne();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/SingleInputSemanticPropertiesTest.java:
##########
@@ -88,98 +89,113 @@ public void testGetSourceField() {
         sp.addForwardedField(1, 2);
         sp.addForwardedField(1, 3);
 
-        assertEquals(0, sp.getForwardingSourceField(0, 0));
-        assertEquals(0, sp.getForwardingSourceField(0, 4));
-        assertEquals(1, sp.getForwardingSourceField(0, 1));
-        assertEquals(1, sp.getForwardingSourceField(0, 2));
-        assertEquals(1, sp.getForwardingSourceField(0, 3));
-        assertTrue(sp.getForwardingSourceField(0, 5) < 0);
+        assertThat(sp.getForwardingSourceField(0, 0)).isZero();
+        assertThat(sp.getForwardingSourceField(0, 4)).isZero();
+        assertThat(sp.getForwardingSourceField(0, 1)).isEqualTo(1);
+        assertThat(sp.getForwardingSourceField(0, 2)).isEqualTo(1);
+        assertThat(sp.getForwardingSourceField(0, 3)).isEqualTo(1);
+        assertThat(sp.getForwardingSourceField(0, 5) < 0).isTrue();

Review Comment:
   isOne and isNegative



##########
flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java:
##########
@@ -218,38 +193,30 @@ public void testInequalityWithReference() {
     }
 
     protected void testGreatSmallAscDescWithReference(boolean ascending, boolean greater) {
-        try {
-            T[] data = getSortedData();
+        T[] data = getSortedData();
 
-            TypeComparator<T> comparatorLow = getComparator(ascending);
-            TypeComparator<T> comparatorHigh = getComparator(ascending);
+        TypeComparator<T> comparatorLow = getComparator(ascending);
+        TypeComparator<T> comparatorHigh = getComparator(ascending);
 
-            // compares every element in high with every element in low
-            for (int x = 0; x < data.length - 1; x++) {
-                for (int y = x + 1; y < data.length; y++) {
-                    comparatorLow.setReference(data[x]);
-                    comparatorHigh.setReference(data[y]);
+        // compares every element in high with every element in low
+        for (int x = 0; x < data.length - 1; x++) {
+            for (int y = x + 1; y < data.length; y++) {
+                comparatorLow.setReference(data[x]);
+                comparatorHigh.setReference(data[y]);
 
-                    if (greater && ascending) {
-                        assertThat(comparatorLow.compareToReference(comparatorHigh))
-                                .isGreaterThan(0);
-                    }
-                    if (greater && !ascending) {
-                        assertThat(comparatorLow.compareToReference(comparatorHigh)).isLessThan(0);
-                    }
-                    if (!greater && ascending) {
-                        assertThat(comparatorHigh.compareToReference(comparatorLow)).isLessThan(0);
-                    }
-                    if (!greater && !ascending) {
-                        assertThat(comparatorHigh.compareToReference(comparatorLow))
-                                .isGreaterThan(0);
-                    }
+                if (greater && ascending) {
+                    assertThat(comparatorLow.compareToReference(comparatorHigh)).isGreaterThan(0);
+                }
+                if (greater && !ascending) {
+                    assertThat(comparatorLow.compareToReference(comparatorHigh)).isLessThan(0);
+                }
+                if (!greater && ascending) {
+                    assertThat(comparatorHigh.compareToReference(comparatorLow)).isLessThan(0);
+                }
+                if (!greater && !ascending) {
+                    assertThat(comparatorHigh.compareToReference(comparatorLow)).isGreaterThan(0);

Review Comment:
   How about isPositive and isNegative?



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java:
##########
@@ -90,13 +82,13 @@ private void testExecuteOnCollection(
                                         taskInfo,
                                         null,
                                         executionConfig,
-                                        new HashMap<String, Future<Path>>(),
-                                        new HashMap<String, Accumulator<?, ?>>(),
+                                        new HashMap<>(),
+                                        new HashMap<>(),
                                         UnregisteredMetricsGroup.createOperatorMetricGroup()),
                                 executionConfig);
 
-        Assert.assertEquals(input.size(), result.size());
-        Assert.assertEquals(input, result);
+        assertThat(result).hasSize(input.size());
+        assertThat(result).isEqualTo(input);

Review Comment:
   ```suggestion
           assertThat(result).hasSameSizeAs(input).isEqualTo(input);
   ```
   or just isEqualTo for short



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java:
##########
@@ -21,115 +21,103 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.TaskInfoImpl;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.Collector;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** The test for partition map operator. */
-@SuppressWarnings("serial")
 public class PartitionMapOperatorTest implements java.io.Serializable {
 
     @Test
-    public void testMapPartitionWithRuntimeContext() {
-        try {
-            final String taskName = "Test Task";
-            final AtomicBoolean opened = new AtomicBoolean();
-            final AtomicBoolean closed = new AtomicBoolean();
-
-            final MapPartitionFunction<String, Integer> parser =
-                    new RichMapPartitionFunction<String, Integer>() {
-
-                        @Override
-                        public void open(OpenContext openContext) throws Exception {
-                            opened.set(true);
-                            RuntimeContext ctx = getRuntimeContext();
-                            assertEquals(0, ctx.getTaskInfo().getIndexOfThisSubtask());
-                            assertEquals(1, ctx.getTaskInfo().getNumberOfParallelSubtasks());
-                            assertEquals(taskName, ctx.getTaskInfo().getTaskName());
+    void testMapPartitionWithRuntimeContext() throws Exception {
+        final String taskName = "Test Task";
+        final AtomicBoolean opened = new AtomicBoolean();
+        final AtomicBoolean closed = new AtomicBoolean();
+
+        final MapPartitionFunction<String, Integer> parser =
+                new RichMapPartitionFunction<String, Integer>() {
+
+                    @Override
+                    public void open(OpenContext openContext) {
+                        opened.set(true);
+                        RuntimeContext ctx = getRuntimeContext();
+                        assertThat(ctx.getTaskInfo().getIndexOfThisSubtask()).isZero();
+                        assertThat(ctx.getTaskInfo().getNumberOfParallelSubtasks()).isEqualTo(1);
+                        assertThat(ctx.getTaskInfo().getTaskName()).isEqualTo(taskName);
+                    }
+
+                    @Override
+                    public void mapPartition(Iterable<String> values, Collector<Integer> out) {
+                        for (String s : values) {
+                            out.collect(Integer.parseInt(s));
                         }
-
-                        @Override
-                        public void mapPartition(Iterable<String> values, Collector<Integer> out) {
-                            for (String s : values) {
-                                out.collect(Integer.parseInt(s));
-                            }
-                        }
-
-                        @Override
-                        public void close() throws Exception {
-                            closed.set(true);
-                        }
-                    };
-
-            MapPartitionOperatorBase<String, Integer, MapPartitionFunction<String, Integer>> op =
-                    new MapPartitionOperatorBase<
-                            String, Integer, MapPartitionFunction<String, Integer>>(
-                            parser,
-                            new UnaryOperatorInformation<String, Integer>(
-                                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
-                            taskName);
-
-            List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
-
-            final TaskInfo taskInfo = new TaskInfoImpl(taskName, 1, 0, 1, 0);
-
-            ExecutionConfig executionConfig = new ExecutionConfig();
-            executionConfig.disableObjectReuse();
-
-            List<Integer> resultMutableSafe =
-                    op.executeOnCollections(
-                            input,
-                            new RuntimeUDFContext(
-                                    taskInfo,
-                                    null,
-                                    executionConfig,
-                                    new HashMap<String, Future<Path>>(),
-                                    new HashMap<String, Accumulator<?, ?>>(),
-                                    UnregisteredMetricsGroup.createOperatorMetricGroup()),
-                            executionConfig);
-
-            executionConfig.enableObjectReuse();
-            List<Integer> resultRegular =
-                    op.executeOnCollections(
-                            input,
-                            new RuntimeUDFContext(
-                                    taskInfo,
-                                    null,
-                                    executionConfig,
-                                    new HashMap<String, Future<Path>>(),
-                                    new HashMap<String, Accumulator<?, ?>>(),
-                                    UnregisteredMetricsGroup.createOperatorMetricGroup()),
-                            executionConfig);
-
-            assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
-            assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
-
-            assertTrue(opened.get());
-            assertTrue(closed.get());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+                    }
+
+                    @Override
+                    public void close() {
+                        closed.set(true);
+                    }
+                };
+
+        MapPartitionOperatorBase<String, Integer, MapPartitionFunction<String, Integer>> op =
+                new MapPartitionOperatorBase<>(
+                        parser,
+                        new UnaryOperatorInformation<>(
+                                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+                        taskName);
+
+        List<String> input = new ArrayList<>(asList("1", "2", "3", "4", "5", "6"));
+
+        final TaskInfo taskInfo = new TaskInfoImpl(taskName, 1, 0, 1, 0);
+
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.disableObjectReuse();
+
+        List<Integer> resultMutableSafe =
+                op.executeOnCollections(
+                        input,
+                        new RuntimeUDFContext(
+                                taskInfo,
+                                null,
+                                executionConfig,
+                                new HashMap<>(),
+                                new HashMap<>(),
+                                UnregisteredMetricsGroup.createOperatorMetricGroup()),
+                        executionConfig);
+
+        executionConfig.enableObjectReuse();
+        List<Integer> resultRegular =
+                op.executeOnCollections(
+                        input,
+                        new RuntimeUDFContext(
+                                taskInfo,
+                                null,
+                                executionConfig,
+                                new HashMap<>(),
+                                new HashMap<>(),
+                                UnregisteredMetricsGroup.createOperatorMetricGroup()),
+                        executionConfig);
+
+        assertThat(resultMutableSafe).isEqualTo(asList(1, 2, 3, 4, 5, 6));
+        assertThat(resultRegular).isEqualTo(asList(1, 2, 3, 4, 5, 6));
+
+        assertThat(opened.get()).isTrue();
+        assertThat(closed.get()).isTrue();

Review Comment:
   ```suggestion
           assertThat(opened).isTrue();
           assertThat(closed).isTrue();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldListTest.java:
##########
@@ -101,8 +98,8 @@ private static void check(FieldList set, int... elements) {
             for (int i = 0; i < fromIter.length; i++) {
                 fromIter[i] = iter.next();
             }
-            assertFalse(iter.hasNext());
-            assertTrue(Arrays.equals(fromIter, elements));
+            assertThat(iter.hasNext()).isFalse();

Review Comment:
   ```suggestion
               assertThat(iter).isExhausted();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldListTest.java:
##########
@@ -62,26 +59,26 @@ public void testImmutability() {
         s1.addField(Integer.valueOf(14));
         s2.addFields(78, 13, 66, 3);
 
-        assertEquals(0, s1.size());
-        assertEquals(1, s2.size());
-        assertEquals(1, s3.size());
-        assertEquals(4, s4.size());
+        assertThat(s1).isEmpty();
+        assertThat(s2).hasSize(1);
+        assertThat(s3).hasSize(1);
+        assertThat(s4).hasSize(4);
     }
 
     @Test
-    public void testAddSetToList() {
+    void testAddSetToList() {
         check(new FieldList().addFields(new FieldSet(1)).addFields(2), 1, 2);
         check(new FieldList().addFields(1).addFields(new FieldSet(2)), 1, 2);
         check(new FieldList().addFields(new FieldSet(2)), 2);
     }
 
     private static void check(FieldList set, int... elements) {
         if (elements == null) {
-            assertEquals(0, set.size());
+            assertThat(set).isEmpty();
             return;
         }
 
-        assertEquals(elements.length, set.size());
+        assertThat(set).hasSize(elements.length);

Review Comment:
   ```suggestion
           assertThat(set).hasSameSizeAs(elements);
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldSetTest.java:
##########
@@ -105,9 +103,9 @@ private static void check(FieldSet set, int... elements) {
             for (int i = 0; i < fromIter.length; i++) {
                 fromIter[i] = iter.next();
             }
-            assertFalse(iter.hasNext());
+            assertThat(iter.hasNext()).isFalse();

Review Comment:
   ```suggestion
               assertThat(iter).isExhausted();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/resources/ResourceTest.java:
##########
@@ -20,194 +20,198 @@
 
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
 
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
 /** Tests for {@link Resource}. */
-public class ResourceTest extends TestLogger {
+@SuppressWarnings("rawtypes")
+class ResourceTest extends TestLogger {
 
     @Test
-    public void testConstructorValid() {
+    void testConstructorValid() {
         final Resource v1 = new TestResource(0.1);
         assertTestResourceValueEquals(0.1, v1);
 
         final Resource v2 = new TestResource(BigDecimal.valueOf(0.1));
         assertTestResourceValueEquals(0.1, v2);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testConstructorInvalidValue() {
-        new TestResource(-0.1);
+    @Test
+    void testConstructorInvalidValue() {
+        assertThatThrownBy(() -> new TestResource(-0.1))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testEquals() {
+    void testEquals() {
         final Resource v1 = new TestResource(0.1);
         final Resource v2 = new TestResource(0.1);
         final Resource v3 = new TestResource(0.2);
-        assertTrue(v1.equals(v2));
-        assertFalse(v1.equals(v3));
+        assertThat(v2).isEqualTo(v1);
+        assertThat(v3).isNotEqualTo(v1);
     }
 
     @Test
-    public void testEqualsIgnoringScale() {
+    void testEqualsIgnoringScale() {
         final Resource v1 = new TestResource(new BigDecimal("0.1"));
         final Resource v2 = new TestResource(new BigDecimal("0.10"));
-        assertTrue(v1.equals(v2));
+        assertThat(v2).isEqualTo(v1);
     }
 
     @Test
-    public void testHashCodeIgnoringScale() {
+    void testHashCodeIgnoringScale() {
         final Resource v1 = new TestResource(new BigDecimal("0.1"));
         final Resource v2 = new TestResource(new BigDecimal("0.10"));
-        assertTrue(v1.hashCode() == v2.hashCode());
+        assertThat(v2).hasSameHashCodeAs(v1);
     }
 
     @Test
-    public void testMerge() {
+    void testMerge() {
         final Resource v1 = new TestResource(0.1);
         final Resource v2 = new TestResource(0.2);
         assertTestResourceValueEquals(0.3, v1.merge(v2));
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testMergeErrorOnDifferentTypes() {
+    @Test
+    void testMergeErrorOnDifferentTypes() {
         final Resource v1 = new TestResource(0.1);
         final Resource v2 = new CPUResource(0.1);
-        v1.merge(v2);
+        // v1.merge(v2);
+        assertThatThrownBy(() -> v1.merge(v2)).isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testSubtract() {
+    void testSubtract() {
         final Resource v1 = new TestResource(0.2);
         final Resource v2 = new TestResource(0.1);
         assertTestResourceValueEquals(0.1, v1.subtract(v2));
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testSubtractLargerValue() {
+    @Test
+    void testSubtractLargerValue() {
         final Resource v1 = new TestResource(0.1);
         final Resource v2 = new TestResource(0.2);
-        v1.subtract(v2);
+        assertThatThrownBy(() -> v1.subtract(v2)).isInstanceOf(IllegalArgumentException.class);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testSubtractErrorOnDifferentTypes() {
+    @Test
+    void testSubtractErrorOnDifferentTypes() {
         final Resource v1 = new TestResource(0.1);
         final Resource v2 = new CPUResource(0.1);
-        v1.subtract(v2);
+        assertThatThrownBy(() -> v1.subtract(v2)).isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testDivide() {
+    void testDivide() {
         final Resource resource = new TestResource(0.04);
         final BigDecimal by = BigDecimal.valueOf(0.1);
         assertTestResourceValueEquals(0.4, resource.divide(by));
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testDivideNegative() {
+    @Test
+    void testDivideNegative() {
         final Resource resource = new TestResource(1.2);
         final BigDecimal by = BigDecimal.valueOf(-0.5);
-        resource.divide(by);
+        assertThatThrownBy(() -> resource.divide(by)).isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testDivideInteger() {
+    void testDivideInteger() {
         final Resource resource = new TestResource(0.12);
         final int by = 4;
         assertTestResourceValueEquals(0.03, resource.divide(by));
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testDivideNegativeInteger() {
+    @Test
+    void testDivideNegativeInteger() {
         final Resource resource = new TestResource(1.2);
         final int by = -5;
-        resource.divide(by);
+        assertThatThrownBy(() -> resource.divide(by)).isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testMultiply() {
+    void testMultiply() {
         final Resource resource = new TestResource(0.3);
         final BigDecimal by = BigDecimal.valueOf(0.2);
         assertTestResourceValueEquals(0.06, resource.multiply(by));
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testMutiplyNegative() {
+    @Test
+    void testMutiplyNegative() {
         final Resource resource = new TestResource(0.3);
         final BigDecimal by = BigDecimal.valueOf(-0.2);
-        resource.multiply(by);
+        assertThatThrownBy(() -> resource.multiply(by))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testMultiplyInteger() {
+    void testMultiplyInteger() {
         final Resource resource = new TestResource(0.3);
         final int by = 2;
         assertTestResourceValueEquals(0.6, resource.multiply(by));
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testMutiplyNegativeInteger() {
+    @Test
+    void testMutiplyNegativeInteger() {
         final Resource resource = new TestResource(0.3);
         final int by = -2;
-        resource.multiply(by);
+        assertThatThrownBy(() -> resource.multiply(by))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testIsZero() {
+    void testIsZero() {
         final Resource resource1 = new TestResource(0.0);
         final Resource resource2 = new TestResource(1.0);
 
-        assertTrue(resource1.isZero());
-        assertFalse(resource2.isZero());
+        assertThat(resource1.isZero()).isTrue();
+        assertThat(resource2.isZero()).isFalse();
     }
 
     @Test
-    public void testCompareTo() {
+    void testCompareTo() {
         final Resource resource1 = new TestResource(0.0);
         final Resource resource2 = new TestResource(0.0);
         final Resource resource3 = new TestResource(1.0);
 
-        assertThat(resource1.compareTo(resource1), is(0));
-        assertThat(resource1.compareTo(resource2), is(0));
-        assertThat(resource1.compareTo(resource3), lessThan(0));
-        assertThat(resource3.compareTo(resource1), greaterThan(0));
+        assertThat(resource1.compareTo(resource1)).isZero();
+        assertThat(resource1.compareTo(resource2)).isZero();
+        assertThat(resource1.compareTo(resource3)).isLessThan(0);
+        assertThat(resource3.compareTo(resource1)).isGreaterThan(0);

Review Comment:
   ```suggestion
           assertThat(resource1.compareTo(resource3)).isNegative();
           assertThat(resource3.compareTo(resource1)).isPositive();
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java:
##########
@@ -68,148 +67,124 @@ protected Order[] getTestedOrder() {
     // -------------------------------- test duplication ------------------------------------------
 
     @Test
-    public void testDuplicate() {
-        try {
-            boolean ascending = isAscending(getTestedOrder()[0]);
-            TypeComparator<T> comparator = getComparator(ascending);
-            TypeComparator<T> clone = comparator.duplicate();
-
-            T[] data = getSortedData();
-            comparator.setReference(data[0]);
-            clone.setReference(data[1]);
-
-            assertThat(comparator.equalToReference(data[0]) && clone.equalToReference(data[1]))
-                    .as(
-                            "Comparator duplication does not work: Altering the reference in a duplicated comparator alters the original comparator's reference.")
-                    .isTrue();
-        } catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+    protected void testDuplicate() {
+        boolean ascending = isAscending(getTestedOrder()[0]);
+        TypeComparator<T> comparator = getComparator(ascending);
+        TypeComparator<T> clone = comparator.duplicate();
+
+        T[] data = getSortedData();
+        comparator.setReference(data[0]);
+        clone.setReference(data[1]);
+
+        assertThat(comparator.equalToReference(data[0]) && clone.equalToReference(data[1]))
+                .as(
+                        "Comparator duplication does not work: Altering the reference in a duplicated comparator alters the original comparator's reference.")
+                .isTrue();
     }
 
     // --------------------------------- equality tests -------------------------------------------
 
     @Test
-    public void testEquality() {
+    protected void testEquality() throws IOException {
         for (Order order : getTestedOrder()) {
             boolean ascending = isAscending(order);
             testEquals(ascending);
         }
     }
 
-    protected void testEquals(boolean ascending) {
-        try {
-            // Just setup two identical output/inputViews and go over their data to see if compare
-            // works
-            TestOutputView out1;
-            TestOutputView out2;
-            TestInputView in1;
-            TestInputView in2;
+    protected void testEquals(boolean ascending) throws IOException {
+        // Just setup two identical output/inputViews and go over their data to see if compare
+        // works
+        TestOutputView out1;
+        TestOutputView out2;
+        TestInputView in1;
+        TestInputView in2;
 
-            // Now use comparator and compare
-            TypeComparator<T> comparator = getComparator(ascending);
-            T[] data = getSortedData();
-            for (T d : data) {
+        // Now use comparator and compare
+        TypeComparator<T> comparator = getComparator(ascending);
+        T[] data = getSortedData();
+        for (T d : data) {
 
-                out2 = new TestOutputView();
-                writeSortedData(d, out2);
-                in2 = out2.getInputView();
+            out2 = new TestOutputView();
+            writeSortedData(d, out2);
+            in2 = out2.getInputView();
 
-                out1 = new TestOutputView();
-                writeSortedData(d, out1);
-                in1 = out1.getInputView();
+            out1 = new TestOutputView();
+            writeSortedData(d, out1);
+            in1 = out1.getInputView();
 
-                assertThat(comparator.compareSerialized(in1, in2)).isEqualTo(0);
-            }
-        } catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            fail("Exception in test: " + e.getMessage());
+            assertThat(comparator.compareSerialized(in1, in2)).isZero();
         }
     }
 
     @Test
-    public void testEqualityWithReference() {
-        try {
-            TypeSerializer<T> serializer = createSerializer();
-            boolean ascending = isAscending(getTestedOrder()[0]);
-            TypeComparator<T> comparator = getComparator(ascending);
-            TypeComparator<T> comparator2 = getComparator(ascending);
-            T[] data = getSortedData();
-            for (T d : data) {
-                comparator.setReference(d);
-                // Make a copy to compare
-                T copy = serializer.copy(d, serializer.createInstance());
-
-                // And then test equalTo and compareToReference method of comparator
-                assertThat(comparator.equalToReference(d)).isTrue();
-                comparator2.setReference(copy);
-                assertThat(comparator.compareToReference(comparator2)).isEqualTo(0);
-            }
-        } catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            fail("Exception in test: " + e.getMessage());
+    protected void testEqualityWithReference() {
+        TypeSerializer<T> serializer = createSerializer();
+        boolean ascending = isAscending(getTestedOrder()[0]);
+        TypeComparator<T> comparator = getComparator(ascending);
+        TypeComparator<T> comparator2 = getComparator(ascending);
+        T[] data = getSortedData();
+        for (T d : data) {
+            comparator.setReference(d);
+            // Make a copy to compare
+            T copy = serializer.copy(d, serializer.createInstance());
+
+            // And then test equalTo and compareToReference method of comparator
+            assertThat(comparator.equalToReference(d)).isTrue();
+            comparator2.setReference(copy);
+            assertThat(comparator.compareToReference(comparator2)).isZero();
         }
     }
 
     // --------------------------------- inequality tests ----------------------------------------
     @Test
-    public void testInequality() {
+    protected void testInequality() throws IOException {
         for (Order order : getTestedOrder()) {
             boolean ascending = isAscending(order);
             testGreatSmallAscDesc(ascending, true);
             testGreatSmallAscDesc(ascending, false);
         }
     }
 
-    protected void testGreatSmallAscDesc(boolean ascending, boolean greater) {
-        try {
-            // split data into low and high part
-            T[] data = getSortedData();
-
-            TypeComparator<T> comparator = getComparator(ascending);
-            TestOutputView out1;
-            TestOutputView out2;
-            TestInputView in1;
-            TestInputView in2;
-
-            // compares every element in high with every element in low
-            for (int x = 0; x < data.length - 1; x++) {
-                for (int y = x + 1; y < data.length; y++) {
-                    out1 = new TestOutputView();
-                    writeSortedData(data[x], out1);
-                    in1 = out1.getInputView();
-
-                    out2 = new TestOutputView();
-                    writeSortedData(data[y], out2);
-                    in2 = out2.getInputView();
-
-                    if (greater && ascending) {
-                        assertThat(comparator.compareSerialized(in1, in2)).isLessThan(0);
-                    }
-                    if (greater && !ascending) {
-                        assertThat(comparator.compareSerialized(in1, in2)).isGreaterThan(0);
-                    }
-                    if (!greater && ascending) {
-                        assertThat(comparator.compareSerialized(in2, in1)).isGreaterThan(0);
-                    }
-                    if (!greater && !ascending) {
-                        assertThat(comparator.compareSerialized(in2, in1)).isLessThan(0);
-                    }
+    protected void testGreatSmallAscDesc(boolean ascending, boolean greater) throws IOException {
+        // split data into low and high part
+        T[] data = getSortedData();
+
+        TypeComparator<T> comparator = getComparator(ascending);
+        TestOutputView out1;
+        TestOutputView out2;
+        TestInputView in1;
+        TestInputView in2;
+
+        // compares every element in high with every element in low
+        for (int x = 0; x < data.length - 1; x++) {
+            for (int y = x + 1; y < data.length; y++) {
+                out1 = new TestOutputView();
+                writeSortedData(data[x], out1);
+                in1 = out1.getInputView();
+
+                out2 = new TestOutputView();
+                writeSortedData(data[y], out2);
+                in2 = out2.getInputView();
+
+                if (greater && ascending) {
+                    assertThat(comparator.compareSerialized(in1, in2)).isLessThan(0);
+                }
+                if (greater && !ascending) {
+                    assertThat(comparator.compareSerialized(in1, in2)).isGreaterThan(0);
+                }
+                if (!greater && ascending) {
+                    assertThat(comparator.compareSerialized(in2, in1)).isGreaterThan(0);
+                }
+                if (!greater && !ascending) {

Review Comment:
   How about `isPositive()` and `isNegative()`?
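   For illustration, a rough sketch of how those four branches might read with AssertJ's `isNegative()` / `isPositive()` (same variables as in this hunk; the last branch is cut off above, so it is reconstructed here to mirror the pre-migration code):

   ```java
   // Sketch only: the same sign checks as above, expressed with
   // isNegative()/isPositive() instead of isLessThan(0)/isGreaterThan(0).
   if (greater && ascending) {
       assertThat(comparator.compareSerialized(in1, in2)).isNegative();
   }
   if (greater && !ascending) {
       assertThat(comparator.compareSerialized(in1, in2)).isPositive();
   }
   if (!greater && ascending) {
       assertThat(comparator.compareSerialized(in2, in1)).isPositive();
   }
   if (!greater && !ascending) {
       assertThat(comparator.compareSerialized(in2, in1)).isNegative();
   }
   ```

   Functionally equivalent to the `isLessThan(0)` / `isGreaterThan(0)` form; it just states the sign check more directly, in line with the `ResourceTest` suggestion above.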


