This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c67cccd2fd [FLINK-32850][flink-runtime][JUnit5 Migration] The io.network.api package of flink-runtime module
1c67cccd2fd is described below

commit 1c67cccd2fdd6c674a38e0c26fe990e1dd7b62ae
Author: Jiabao Sun <jiabao....@xtransfer.cn>
AuthorDate: Wed Oct 25 16:31:45 2023 +0800

    [FLINK-32850][flink-runtime][JUnit5 Migration] The io.network.api package of flink-runtime module
---
 .../io/network/api/CheckpointBarrierTest.java      |  26 +--
 .../io/network/api/reader/AbstractReaderTest.java  |  89 ++++----
 .../serialization/CheckpointSerializationTest.java |  26 +--
 .../api/serialization/EventSerializerTest.java     |  43 ++--
 .../network/api/serialization/PagedViewsTest.java  | 238 +++++----------------
 .../SpanningRecordSerializationTest.java           |  55 +++--
 .../api/serialization/SpanningWrapperTest.java     |  21 +-
 .../api/writer/BroadcastRecordWriterTest.java      |   3 +-
 .../api/writer/RecordWriterDelegateTest.java       |  61 +++---
 .../network/api/writer/SubtaskStateMapperTest.java | 169 +++++++--------
 10 files changed, 273 insertions(+), 458 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index 70de3450654..9b34ee62a64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -22,37 +22,29 @@ import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link CheckpointBarrier} type. */
-public class CheckpointBarrierTest {
+class CheckpointBarrierTest {
 
     /**
      * Test serialization of the checkpoint barrier. The checkpoint barrier 
does not support its own
      * serialization, in order to be immutable.
      */
     @Test
-    public void testSerialization() throws Exception {
+    void testSerialization() {
         long id = Integer.MAX_VALUE + 123123L;
         long timestamp = Integer.MAX_VALUE + 1228L;
 
         CheckpointOptions options = 
CheckpointOptions.forCheckpointWithDefaultLocation();
         CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, 
options);
 
-        try {
-            barrier.write(new DataOutputSerializer(1024));
-            fail("should throw an exception");
-        } catch (UnsupportedOperationException e) {
-            // expected
-        }
-
-        try {
-            barrier.read(new DataInputDeserializer(new byte[32]));
-            fail("should throw an exception");
-        } catch (UnsupportedOperationException e) {
-            // expected
-        }
+        assertThatThrownBy(() -> barrier.write(new DataOutputSerializer(1024)))
+                .isInstanceOf(UnsupportedOperationException.class);
+
+        assertThatThrownBy(() -> barrier.read(new DataInputDeserializer(new 
byte[32])))
+                .isInstanceOf(UnsupportedOperationException.class);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
index 969cae48997..32228784396 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
@@ -26,25 +26,24 @@ import 
org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.mockito.Matchers;
 
 import java.io.IOException;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for the event handling behaviour. */
-public class AbstractReaderTest {
+class AbstractReaderTest {
 
     @Test
     @SuppressWarnings("unchecked")
-    public void testTaskEvent() throws Exception {
+    void testTaskEvent() throws Exception {
         final AbstractReader reader = new MockReader(createInputGate(1));
 
         final EventListener<TaskEvent> listener1 = mock(EventListener.class);
@@ -64,10 +63,10 @@ public class AbstractReaderTest {
     }
 
     @Test
-    public void testEndOfPartitionEvent() throws Exception {
+    void testEndOfPartitionEvent() throws Exception {
         final AbstractReader reader = new MockReader(createInputGate(1));
 
-        assertTrue(reader.handleEvent(EndOfPartitionEvent.INSTANCE));
+        assertThat(reader.handleEvent(EndOfPartitionEvent.INSTANCE)).isTrue();
     }
 
     /**
@@ -75,78 +74,62 @@ public class AbstractReaderTest {
      * non-iterative reader.
      */
     @Test
-    public void testExceptionsNonIterativeReader() throws Exception {
+    void testExceptionsNonIterativeReader() {
 
         final AbstractReader reader = new MockReader(createInputGate(4));
 
         // Non-iterative reader cannot reach end of superstep
-        assertFalse(reader.hasReachedEndOfSuperstep());
-
-        try {
-            reader.startNextSuperstep();
-
-            fail(
-                    "Did not throw expected exception when starting next 
superstep with non-iterative reader.");
-        } catch (Throwable t) {
-            // All good, expected exception.
-        }
-
-        try {
-            reader.handleEvent(EndOfSuperstepEvent.INSTANCE);
-
-            fail(
-                    "Did not throw expected exception when handling end of 
superstep event with non-iterative reader.");
-        } catch (Throwable t) {
-            // All good, expected exception.
-        }
+        assertThat(reader.hasReachedEndOfSuperstep()).isFalse();
+
+        assertThatThrownBy(reader::startNextSuperstep)
+                .withFailMessage(
+                        "Did not throw expected exception when starting next 
superstep with non-iterative reader.")
+                .isInstanceOf(IllegalStateException.class);
+
+        assertThatThrownBy(() -> 
reader.handleEvent(EndOfSuperstepEvent.INSTANCE))
+                .withFailMessage(
+                        "Did not throw expected exception when handling end of 
superstep event with non-iterative reader.")
+                .hasCauseInstanceOf(IllegalStateException.class)
+                .isInstanceOf(IOException.class);
     }
 
     @Test
-    public void testEndOfSuperstepEventLogic() throws IOException {
+    void testEndOfSuperstepEventLogic() throws IOException {
 
         final int numberOfInputChannels = 4;
         final AbstractReader reader = new 
MockReader(createInputGate(numberOfInputChannels));
 
         reader.setIterativeReader();
 
-        try {
-            // The first superstep does not need not to be explicitly started
-            reader.startNextSuperstep();
-
-            fail(
-                    "Did not throw expected exception when starting next 
superstep before receiving all end of superstep events.");
-        } catch (Throwable t) {
-            // All good, expected exception.
-        }
+        // The first superstep does not need not to be explicitly started
+        assertThatThrownBy(reader::startNextSuperstep)
+                .withFailMessage(
+                        "Did not throw expected exception when starting next 
superstep before receiving all end of superstep events.")
+                .isInstanceOf(IllegalStateException.class);
 
         EndOfSuperstepEvent eos = EndOfSuperstepEvent.INSTANCE;
 
         // One end of superstep event for each input channel. The superstep 
finishes with the last
         // received event.
         for (int i = 0; i < numberOfInputChannels - 1; i++) {
-            assertFalse(reader.handleEvent(eos));
-            assertFalse(reader.hasReachedEndOfSuperstep());
+            assertThat(reader.handleEvent(eos)).isFalse();
+            assertThat(reader.hasReachedEndOfSuperstep()).isFalse();
         }
 
-        assertTrue(reader.handleEvent(eos));
-        assertTrue(reader.hasReachedEndOfSuperstep());
+        assertThat(reader.handleEvent(eos)).isTrue();
+        assertThat(reader.hasReachedEndOfSuperstep()).isTrue();
 
-        try {
-            // Verify exception, when receiving too many end of superstep 
events.
-            reader.handleEvent(eos);
-
-            fail(
-                    "Did not throw expected exception when receiving too many 
end of superstep events.");
-        } catch (Throwable t) {
-            // All good, expected exception.
-        }
+        assertThatThrownBy(() -> reader.handleEvent(eos))
+                .withFailMessage(
+                        "Did not throw expected exception when receiving too 
many end of superstep events.")
+                .isInstanceOf(IOException.class);
 
         // Start next superstep.
         reader.startNextSuperstep();
-        assertFalse(reader.hasReachedEndOfSuperstep());
+        assertThat(reader.hasReachedEndOfSuperstep()).isFalse();
     }
 
-    private InputGate createInputGate(int numberOfInputChannels) {
+    private static InputGate createInputGate(int numberOfInputChannels) {
         final InputGate inputGate = mock(InputGate.class);
         
when(inputGate.getNumberOfInputChannels()).thenReturn(numberOfInputChannels);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java
index 9dd3ca8222f..519026e2650 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java
@@ -25,25 +25,24 @@ import org.apache.flink.runtime.checkpoint.SavepointType;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests the {@link EventSerializer} functionality for serializing {@link 
CheckpointBarrier
  * checkpoint barriers}.
  */
-public class CheckpointSerializationTest {
+class CheckpointSerializationTest {
 
     private static final byte[] STORAGE_LOCATION_REF =
             new byte[] {15, 52, 52, 11, 0, 0, 0, 0, -1, -23, -19, 35};
 
     @Test
-    public void testSuspendingCheckpointBarrierSerialization() throws 
Exception {
+    void testSuspendingCheckpointBarrierSerialization() throws Exception {
         CheckpointOptions suspendSavepointToSerialize =
                 new CheckpointOptions(
                         SavepointType.suspend(SavepointFormatType.CANONICAL),
@@ -52,7 +51,7 @@ public class CheckpointSerializationTest {
     }
 
     @Test
-    public void testSavepointBarrierSerialization() throws Exception {
+    void testSavepointBarrierSerialization() throws Exception {
         CheckpointOptions savepointToSerialize =
                 new CheckpointOptions(
                         SavepointType.savepoint(SavepointFormatType.CANONICAL),
@@ -61,7 +60,7 @@ public class CheckpointSerializationTest {
     }
 
     @Test
-    public void testCheckpointBarrierSerialization() throws Exception {
+    void testCheckpointBarrierSerialization() throws Exception {
         CheckpointOptions checkpointToSerialize =
                 new CheckpointOptions(
                         CheckpointType.CHECKPOINT,
@@ -70,7 +69,7 @@ public class CheckpointSerializationTest {
     }
 
     @Test
-    public void testFullCheckpointBarrierSerialization() throws Exception {
+    void testFullCheckpointBarrierSerialization() throws Exception {
         CheckpointOptions checkpointToSerialize =
                 new CheckpointOptions(
                         CheckpointType.FULL_CHECKPOINT,
@@ -79,13 +78,14 @@ public class CheckpointSerializationTest {
     }
 
     @Test
-    public void testCheckpointWithDefaultLocationSerialization() throws 
Exception {
+    void testCheckpointWithDefaultLocationSerialization() throws Exception {
         CheckpointOptions checkpointToSerialize =
                 CheckpointOptions.forCheckpointWithDefaultLocation();
         testCheckpointBarrierSerialization(checkpointToSerialize);
     }
 
-    private void testCheckpointBarrierSerialization(CheckpointOptions options) 
throws IOException {
+    private static void testCheckpointBarrierSerialization(CheckpointOptions 
options)
+            throws IOException {
         final long checkpointId = Integer.MAX_VALUE + 123123L;
         final long timestamp = Integer.MAX_VALUE + 1228L;
 
@@ -94,16 +94,16 @@ public class CheckpointSerializationTest {
         final CheckpointBarrier barrierAfterDeserialization =
                 
serializeAndDeserializeCheckpointBarrier(barrierBeforeSerialization);
 
-        assertEquals(barrierBeforeSerialization, barrierAfterDeserialization);
+        
assertThat(barrierAfterDeserialization).isEqualTo(barrierBeforeSerialization);
     }
 
-    private CheckpointBarrier serializeAndDeserializeCheckpointBarrier(
+    private static CheckpointBarrier serializeAndDeserializeCheckpointBarrier(
             final CheckpointBarrier barrierUnderTest) throws IOException {
         final ClassLoader cl = Thread.currentThread().getContextClassLoader();
         final ByteBuffer serialized = 
EventSerializer.toSerializedEvent(barrierUnderTest);
         final CheckpointBarrier deserialized =
                 (CheckpointBarrier) 
EventSerializer.fromSerializedEvent(serialized, cl);
-        assertFalse(serialized.hasRemaining());
+        assertThat(serialized.hasRemaining()).isFalse();
         return deserialized;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 00d5c097437..4a0378b0aaf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -37,18 +37,15 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link EventSerializer}. */
-public class EventSerializerTest {
+class EventSerializerTest {
 
     private final AbstractEvent[] events = {
         EndOfPartitionEvent.INSTANCE,
@@ -119,50 +116,50 @@ public class EventSerializerTest {
     };
 
     @Test
-    public void testSerializeDeserializeEvent() throws Exception {
+    void testSerializeDeserializeEvent() throws Exception {
         for (AbstractEvent evt : events) {
             ByteBuffer serializedEvent = 
EventSerializer.toSerializedEvent(evt);
-            assertTrue(serializedEvent.hasRemaining());
+            assertThat(serializedEvent.hasRemaining()).isTrue();
 
             AbstractEvent deserialized =
                     EventSerializer.fromSerializedEvent(
                             serializedEvent, getClass().getClassLoader());
-            assertNotNull(deserialized);
-            assertEquals(evt, deserialized);
+            assertThat(deserialized).isNotNull().isEqualTo(evt);
         }
     }
 
     @Test
-    public void testToBufferConsumer() throws IOException {
+    void testToBufferConsumer() throws IOException {
         for (AbstractEvent evt : events) {
             BufferConsumer bufferConsumer = 
EventSerializer.toBufferConsumer(evt, false);
 
-            assertFalse(bufferConsumer.isBuffer());
-            assertTrue(bufferConsumer.isFinished());
-            assertTrue(bufferConsumer.isDataAvailable());
-            assertFalse(bufferConsumer.isRecycled());
+            assertThat(bufferConsumer.isBuffer()).isFalse();
+            assertThat(bufferConsumer.isFinished()).isTrue();
+            assertThat(bufferConsumer.isDataAvailable()).isTrue();
+            assertThat(bufferConsumer.isRecycled()).isFalse();
 
             if (evt instanceof CheckpointBarrier) {
-                
assertTrue(bufferConsumer.build().getDataType().isBlockingUpstream());
+                
assertThat(bufferConsumer.build().getDataType().isBlockingUpstream()).isTrue();
             } else {
-                assertEquals(Buffer.DataType.EVENT_BUFFER, 
bufferConsumer.build().getDataType());
+                assertThat(bufferConsumer.build().getDataType())
+                        .isEqualTo(Buffer.DataType.EVENT_BUFFER);
             }
         }
     }
 
     @Test
-    public void testToBuffer() throws IOException {
+    void testToBuffer() throws IOException {
         for (AbstractEvent evt : events) {
             Buffer buffer = EventSerializer.toBuffer(evt, false);
 
-            assertFalse(buffer.isBuffer());
-            assertTrue(buffer.readableBytes() > 0);
-            assertFalse(buffer.isRecycled());
+            assertThat(buffer.isBuffer()).isFalse();
+            assertThat(buffer.readableBytes()).isGreaterThan(0);
+            assertThat(buffer.isRecycled()).isFalse();
 
             if (evt instanceof CheckpointBarrier) {
-                assertTrue(buffer.getDataType().isBlockingUpstream());
+                assertThat(buffer.getDataType().isBlockingUpstream()).isTrue();
             } else {
-                assertEquals(Buffer.DataType.EVENT_BUFFER, 
buffer.getDataType());
+                
assertThat(buffer.getDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER);
             }
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
index 6fc42feb05b..0257725647f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.testutils.serialization.types.SerializationTestType;
 import 
org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -34,121 +34,72 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link AbstractPagedInputView} and {@link 
AbstractPagedOutputView}. */
-public class PagedViewsTest {
+class PagedViewsTest {
 
     @Test
-    public void testSequenceOfIntegersWithAlignedBuffers() {
-        try {
-            final int numInts = 1000000;
-
-            testSequenceOfTypes(
-                    Util.randomRecords(numInts, 
SerializationTestTypeFactory.INT), 2048);
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Test encountered an unexpected exception.");
-        }
+    void testSequenceOfIntegersWithAlignedBuffers() throws Exception {
+        final int numInts = 1000000;
+        testSequenceOfTypes(Util.randomRecords(numInts, 
SerializationTestTypeFactory.INT), 2048);
     }
 
     @Test
-    public void testSequenceOfIntegersWithUnalignedBuffers() {
-        try {
-            final int numInts = 1000000;
-
-            testSequenceOfTypes(
-                    Util.randomRecords(numInts, 
SerializationTestTypeFactory.INT), 2047);
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Test encountered an unexpected exception.");
-        }
+    void testSequenceOfIntegersWithUnalignedBuffers() throws Exception {
+        final int numInts = 1000000;
+        testSequenceOfTypes(Util.randomRecords(numInts, 
SerializationTestTypeFactory.INT), 2047);
     }
 
     @Test
-    public void testRandomTypes() {
-        try {
-            final int numTypes = 100000;
+    void testRandomTypes() throws Exception {
+        final int numTypes = 100000;
 
-            // test with an odd buffer size to force many unaligned cases
-            testSequenceOfTypes(Util.randomRecords(numTypes), 57);
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Test encountered an unexpected exception.");
-        }
+        // test with an odd buffer size to force many unaligned cases
+        testSequenceOfTypes(Util.randomRecords(numTypes), 57);
     }
 
     @Test
-    public void testReadFully() {
+    void testReadFully() throws IOException {
         int bufferSize = 100;
         byte[] expected = new byte[bufferSize];
         new Random().nextBytes(expected);
 
         TestOutputView outputView = new TestOutputView(bufferSize);
-
-        try {
-            outputView.write(expected);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not write to TestOutputView.");
-        }
-
+        outputView.write(expected);
         outputView.close();
 
         TestInputView inputView = new TestInputView(outputView.segments);
         byte[] buffer = new byte[bufferSize];
 
-        try {
-            inputView.readFully(buffer);
-        } catch (IOException e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not read TestInputView.");
-        }
+        inputView.readFully(buffer);
 
-        assertEquals(inputView.getCurrentPositionInSegment(), bufferSize);
-        assertArrayEquals(expected, buffer);
+        
assertThat(inputView.getCurrentPositionInSegment()).isEqualTo(bufferSize);
+        assertThat(buffer).isEqualTo(expected);
     }
 
     @Test
-    public void testReadFullyAcrossSegments() {
+    void testReadFullyAcrossSegments() throws Exception {
         int bufferSize = 100;
         int segmentSize = 30;
         byte[] expected = new byte[bufferSize];
         new Random().nextBytes(expected);
 
         TestOutputView outputView = new TestOutputView(segmentSize);
-
-        try {
-            outputView.write(expected);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not write to TestOutputView.");
-        }
-
+        outputView.write(expected);
         outputView.close();
 
         TestInputView inputView = new TestInputView(outputView.segments);
         byte[] buffer = new byte[bufferSize];
+        inputView.readFully(buffer);
 
-        try {
-            inputView.readFully(buffer);
-        } catch (IOException e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not read TestInputView.");
-        }
-
-        assertEquals(inputView.getCurrentPositionInSegment(), bufferSize % 
segmentSize);
-        assertArrayEquals(expected, buffer);
+        
assertThat(inputView.getCurrentPositionInSegment()).isEqualTo(bufferSize % 
segmentSize);
+        assertThat(buffer).isEqualTo(expected);
     }
 
     @Test
-    public void testReadAcrossSegments() {
+    void testReadAcrossSegments() throws Exception {
         int bufferSize = 100;
         int bytes2Write = 75;
         int segmentSize = 30;
@@ -156,37 +107,23 @@ public class PagedViewsTest {
         new Random().nextBytes(expected);
 
         TestOutputView outputView = new TestOutputView(segmentSize);
-
-        try {
-            outputView.write(expected);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not write to TestOutputView.");
-        }
-
+        outputView.write(expected);
         outputView.close();
 
         TestInputView inputView = new TestInputView(outputView.segments);
         byte[] buffer = new byte[bufferSize];
-        int bytesRead = 0;
-
-        try {
-            bytesRead = inputView.read(buffer);
-        } catch (IOException e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not read TestInputView.");
-        }
+        int bytesRead = inputView.read(buffer);
 
-        assertEquals(bytes2Write, bytesRead);
-        assertEquals(inputView.getCurrentPositionInSegment(), bytes2Write % 
segmentSize);
+        assertThat(bytesRead).isEqualTo(bytes2Write);
+        
assertThat(inputView.getCurrentPositionInSegment()).isEqualTo(bytes2Write % 
segmentSize);
 
         byte[] tempBuffer = new byte[bytesRead];
         System.arraycopy(buffer, 0, tempBuffer, 0, bytesRead);
-        assertArrayEquals(expected, tempBuffer);
+        assertThat(tempBuffer).isEqualTo(expected);
     }
 
     @Test
-    public void testEmptyingInputView() {
+    void testEmptyingInputView() throws Exception {
         int bufferSize = 100;
         int bytes2Write = 75;
         int segmentSize = 30;
@@ -194,46 +131,25 @@ public class PagedViewsTest {
         new Random().nextBytes(expected);
 
         TestOutputView outputView = new TestOutputView(segmentSize);
-
-        try {
-            outputView.write(expected);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not write to TestOutputView.");
-        }
-
+        outputView.write(expected);
         outputView.close();
 
         TestInputView inputView = new TestInputView(outputView.segments);
         byte[] buffer = new byte[bufferSize];
-        int bytesRead = 0;
-
-        try {
-            bytesRead = inputView.read(buffer);
-        } catch (IOException e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not read TestInputView.");
-        }
-
-        assertEquals(bytes2Write, bytesRead);
+        int bytesRead = inputView.read(buffer);
+        assertThat(bytesRead).isEqualTo(bytes2Write);
 
         byte[] tempBuffer = new byte[bytesRead];
         System.arraycopy(buffer, 0, tempBuffer, 0, bytesRead);
-        assertArrayEquals(expected, tempBuffer);
+        assertThat(tempBuffer).isEqualTo(expected);
 
-        try {
-            bytesRead = inputView.read(buffer);
-        } catch (IOException e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Input view should be empty and thus 
return -1.");
-        }
-
-        assertEquals(-1, bytesRead);
-        assertEquals(inputView.getCurrentPositionInSegment(), bytes2Write % 
segmentSize);
+        bytesRead = inputView.read(buffer);
+        assertThat(bytesRead).isEqualTo(-1);
+        
assertThat(inputView.getCurrentPositionInSegment()).isEqualTo(bytes2Write % 
segmentSize);
     }
 
     @Test
-    public void testReadFullyWithNotEnoughData() {
+    void testReadFullyWithNotEnoughData() throws Exception {
         int bufferSize = 100;
         int bytes2Write = 99;
         int segmentSize = 30;
@@ -241,99 +157,49 @@ public class PagedViewsTest {
         new Random().nextBytes(expected);
 
         TestOutputView outputView = new TestOutputView(segmentSize);
-
-        try {
-            outputView.write(expected);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not write to TestOutputView.");
-        }
-
+        outputView.write(expected);
         outputView.close();
 
         TestInputView inputView = new TestInputView(outputView.segments);
         byte[] buffer = new byte[bufferSize];
-        boolean eofException = false;
-
-        try {
-            inputView.readFully(buffer);
-        } catch (EOFException e) {
-            // Expected exception
-            eofException = true;
-        } catch (IOException e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not read TestInputView.");
-        }
-
-        assertTrue("EOFException should have occurred.", eofException);
 
-        int bytesRead = 0;
+        assertThatThrownBy(() -> 
inputView.readFully(buffer)).isInstanceOf(EOFException.class);
 
-        try {
-            bytesRead = inputView.read(buffer);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not read TestInputView.");
-        }
-
-        assertEquals(-1, bytesRead);
+        int bytesRead = inputView.read(buffer);
+        assertThat(bytesRead).isEqualTo(-1);
     }
 
     @Test
-    public void testReadFullyWithOffset() {
+    void testReadFullyWithOffset() throws Exception {
         int bufferSize = 100;
         int segmentSize = 30;
         byte[] expected = new byte[bufferSize];
         new Random().nextBytes(expected);
 
         TestOutputView outputView = new TestOutputView(segmentSize);
-
-        try {
-            outputView.write(expected);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not write to TestOutputView.");
-        }
-
+        outputView.write(expected);
         outputView.close();
 
         TestInputView inputView = new TestInputView(outputView.segments);
         byte[] buffer = new byte[2 * bufferSize];
+        inputView.readFully(buffer, bufferSize, bufferSize);
+        
assertThat(inputView.getCurrentPositionInSegment()).isEqualTo(bufferSize % 
segmentSize);
 
-        try {
-            inputView.readFully(buffer, bufferSize, bufferSize);
-        } catch (IOException e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not read TestInputView.");
-        }
-
-        assertEquals(inputView.getCurrentPositionInSegment(), bufferSize % 
segmentSize);
         byte[] tempBuffer = new byte[bufferSize];
         System.arraycopy(buffer, bufferSize, tempBuffer, 0, bufferSize);
-        assertArrayEquals(expected, tempBuffer);
+        assertThat(tempBuffer).isEqualTo(expected);
     }
 
     @Test
-    public void testReadFullyEmptyView() {
+    void testReadFullyEmptyView() {
         int segmentSize = 30;
         TestOutputView outputView = new TestOutputView(segmentSize);
         outputView.close();
 
         TestInputView inputView = new TestInputView(outputView.segments);
         byte[] buffer = new byte[segmentSize];
-        boolean eofException = false;
-
-        try {
-            inputView.readFully(buffer);
-        } catch (EOFException e) {
-            // expected Exception
-            eofException = true;
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Unexpected exception: Could not read TestInputView.");
-        }
 
-        assertTrue("EOFException expected.", eofException);
+        assertThatThrownBy(() -> 
inputView.readFully(buffer)).isInstanceOf(EOFException.class);
     }
 
     private static void testSequenceOfTypes(
@@ -356,7 +222,7 @@ public class PagedViewsTest {
         for (SerializationTestType reference : elements) {
             SerializationTestType result = reference.getClass().newInstance();
             result.read(inView);
-            assertEquals(reference, result);
+            assertThat(result).isEqualTo(reference);
         }
     }
 
@@ -367,7 +233,7 @@ public class PagedViewsTest {
         private final MemorySegment segment;
         private final int position;
 
-        public SegmentWithPosition(MemorySegment segment, int position) {
+        SegmentWithPosition(MemorySegment segment, int position) {
             this.segment = segment;
             this.position = position;
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index 6924a881bf5..71cc8011269 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -33,14 +33,12 @@ import 
org.apache.flink.testutils.serialization.types.SerializationTestType;
 import 
org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
 import org.apache.flink.util.CloseableIterator;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -53,15 +51,16 @@ import java.util.Random;
 
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link SpillingAdaptiveSpanningRecordDeserializer}. */
-public class SpanningRecordSerializationTest extends TestLogger {
+class SpanningRecordSerializationTest {
     private static final Random RANDOM = new Random(42);
 
-    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+    @TempDir private File tempFolder;
 
     @Test
-    public void testIntRecordsSpanningMultipleSegments() throws Exception {
+    void testIntRecordsSpanningMultipleSegments() throws Exception {
         final int segmentSize = 1;
         final int numValues = 10;
 
@@ -70,7 +69,7 @@ public class SpanningRecordSerializationTest extends 
TestLogger {
     }
 
     @Test
-    public void testIntRecordsWithAlignedBuffers() throws Exception {
+    void testIntRecordsWithAlignedBuffers() throws Exception {
         final int segmentSize = 64;
         final int numValues = 64;
 
@@ -79,7 +78,7 @@ public class SpanningRecordSerializationTest extends 
TestLogger {
     }
 
     @Test
-    public void testIntRecordsWithUnalignedBuffers() throws Exception {
+    void testIntRecordsWithUnalignedBuffers() throws Exception {
         final int segmentSize = 31;
         final int numValues = 248;
 
@@ -88,7 +87,7 @@ public class SpanningRecordSerializationTest extends 
TestLogger {
     }
 
     @Test
-    public void testRandomRecords() throws Exception {
+    void testRandomRecords() throws Exception {
         final int segmentSize = 127;
         final int numValues = 10000;
 
@@ -96,7 +95,7 @@ public class SpanningRecordSerializationTest extends 
TestLogger {
     }
 
     @Test
-    public void testHandleMixedLargeRecords() throws Exception {
+    void testHandleMixedLargeRecords() throws Exception {
         final int numValues = 99;
         final int segmentSize = 32 * 1024;
 
@@ -121,7 +120,7 @@ public class SpanningRecordSerializationTest extends 
TestLogger {
             Iterable<SerializationTestType> records, int segmentSize) throws 
Exception {
         RecordDeserializer<SerializationTestType> deserializer =
                 new SpillingAdaptiveSpanningRecordDeserializer<>(
-                        new String[] {tempFolder.getRoot().getAbsolutePath()});
+                        new String[] {tempFolder.getAbsolutePath()});
 
         testSerializationRoundTrip(records, segmentSize, deserializer);
     }
@@ -173,7 +172,7 @@ public class SpanningRecordSerializationTest extends 
TestLogger {
                     
deserializer.setNextBuffer(serializationResult.buildBuffer());
                 }
             }
-            Assert.assertFalse(serializedRecord.hasRemaining());
+            assertThat(serializedRecord.hasRemaining()).isFalse();
         }
 
         // deserialize left over records
@@ -185,20 +184,20 @@ public class SpanningRecordSerializationTest extends 
TestLogger {
             SerializationTestType actual = expected.getClass().newInstance();
             RecordDeserializer.DeserializationResult result = 
deserializer.getNextRecord(actual);
 
-            Assert.assertTrue(result.isFullRecord());
-            Assert.assertEquals(expected, actual);
+            assertThat(result.isFullRecord()).isTrue();
+            assertThat(actual).isEqualTo(expected);
             numRecords--;
         }
 
         // assert that all records have been serialized and deserialized
-        Assert.assertEquals(0, numRecords);
+        assertThat(numRecords).isZero();
     }
 
     @Test
-    public void testSmallRecordUnconsumedBuffer() throws Exception {
+    void testSmallRecordUnconsumedBuffer() throws Exception {
         RecordDeserializer<SerializationTestType> deserializer =
                 new SpillingAdaptiveSpanningRecordDeserializer<>(
-                        new String[] {tempFolder.getRoot().getAbsolutePath()});
+                        new String[] {tempFolder.getAbsolutePath()});
 
         testUnconsumedBuffer(
                 deserializer, 
Util.randomRecord(SerializationTestTypeFactory.INT), 1024);
@@ -209,29 +208,29 @@ public class SpanningRecordSerializationTest extends 
TestLogger {
      * by byte.
      */
     @Test
-    public void testSpanningRecordUnconsumedBuffer() throws Exception {
+    void testSpanningRecordUnconsumedBuffer() throws Exception {
         RecordDeserializer<SerializationTestType> deserializer =
                 new SpillingAdaptiveSpanningRecordDeserializer<>(
-                        new String[] {tempFolder.getRoot().getAbsolutePath()});
+                        new String[] {tempFolder.getAbsolutePath()});
 
         testUnconsumedBuffer(deserializer, 
Util.randomRecord(SerializationTestTypeFactory.INT), 1);
     }
 
     @Test
-    public void testLargeSpanningRecordUnconsumedBuffer() throws Exception {
+    void testLargeSpanningRecordUnconsumedBuffer() throws Exception {
         RecordDeserializer<SerializationTestType> deserializer =
                 new SpillingAdaptiveSpanningRecordDeserializer<>(
-                        new String[] {tempFolder.getRoot().getAbsolutePath()});
+                        new String[] {tempFolder.getAbsolutePath()});
 
         testUnconsumedBuffer(
                 deserializer, 
Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY), 1);
     }
 
     @Test
-    public void testLargeSpanningRecordUnconsumedBufferWithLeftOverBytes() 
throws Exception {
+    void testLargeSpanningRecordUnconsumedBufferWithLeftOverBytes() throws 
Exception {
         RecordDeserializer<SerializationTestType> deserializer =
                 new SpillingAdaptiveSpanningRecordDeserializer<>(
-                        new String[] {tempFolder.getRoot().getAbsolutePath()});
+                        new String[] {tempFolder.getAbsolutePath()});
 
         testUnconsumedBuffer(
                 deserializer,
@@ -248,7 +247,7 @@ public class SpanningRecordSerializationTest extends 
TestLogger {
                 new byte[] {42, 43, 44});
     }
 
-    public void testUnconsumedBuffer(
+    private static void testUnconsumedBuffer(
             RecordDeserializer<SerializationTestType> deserializer,
             SerializationTestType record,
             int segmentSize,
@@ -308,12 +307,12 @@ public class SpanningRecordSerializationTest extends 
TestLogger {
     private static void assertUnconsumedBuffer(
             ByteArrayOutputStream expected, CloseableIterator<Buffer> actual) 
throws Exception {
         if (!actual.hasNext()) {
-            Assert.assertEquals(expected.size(), 0);
+            assertThat(expected.size()).isZero();
         }
 
         ByteBuffer expectedByteBuffer = 
ByteBuffer.wrap(expected.toByteArray());
         ByteBuffer actualByteBuffer = actual.next().getNioBufferReadable();
-        Assert.assertEquals(expectedByteBuffer, actualByteBuffer);
+        assertThat(actualByteBuffer).isEqualTo(expectedByteBuffer);
         actual.close();
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
index 46719d1c3bd..7436ed1adab 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
@@ -19,30 +19,31 @@ package 
org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.CloseableIterator;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
 import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
 import static 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
-import static org.junit.Assert.assertArrayEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** {@link SpanningWrapper} test. */
-public class SpanningWrapperTest {
+class SpanningWrapperTest {
 
     private static final Random random = new Random();
 
-    @Rule public TemporaryFolder folder = new TemporaryFolder();
+    @TempDir private Path folder;
 
     @Test
-    public void testLargeUnconsumedSegment() throws Exception {
+    void testLargeUnconsumedSegment() throws Exception {
         int recordLen = 100;
         int firstChunk = (int) (recordLen * .9);
         int spillingThreshold = (int) (firstChunk * .9);
@@ -50,14 +51,14 @@ public class SpanningWrapperTest {
         byte[] record1 = recordBytes(recordLen);
         byte[] record2 = recordBytes(recordLen * 2);
 
-        File canNotEecutableFile = folder.newFolder();
+        File canNotEecutableFile = TempDirUtils.newFolder(folder);
         canNotEecutableFile.setExecutable(false);
         // Always pick 'canNotEecutableFile' first as the Spilling Channel 
TmpDir. Thus trigger an
         // IOException.
         SpanningWrapper spanningWrapper =
                 new SpanningWrapper(
                         new String[] {
-                            folder.newFolder().getAbsolutePath(),
+                            TempDirUtils.newFolder(folder).getAbsolutePath(),
                             canNotEecutableFile.getAbsolutePath() + 
File.separator + "pathdonotexit"
                         },
                         spillingThreshold,
@@ -79,7 +80,7 @@ public class SpanningWrapperTest {
 
         canNotEecutableFile.setExecutable(true);
 
-        assertArrayEquals(concat(record1, record2), 
toByteArray(unconsumedSegment));
+        assertThat(concat(record1, 
record2)).isEqualTo(toByteArray(unconsumedSegment));
     }
 
     private byte[] recordBytes(int recordLen) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
index b2c3fa6cf42..26dc18ceba5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
@@ -157,7 +157,8 @@ class BroadcastRecordWriterTest {
         
assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isEqualTo(2);
     }
 
-    public void closeConsumer(ResultSubpartitionView view, int expectedSize) 
throws IOException {
+    private static void closeConsumer(ResultSubpartitionView view, int 
expectedSize)
+            throws IOException {
         Buffer buffer = view.getNextBuffer().buffer();
         assertThat(buffer.getSize()).isEqualTo(expectedSize);
         buffer.recycleBuffer();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
index 4d6d1260d1d..b717a5f908f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
@@ -28,23 +28,20 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link SingleRecordWriter} and {@link 
MultipleRecordWriters}. */
-public class RecordWriterDelegateTest extends TestLogger {
+class RecordWriterDelegateTest {
 
     private static final int recordSize = 8;
 
@@ -54,31 +51,31 @@ public class RecordWriterDelegateTest extends TestLogger {
 
     private NetworkBufferPool globalPool;
 
-    @Before
-    public void setup() {
-        assertEquals("Illegal memory segment size,", 0, memorySegmentSize % 
recordSize);
+    @BeforeEach
+    void setup() {
+        assertThat(memorySegmentSize % recordSize).as("Illegal memory segment 
size").isZero();
         globalPool = new NetworkBufferPool(numberOfBuffers, memorySegmentSize);
     }
 
-    @After
-    public void teardown() {
+    @AfterEach
+    void teardown() {
         globalPool.destroyAllBufferPools();
         globalPool.destroy();
     }
 
     @Test
     @SuppressWarnings("unchecked")
-    public void testSingleRecordWriterAvailability() throws Exception {
+    void testSingleRecordWriterAvailability() throws Exception {
         final RecordWriter recordWriter = createRecordWriter(globalPool);
         final RecordWriterDelegate writerDelegate = new 
SingleRecordWriter(recordWriter);
 
-        assertEquals(recordWriter, writerDelegate.getRecordWriter(0));
+        assertThat(writerDelegate.getRecordWriter(0)).isEqualTo(recordWriter);
         verifyAvailability(writerDelegate);
     }
 
     @Test
     @SuppressWarnings("unchecked")
-    public void testMultipleRecordWritersAvailability() throws Exception {
+    void testMultipleRecordWritersAvailability() throws Exception {
         // setup
         final int numRecordWriters = 2;
         final List<RecordWriter> recordWriters = new 
ArrayList<>(numRecordWriters);
@@ -89,7 +86,7 @@ public class RecordWriterDelegateTest extends TestLogger {
 
         RecordWriterDelegate writerDelegate = new 
MultipleRecordWriters(recordWriters);
         for (int i = 0; i < numRecordWriters; i++) {
-            assertEquals(recordWriters.get(i), 
writerDelegate.getRecordWriter(i));
+            
assertThat(writerDelegate.getRecordWriter(i)).isEqualTo(recordWriters.get(i));
         }
 
         verifyAvailability(writerDelegate);
@@ -97,7 +94,7 @@ public class RecordWriterDelegateTest extends TestLogger {
 
     @Test
     @SuppressWarnings("unchecked")
-    public void testSingleRecordWriterBroadcastEvent() throws Exception {
+    void testSingleRecordWriterBroadcastEvent() throws Exception {
         // setup
         final ResultPartition partition =
                 RecordWriterTest.createResultPartition(memorySegmentSize, 2);
@@ -109,7 +106,7 @@ public class RecordWriterDelegateTest extends TestLogger {
 
     @Test
     @SuppressWarnings("unchecked")
-    public void testMultipleRecordWritersBroadcastEvent() throws Exception {
+    void testMultipleRecordWritersBroadcastEvent() throws Exception {
         // setup
         final int numRecordWriters = 2;
         final List<RecordWriter> recordWriters = new 
ArrayList<>(numRecordWriters);
@@ -126,7 +123,7 @@ public class RecordWriterDelegateTest extends TestLogger {
         verifyBroadcastEvent(writerDelegate, partitions);
     }
 
-    private RecordWriter createRecordWriter(NetworkBufferPool globalPool) 
throws Exception {
+    private static RecordWriter createRecordWriter(NetworkBufferPool 
globalPool) throws Exception {
         final BufferPool localPool = globalPool.createBufferPool(1, 1, 1, 
Integer.MAX_VALUE, 0);
         final ResultPartitionWriter partition =
                 new ResultPartitionBuilder().setBufferPoolFactory(() -> 
localPool).build();
@@ -135,19 +132,19 @@ public class RecordWriterDelegateTest extends TestLogger {
         return new RecordWriterBuilder().build(partition);
     }
 
-    private void verifyAvailability(RecordWriterDelegate writerDelegate) 
throws Exception {
+    private static void verifyAvailability(RecordWriterDelegate 
writerDelegate) throws Exception {
         // writer is available at the beginning
-        assertTrue(writerDelegate.isAvailable());
-        assertTrue(writerDelegate.getAvailableFuture().isDone());
+        assertThat(writerDelegate.isAvailable()).isTrue();
+        assertThat(writerDelegate.getAvailableFuture()).isDone();
 
         // request one buffer from the local pool to make it unavailable
         RecordWriter recordWriter = writerDelegate.getRecordWriter(0);
         for (int i = 0; i < memorySegmentSize / recordSize; ++i) {
             recordWriter.emit(new IntValue(i));
         }
-        assertFalse(writerDelegate.isAvailable());
+        assertThat(writerDelegate.isAvailable()).isFalse();
         CompletableFuture future = writerDelegate.getAvailableFuture();
-        assertFalse(future.isDone());
+        assertThat(future).isNotDone();
 
         // recycle the buffer to make the local pool available again
         ResultSubpartitionView readView =
@@ -157,12 +154,12 @@ public class RecordWriterDelegateTest extends TestLogger {
         Buffer buffer = readView.getNextBuffer().buffer();
 
         buffer.recycleBuffer();
-        assertTrue(future.isDone());
-        assertTrue(writerDelegate.isAvailable());
-        assertTrue(writerDelegate.getAvailableFuture().isDone());
+        assertThat(future).isDone();
+        assertThat(writerDelegate.isAvailable()).isTrue();
+        assertThat(writerDelegate.getAvailableFuture()).isDone();
     }
 
-    private void verifyBroadcastEvent(
+    private static void verifyBroadcastEvent(
             RecordWriterDelegate writerDelegate, List<ResultPartition> 
partitions)
             throws Exception {
 
@@ -172,13 +169,13 @@ public class RecordWriterDelegateTest extends TestLogger {
         // verify the added messages in all the queues
         for (ResultPartition partition : partitions) {
             for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) {
-                assertEquals(1, partition.getNumberOfQueuedBuffers(i));
+                assertThat(partition.getNumberOfQueuedBuffers(i)).isOne();
 
                 ResultSubpartitionView view =
                         partition.createSubpartitionView(i, new 
NoOpBufferAvailablityListener());
                 BufferOrEvent boe = 
RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), i);
-                assertTrue(boe.isEvent());
-                assertEquals(message, boe.getEvent());
+                assertThat(boe.isEvent()).isTrue();
+                assertThat(boe.getEvent()).isEqualTo(message);
             }
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/SubtaskStateMapperTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/SubtaskStateMapperTest.java
index a1ae90a4164..5fb3ae8d3aa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/SubtaskStateMapperTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/SubtaskStateMapperTest.java
@@ -17,146 +17,125 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ErrorCollector;
+import org.junit.jupiter.api.Test;
 
 import static 
org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.mappings;
 import static 
org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.to;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests to(@link SubtaskStateMapper). */
-public class SubtaskStateMapperTest {
-    @Rule public ErrorCollector collector = new ErrorCollector();
+class SubtaskStateMapperTest {
 
     @Test
-    public void testFirstTaskMappingOnScaleDown() {
-        assertEquals(
-                mappings(to(0, 1, 2), to()),
-                SubtaskStateMapper.FIRST.getNewToOldSubtasksMapping(3, 2));
+    void testFirstTaskMappingOnScaleDown() {
+        assertThat(SubtaskStateMapper.FIRST.getNewToOldSubtasksMapping(3, 2))
+                .isEqualTo(mappings(to(0, 1, 2), to()));
     }
 
     @Test
-    public void testFirstTaskMappingOnNoScale() {
+    void testFirstTaskMappingOnNoScale() {
         // this may be a bit surprising, but the optimization should be done 
on call-site
-        assertEquals(
-                mappings(to(0, 1, 2), to(), to()),
-                SubtaskStateMapper.FIRST.getNewToOldSubtasksMapping(3, 3));
+        assertThat(SubtaskStateMapper.FIRST.getNewToOldSubtasksMapping(3, 3))
+                .isEqualTo(mappings(to(0, 1, 2), to(), to()));
     }
 
     @Test
-    public void testFirstTaskMappingOnScaleUp() {
-        assertEquals(
-                mappings(to(0, 1, 2), to(), to(), to()),
-                SubtaskStateMapper.FIRST.getNewToOldSubtasksMapping(3, 4));
+    void testFirstTaskMappingOnScaleUp() {
+        assertThat(SubtaskStateMapper.FIRST.getNewToOldSubtasksMapping(3, 4))
+                .isEqualTo(mappings(to(0, 1, 2), to(), to(), to()));
     }
 
     @Test
-    public void testFullTaskMappingOnScaleDown() {
-        assertEquals(
-                mappings(to(0, 1, 2), to(0, 1, 2)),
-                SubtaskStateMapper.FULL.getNewToOldSubtasksMapping(3, 2));
+    void testFullTaskMappingOnScaleDown() {
+        assertThat(SubtaskStateMapper.FULL.getNewToOldSubtasksMapping(3, 2))
+                .isEqualTo(mappings(to(0, 1, 2), to(0, 1, 2)));
     }
 
     @Test
-    public void testFullTaskMappingOnNoScale() {
+    void testFullTaskMappingOnNoScale() {
         // this may be a bit surprising, but the optimization should be done 
on call-site
-        assertEquals(
-                mappings(to(0, 1, 2), to(0, 1, 2), to(0, 1, 2)),
-                SubtaskStateMapper.FULL.getNewToOldSubtasksMapping(3, 3));
+        assertThat(SubtaskStateMapper.FULL.getNewToOldSubtasksMapping(3, 3))
+                .isEqualTo(mappings(to(0, 1, 2), to(0, 1, 2), to(0, 1, 2)));
     }
 
     @Test
-    public void testFullTaskMappingOnScaleUp() {
-        assertEquals(
-                mappings(to(0, 1, 2), to(0, 1, 2), to(0, 1, 2), to(0, 1, 2)),
-                SubtaskStateMapper.FULL.getNewToOldSubtasksMapping(3, 4));
+    void testFullTaskMappingOnScaleUp() {
+        assertThat(SubtaskStateMapper.FULL.getNewToOldSubtasksMapping(3, 4))
+                .isEqualTo(mappings(to(0, 1, 2), to(0, 1, 2), to(0, 1, 2), 
to(0, 1, 2)));
     }
 
     @Test
-    public void testRangeSelectorTaskMappingOnScaleDown() {
+    void testRangeSelectorTaskMappingOnScaleDown() {
         // 3 partitions: [0; 43) [43; 87) [87; 128)
         // 2 partitions: [0; 64) [64; 128)
-        assertEquals(
-                mappings(to(0, 1), to(1, 2)),
-                SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 2));
+        assertThat(SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 2))
+                .isEqualTo(mappings(to(0, 1), to(1, 2)));
 
-        assertEquals(
-                mappings(to(0, 1, 2, 3, 4), to(5, 6, 7, 8, 9)),
-                SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(10, 2));
+        assertThat(SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(10, 2))
+                .isEqualTo(mappings(to(0, 1, 2, 3, 4), to(5, 6, 7, 8, 9)));
 
-        assertEquals(
-                mappings(to(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)),
-                SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(10, 1));
+        assertThat(SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(10, 1))
+                .isEqualTo(mappings(to(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)));
     }
 
     @Test
-    public void testRangeSelectorTaskMappingOnNoScale() {
-        assertEquals(
-                mappings(to(0), to(1), to(2)),
-                SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 3));
+    void testRangeSelectorTaskMappingOnNoScale() {
+        assertThat(SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 3))
+                .isEqualTo(mappings(to(0), to(1), to(2)));
     }
 
     @Test
-    public void testRangeSelectorTaskMappingOnScaleUp() {
-        assertEquals(
-                mappings(to(0), to(0, 1), to(1, 2), to(2)),
-                SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 4));
-
-        assertEquals(
-                mappings(to(0), to(0), to(0, 1), to(1), to(1, 2), to(2), 
to(2)),
-                SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 7));
-    }
+    void testRangeSelectorTaskMappingOnScaleUp() {
+        assertThat(SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 4))
+                .isEqualTo(mappings(to(0), to(0, 1), to(1, 2), to(2)));
 
-    @Test
-    public void testRoundRobinTaskMappingOnScaleDown() {
-        assertEquals(
-                mappings(to(0, 4, 8), to(1, 5, 9), to(2, 6), to(3, 7)),
-                SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(10, 
4));
-
-        assertEquals(
-                mappings(to(0, 4), to(1), to(2), to(3)),
-                SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 
4));
-
-        assertEquals(
-                mappings(to(0, 2, 4), to(1, 3)),
-                SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 
2));
-
-        assertEquals(
-                mappings(to(0, 1, 2, 3, 4)),
-                SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 
1));
+        assertThat(SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 7))
+                .isEqualTo(mappings(to(0), to(0), to(0, 1), to(1), to(1, 2), 
to(2), to(2)));
     }
 
     @Test
-    public void testRoundRobinTaskMappingOnNoScale() {
-        assertEquals(
-                mappings(to(0), to(1), to(2), to(3), to(4), to(5), to(6), 
to(7), to(8), to(9)),
-                SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(10, 
10));
+    void testRoundRobinTaskMappingOnScaleDown() {
+        
assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(10, 4))
+                .isEqualTo(mappings(to(0, 4, 8), to(1, 5, 9), to(2, 6), to(3, 
7)));
+
+        
assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 4))
+                .isEqualTo(mappings(to(0, 4), to(1), to(2), to(3)));
 
-        assertEquals(
-                mappings(to(0), to(1), to(2), to(3), to(4)),
-                SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 
5));
+        
assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 2))
+                .isEqualTo(mappings(to(0, 2, 4), to(1, 3)));
 
-        assertEquals(
-                mappings(to(0)), 
SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(1, 1));
+        
assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 1))
+                .isEqualTo(mappings(to(0, 1, 2, 3, 4)));
     }
 
     @Test
-    public void testRoundRobinTaskMappingOnScaleUp() {
-        assertEquals(
-                mappings(to(0), to(1), to(2), to(3), to(), to(), to(), to(), 
to(), to()),
-                SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(4, 
10));
-
-        assertEquals(
-                mappings(to(0), to(1), to(2), to(3), to()),
-                SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(4, 
5));
-
-        assertEquals(
-                mappings(to(0), to(1), to(), to(), to()),
-                SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(2, 
5));
-
-        assertEquals(
-                mappings(to(0), to(), to(), to(), to()),
-                SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(1, 
5));
+    void testRoundRobinTaskMappingOnNoScale() {
+        
assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(10, 10))
+                .isEqualTo(
+                        mappings(
+                                to(0), to(1), to(2), to(3), to(4), to(5), 
to(6), to(7), to(8),
+                                to(9)));
+
+        
assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 5))
+                .isEqualTo(mappings(to(0), to(1), to(2), to(3), to(4)));
+
+        
assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(1, 1))
+                .isEqualTo(mappings(to(0)));
+    }
+
+    @Test
+    void testRoundRobinTaskMappingOnScaleUp() {
+        
assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(4, 10))
+                .isEqualTo(
+                        mappings(to(0), to(1), to(2), to(3), to(), to(), to(), 
to(), to(), to()));
+
+        
assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(4, 5))
+                .isEqualTo(mappings(to(0), to(1), to(2), to(3), to()));
+
+        
assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(2, 5))
+                .isEqualTo(mappings(to(0), to(1), to(), to(), to()));
+
+        
assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(1, 5))
+                .isEqualTo(mappings(to(0), to(), to(), to(), to()));
     }
 }

Reply via email to