This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1c67cccd2fd [FLINK-32850][flink-runtime][JUnit5 Migration] The io.network.api package of flink-runtime module 1c67cccd2fd is described below commit 1c67cccd2fdd6c674a38e0c26fe990e1dd7b62ae Author: Jiabao Sun <jiabao....@xtransfer.cn> AuthorDate: Wed Oct 25 16:31:45 2023 +0800 [FLINK-32850][flink-runtime][JUnit5 Migration] The io.network.api package of flink-runtime module --- .../io/network/api/CheckpointBarrierTest.java | 26 +-- .../io/network/api/reader/AbstractReaderTest.java | 89 ++++---- .../serialization/CheckpointSerializationTest.java | 26 +-- .../api/serialization/EventSerializerTest.java | 43 ++-- .../network/api/serialization/PagedViewsTest.java | 238 +++++---------------- .../SpanningRecordSerializationTest.java | 55 +++-- .../api/serialization/SpanningWrapperTest.java | 21 +- .../api/writer/BroadcastRecordWriterTest.java | 3 +- .../api/writer/RecordWriterDelegateTest.java | 61 +++--- .../network/api/writer/SubtaskStateMapperTest.java | 169 +++++++-------- 10 files changed, 273 insertions(+), 458 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java index 70de3450654..9b34ee62a64 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java @@ -22,37 +22,29 @@ import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.junit.Test; +import org.junit.jupiter.api.Test; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link CheckpointBarrier} type. */ -public class CheckpointBarrierTest { +class CheckpointBarrierTest { /** * Test serialization of the checkpoint barrier. The checkpoint barrier does not support its own * serialization, in order to be immutable. */ @Test - public void testSerialization() throws Exception { + void testSerialization() { long id = Integer.MAX_VALUE + 123123L; long timestamp = Integer.MAX_VALUE + 1228L; CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation(); CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options); - try { - barrier.write(new DataOutputSerializer(1024)); - fail("should throw an exception"); - } catch (UnsupportedOperationException e) { - // expected - } - - try { - barrier.read(new DataInputDeserializer(new byte[32])); - fail("should throw an exception"); - } catch (UnsupportedOperationException e) { - // expected - } + assertThatThrownBy(() -> barrier.write(new DataOutputSerializer(1024))) + .isInstanceOf(UnsupportedOperationException.class); + + assertThatThrownBy(() -> barrier.read(new DataInputDeserializer(new byte[32]))) + .isInstanceOf(UnsupportedOperationException.class); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java index 969cae48997..32228784396 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java @@ -26,25 +26,24 @@ import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.util.event.EventListener; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.mockito.Matchers; import java.io.IOException; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** Tests for the event handling behaviour. */ -public class AbstractReaderTest { +class AbstractReaderTest { @Test @SuppressWarnings("unchecked") - public void testTaskEvent() throws Exception { + void testTaskEvent() throws Exception { final AbstractReader reader = new MockReader(createInputGate(1)); final EventListener<TaskEvent> listener1 = mock(EventListener.class); @@ -64,10 +63,10 @@ public class AbstractReaderTest { } @Test - public void testEndOfPartitionEvent() throws Exception { + void testEndOfPartitionEvent() throws Exception { final AbstractReader reader = new MockReader(createInputGate(1)); - assertTrue(reader.handleEvent(EndOfPartitionEvent.INSTANCE)); + assertThat(reader.handleEvent(EndOfPartitionEvent.INSTANCE)).isTrue(); } /** @@ -75,78 +74,62 @@ public class AbstractReaderTest { * non-iterative reader. */ @Test - public void testExceptionsNonIterativeReader() throws Exception { + void testExceptionsNonIterativeReader() { final AbstractReader reader = new MockReader(createInputGate(4)); // Non-iterative reader cannot reach end of superstep - assertFalse(reader.hasReachedEndOfSuperstep()); - - try { - reader.startNextSuperstep(); - - fail( - "Did not throw expected exception when starting next superstep with non-iterative reader."); - } catch (Throwable t) { - // All good, expected exception. - } - - try { - reader.handleEvent(EndOfSuperstepEvent.INSTANCE); - - fail( - "Did not throw expected exception when handling end of superstep event with non-iterative reader."); - } catch (Throwable t) { - // All good, expected exception. - } + assertThat(reader.hasReachedEndOfSuperstep()).isFalse(); + + assertThatThrownBy(reader::startNextSuperstep) + .withFailMessage( + "Did not throw expected exception when starting next superstep with non-iterative reader.") + .isInstanceOf(IllegalStateException.class); + + assertThatThrownBy(() -> reader.handleEvent(EndOfSuperstepEvent.INSTANCE)) + .withFailMessage( + "Did not throw expected exception when handling end of superstep event with non-iterative reader.") + .hasCauseInstanceOf(IllegalStateException.class) + .isInstanceOf(IOException.class); } @Test - public void testEndOfSuperstepEventLogic() throws IOException { + void testEndOfSuperstepEventLogic() throws IOException { final int numberOfInputChannels = 4; final AbstractReader reader = new MockReader(createInputGate(numberOfInputChannels)); reader.setIterativeReader(); - try { - // The first superstep does not need not to be explicitly started - reader.startNextSuperstep(); - - fail( - "Did not throw expected exception when starting next superstep before receiving all end of superstep events."); - } catch (Throwable t) { - // All good, expected exception. - } + // The first superstep does not need not to be explicitly started + assertThatThrownBy(reader::startNextSuperstep) + .withFailMessage( + "Did not throw expected exception when starting next superstep before receiving all end of superstep events.") + .isInstanceOf(IllegalStateException.class); EndOfSuperstepEvent eos = EndOfSuperstepEvent.INSTANCE; // One end of superstep event for each input channel. The superstep finishes with the last // received event. for (int i = 0; i < numberOfInputChannels - 1; i++) { - assertFalse(reader.handleEvent(eos)); - assertFalse(reader.hasReachedEndOfSuperstep()); + assertThat(reader.handleEvent(eos)).isFalse(); + assertThat(reader.hasReachedEndOfSuperstep()).isFalse(); } - assertTrue(reader.handleEvent(eos)); - assertTrue(reader.hasReachedEndOfSuperstep()); + assertThat(reader.handleEvent(eos)).isTrue(); + assertThat(reader.hasReachedEndOfSuperstep()).isTrue(); - try { - // Verify exception, when receiving too many end of superstep events. - reader.handleEvent(eos); - - fail( - "Did not throw expected exception when receiving too many end of superstep events."); - } catch (Throwable t) { - // All good, expected exception. - } + assertThatThrownBy(() -> reader.handleEvent(eos)) + .withFailMessage( + "Did not throw expected exception when receiving too many end of superstep events.") + .isInstanceOf(IOException.class); // Start next superstep. reader.startNextSuperstep(); - assertFalse(reader.hasReachedEndOfSuperstep()); + assertThat(reader.hasReachedEndOfSuperstep()).isFalse(); } - private InputGate createInputGate(int numberOfInputChannels) { + private static InputGate createInputGate(int numberOfInputChannels) { final InputGate inputGate = mock(InputGate.class); when(inputGate.getNumberOfInputChannels()).thenReturn(numberOfInputChannels); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java index 9dd3ca8222f..519026e2650 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/CheckpointSerializationTest.java @@ -25,25 +25,24 @@ import org.apache.flink.runtime.checkpoint.SavepointType; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.ByteBuffer; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.assertj.core.api.Assertions.assertThat; /** * Tests the {@link EventSerializer} functionality for serializing {@link CheckpointBarrier * checkpoint barriers}. */ -public class CheckpointSerializationTest { +class CheckpointSerializationTest { private static final byte[] STORAGE_LOCATION_REF = new byte[] {15, 52, 52, 11, 0, 0, 0, 0, -1, -23, -19, 35}; @Test - public void testSuspendingCheckpointBarrierSerialization() throws Exception { + void testSuspendingCheckpointBarrierSerialization() throws Exception { CheckpointOptions suspendSavepointToSerialize = new CheckpointOptions( SavepointType.suspend(SavepointFormatType.CANONICAL), @@ -52,7 +51,7 @@ public class CheckpointSerializationTest { } @Test - public void testSavepointBarrierSerialization() throws Exception { + void testSavepointBarrierSerialization() throws Exception { CheckpointOptions savepointToSerialize = new CheckpointOptions( SavepointType.savepoint(SavepointFormatType.CANONICAL), @@ -61,7 +60,7 @@ public class CheckpointSerializationTest { } @Test - public void testCheckpointBarrierSerialization() throws Exception { + void testCheckpointBarrierSerialization() throws Exception { CheckpointOptions checkpointToSerialize = new CheckpointOptions( CheckpointType.CHECKPOINT, @@ -70,7 +69,7 @@ public class CheckpointSerializationTest { } @Test - public void testFullCheckpointBarrierSerialization() throws Exception { + void testFullCheckpointBarrierSerialization() throws Exception { CheckpointOptions checkpointToSerialize = new CheckpointOptions( CheckpointType.FULL_CHECKPOINT, @@ -79,13 +78,14 @@ public class CheckpointSerializationTest { } @Test - public void testCheckpointWithDefaultLocationSerialization() throws Exception { + void testCheckpointWithDefaultLocationSerialization() throws Exception { CheckpointOptions checkpointToSerialize = CheckpointOptions.forCheckpointWithDefaultLocation(); testCheckpointBarrierSerialization(checkpointToSerialize); } - private void testCheckpointBarrierSerialization(CheckpointOptions options) throws IOException { + private static void testCheckpointBarrierSerialization(CheckpointOptions options) + throws IOException { final long checkpointId = Integer.MAX_VALUE + 123123L; final long timestamp = Integer.MAX_VALUE + 1228L; @@ -94,16 +94,16 @@ public class CheckpointSerializationTest { final CheckpointBarrier barrierAfterDeserialization = serializeAndDeserializeCheckpointBarrier(barrierBeforeSerialization); - assertEquals(barrierBeforeSerialization, barrierAfterDeserialization); + assertThat(barrierAfterDeserialization).isEqualTo(barrierBeforeSerialization); } - private CheckpointBarrier serializeAndDeserializeCheckpointBarrier( + private static CheckpointBarrier serializeAndDeserializeCheckpointBarrier( final CheckpointBarrier barrierUnderTest) throws IOException { final ClassLoader cl = Thread.currentThread().getContextClassLoader(); final ByteBuffer serialized = EventSerializer.toSerializedEvent(barrierUnderTest); final CheckpointBarrier deserialized = (CheckpointBarrier) EventSerializer.fromSerializedEvent(serialized, cl); - assertFalse(serialized.hasRemaining()); + assertThat(serialized.hasRemaining()).isFalse(); return deserialized; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java index 00d5c097437..4a0378b0aaf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java @@ -37,18 +37,15 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.util.TestTaskEvent; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.ByteBuffer; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link EventSerializer}. */ -public class EventSerializerTest { +class EventSerializerTest { private final AbstractEvent[] events = { EndOfPartitionEvent.INSTANCE, @@ -119,50 +116,50 @@ public class EventSerializerTest { }; @Test - public void testSerializeDeserializeEvent() throws Exception { + void testSerializeDeserializeEvent() throws Exception { for (AbstractEvent evt : events) { ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt); - assertTrue(serializedEvent.hasRemaining()); + assertThat(serializedEvent.hasRemaining()).isTrue(); AbstractEvent deserialized = EventSerializer.fromSerializedEvent( serializedEvent, getClass().getClassLoader()); - assertNotNull(deserialized); - assertEquals(evt, deserialized); + assertThat(deserialized).isNotNull().isEqualTo(evt); } } @Test - public void testToBufferConsumer() throws IOException { + void testToBufferConsumer() throws IOException { for (AbstractEvent evt : events) { BufferConsumer bufferConsumer = EventSerializer.toBufferConsumer(evt, false); - assertFalse(bufferConsumer.isBuffer()); - assertTrue(bufferConsumer.isFinished()); - assertTrue(bufferConsumer.isDataAvailable()); - assertFalse(bufferConsumer.isRecycled()); + assertThat(bufferConsumer.isBuffer()).isFalse(); + assertThat(bufferConsumer.isFinished()).isTrue(); + assertThat(bufferConsumer.isDataAvailable()).isTrue(); + assertThat(bufferConsumer.isRecycled()).isFalse(); if (evt instanceof CheckpointBarrier) { - assertTrue(bufferConsumer.build().getDataType().isBlockingUpstream()); + assertThat(bufferConsumer.build().getDataType().isBlockingUpstream()).isTrue(); } else { - assertEquals(Buffer.DataType.EVENT_BUFFER, bufferConsumer.build().getDataType()); + assertThat(bufferConsumer.build().getDataType()) + .isEqualTo(Buffer.DataType.EVENT_BUFFER); } } } @Test - public void testToBuffer() throws IOException { + void testToBuffer() throws IOException { for (AbstractEvent evt : events) { Buffer buffer = EventSerializer.toBuffer(evt, false); - assertFalse(buffer.isBuffer()); - assertTrue(buffer.readableBytes() > 0); - assertFalse(buffer.isRecycled()); + assertThat(buffer.isBuffer()).isFalse(); + assertThat(buffer.readableBytes()).isGreaterThan(0); + assertThat(buffer.isRecycled()).isFalse(); if (evt instanceof CheckpointBarrier) { - assertTrue(buffer.getDataType().isBlockingUpstream()); + assertThat(buffer.getDataType().isBlockingUpstream()).isTrue(); } else { - assertEquals(Buffer.DataType.EVENT_BUFFER, buffer.getDataType()); + assertThat(buffer.getDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java index 6fc42feb05b..0257725647f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java @@ -26,7 +26,7 @@ import org.apache.flink.testutils.serialization.types.SerializationTestType; import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory; import org.apache.flink.testutils.serialization.types.Util; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.io.EOFException; import java.io.IOException; @@ -34,121 +34,72 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link AbstractPagedInputView} and {@link AbstractPagedOutputView}. */ -public class PagedViewsTest { +class PagedViewsTest { @Test - public void testSequenceOfIntegersWithAlignedBuffers() { - try { - final int numInts = 1000000; - - testSequenceOfTypes( - Util.randomRecords(numInts, SerializationTestTypeFactory.INT), 2048); - - } catch (Exception e) { - e.printStackTrace(); - fail("Test encountered an unexpected exception."); - } + void testSequenceOfIntegersWithAlignedBuffers() throws Exception { + final int numInts = 1000000; + testSequenceOfTypes(Util.randomRecords(numInts, SerializationTestTypeFactory.INT), 2048); } @Test - public void testSequenceOfIntegersWithUnalignedBuffers() { - try { - final int numInts = 1000000; - - testSequenceOfTypes( - Util.randomRecords(numInts, SerializationTestTypeFactory.INT), 2047); - - } catch (Exception e) { - e.printStackTrace(); - fail("Test encountered an unexpected exception."); - } + void testSequenceOfIntegersWithUnalignedBuffers() throws Exception { + final int numInts = 1000000; + testSequenceOfTypes(Util.randomRecords(numInts, SerializationTestTypeFactory.INT), 2047); } @Test - public void testRandomTypes() { - try { - final int numTypes = 100000; + void testRandomTypes() throws Exception { + final int numTypes = 100000; - // test with an odd buffer size to force many unaligned cases - testSequenceOfTypes(Util.randomRecords(numTypes), 57); - - } catch (Exception e) { - e.printStackTrace(); - fail("Test encountered an unexpected exception."); - } + // test with an odd buffer size to force many unaligned cases + testSequenceOfTypes(Util.randomRecords(numTypes), 57); } @Test - public void testReadFully() { + void testReadFully() throws IOException { int bufferSize = 100; byte[] expected = new byte[bufferSize]; new Random().nextBytes(expected); TestOutputView outputView = new TestOutputView(bufferSize); - - try { - outputView.write(expected); - } catch (Exception e) { - e.printStackTrace(); - fail("Unexpected exception: Could not write to TestOutputView."); - } - + outputView.write(expected); outputView.close(); TestInputView inputView = new TestInputView(outputView.segments); byte[] buffer = new byte[bufferSize]; - try { - inputView.readFully(buffer); - } catch (IOException e) { - e.printStackTrace(); - fail("Unexpected exception: Could not read TestInputView."); - } + inputView.readFully(buffer); - assertEquals(inputView.getCurrentPositionInSegment(), bufferSize); - assertArrayEquals(expected, buffer); + assertThat(inputView.getCurrentPositionInSegment()).isEqualTo(bufferSize); + assertThat(buffer).isEqualTo(expected); } @Test - public void testReadFullyAcrossSegments() { + void testReadFullyAcrossSegments() throws Exception { int bufferSize = 100; int segmentSize = 30; byte[] expected = new byte[bufferSize]; new Random().nextBytes(expected); TestOutputView outputView = new TestOutputView(segmentSize); - - try { - outputView.write(expected); - } catch (Exception e) { - e.printStackTrace(); - fail("Unexpected exception: Could not write to TestOutputView."); - } - + outputView.write(expected); outputView.close(); TestInputView inputView = new TestInputView(outputView.segments); byte[] buffer = new byte[bufferSize]; + inputView.readFully(buffer); - try { - inputView.readFully(buffer); - } catch (IOException e) { - e.printStackTrace(); - fail("Unexpected exception: Could not read TestInputView."); - } - - assertEquals(inputView.getCurrentPositionInSegment(), bufferSize % segmentSize); - assertArrayEquals(expected, buffer); + assertThat(inputView.getCurrentPositionInSegment()).isEqualTo(bufferSize % segmentSize); + assertThat(buffer).isEqualTo(expected); } @Test - public void testReadAcrossSegments() { + void testReadAcrossSegments() throws Exception { int bufferSize = 100; int bytes2Write = 75; int segmentSize = 30; @@ -156,37 +107,23 @@ public class PagedViewsTest { new Random().nextBytes(expected); TestOutputView outputView = new TestOutputView(segmentSize); - - try { - outputView.write(expected); - } catch (Exception e) { - e.printStackTrace(); - fail("Unexpected exception: Could not write to TestOutputView."); - } - + outputView.write(expected); outputView.close(); TestInputView inputView = new TestInputView(outputView.segments); byte[] buffer = new byte[bufferSize]; - int bytesRead = 0; - - try { - bytesRead = inputView.read(buffer); - } catch (IOException e) { - e.printStackTrace(); - fail("Unexpected exception: Could not read TestInputView."); - } + int bytesRead = inputView.read(buffer); - assertEquals(bytes2Write, bytesRead); - assertEquals(inputView.getCurrentPositionInSegment(), bytes2Write % segmentSize); + assertThat(bytesRead).isEqualTo(bytes2Write); + assertThat(inputView.getCurrentPositionInSegment()).isEqualTo(bytes2Write % segmentSize); byte[] tempBuffer = new byte[bytesRead]; System.arraycopy(buffer, 0, tempBuffer, 0, bytesRead); - assertArrayEquals(expected, tempBuffer); + assertThat(tempBuffer).isEqualTo(expected); } @Test - public void testEmptyingInputView() { + void testEmptyingInputView() throws Exception { int bufferSize = 100; int bytes2Write = 75; int segmentSize = 30; @@ -194,46 +131,25 @@ public class PagedViewsTest { new Random().nextBytes(expected); TestOutputView outputView = new TestOutputView(segmentSize); - - try { - outputView.write(expected); - } catch (Exception e) { - e.printStackTrace(); - fail("Unexpected exception: Could not write to TestOutputView."); - } - + outputView.write(expected); outputView.close(); TestInputView inputView = new TestInputView(outputView.segments); byte[] buffer = new byte[bufferSize]; - int bytesRead = 0; - - try { - bytesRead = inputView.read(buffer); - } catch (IOException e) { - e.printStackTrace(); - fail("Unexpected exception: Could not read TestInputView."); - } - - assertEquals(bytes2Write, bytesRead); + int bytesRead = inputView.read(buffer); + assertThat(bytesRead).isEqualTo(bytes2Write); byte[] tempBuffer = new byte[bytesRead]; System.arraycopy(buffer, 0, tempBuffer, 0, bytesRead); - assertArrayEquals(expected, tempBuffer); + assertThat(tempBuffer).isEqualTo(expected); - try { - bytesRead = inputView.read(buffer); - } catch (IOException e) { - e.printStackTrace(); - fail("Unexpected exception: Input view should be empty and thus return -1."); - } - - assertEquals(-1, bytesRead); - assertEquals(inputView.getCurrentPositionInSegment(), bytes2Write % segmentSize); + bytesRead = inputView.read(buffer); + assertThat(bytesRead).isEqualTo(-1); + assertThat(inputView.getCurrentPositionInSegment()).isEqualTo(bytes2Write % segmentSize); } @Test - public void testReadFullyWithNotEnoughData() { + void testReadFullyWithNotEnoughData() throws Exception { int bufferSize = 100; int bytes2Write = 99; int segmentSize = 30; @@ -241,99 +157,49 @@ public class PagedViewsTest { new Random().nextBytes(expected); TestOutputView outputView = new TestOutputView(segmentSize); - - try { - outputView.write(expected); - } catch (Exception e) { - e.printStackTrace(); - fail("Unexpected exception: Could not write to TestOutputView."); - } - + outputView.write(expected); outputView.close(); TestInputView inputView = new TestInputView(outputView.segments); byte[] buffer = new byte[bufferSize]; - boolean eofException = false; - - try { - inputView.readFully(buffer); - } catch (EOFException e) { - // Expected exception - eofException = true; - } catch (IOException e) { - e.printStackTrace(); - fail("Unexpected exception: Could not read TestInputView."); - } - - assertTrue("EOFException should have occurred.", eofException); - int bytesRead = 0; + assertThatThrownBy(() -> inputView.readFully(buffer)).isInstanceOf(EOFException.class); - try { - bytesRead = inputView.read(buffer); - } catch (Exception e) { - e.printStackTrace(); - fail("Unexpected exception: Could not read TestInputView."); - } - - assertEquals(-1, bytesRead); + int bytesRead = inputView.read(buffer); + assertThat(bytesRead).isEqualTo(-1); } @Test - public void testReadFullyWithOffset() { + void testReadFullyWithOffset() throws Exception { int bufferSize = 100; int segmentSize = 30; byte[] expected = new byte[bufferSize]; new Random().nextBytes(expected); TestOutputView outputView = new TestOutputView(segmentSize); - - try { - outputView.write(expected); - } catch (Exception e) { - e.printStackTrace(); - fail("Unexpected exception: Could not write to TestOutputView."); - } - + outputView.write(expected); outputView.close(); TestInputView inputView = new TestInputView(outputView.segments); byte[] buffer = new byte[2 * bufferSize]; + inputView.readFully(buffer, bufferSize, bufferSize); + assertThat(inputView.getCurrentPositionInSegment()).isEqualTo(bufferSize % segmentSize); - try { - inputView.readFully(buffer, bufferSize, bufferSize); - } catch (IOException e) { - e.printStackTrace(); - fail("Unexpected exception: Could not read TestInputView."); - } - - assertEquals(inputView.getCurrentPositionInSegment(), bufferSize % segmentSize); byte[] tempBuffer = new byte[bufferSize]; System.arraycopy(buffer, bufferSize, tempBuffer, 0, bufferSize); - assertArrayEquals(expected, tempBuffer); + assertThat(tempBuffer).isEqualTo(expected); } @Test - public void testReadFullyEmptyView() { + void testReadFullyEmptyView() { int segmentSize = 30; TestOutputView outputView = new TestOutputView(segmentSize); outputView.close(); TestInputView inputView = new TestInputView(outputView.segments); byte[] buffer = new byte[segmentSize]; - boolean eofException = false; - - try { - inputView.readFully(buffer); - } catch (EOFException e) { - // expected Exception - eofException = true; - } catch (Exception e) { - e.printStackTrace(); - fail("Unexpected exception: Could not read TestInputView."); - } - assertTrue("EOFException expected.", eofException); + assertThatThrownBy(() -> inputView.readFully(buffer)).isInstanceOf(EOFException.class); } private static void testSequenceOfTypes( @@ -356,7 +222,7 @@ public class PagedViewsTest { for (SerializationTestType reference : elements) { SerializationTestType result = reference.getClass().newInstance(); result.read(inView); - assertEquals(reference, result); + assertThat(result).isEqualTo(reference); } } @@ -367,7 +233,7 @@ public class PagedViewsTest { private final MemorySegment segment; private final int position; - public SegmentWithPosition(MemorySegment segment, int position) { + SegmentWithPosition(MemorySegment segment, int position) { this.segment = segment; this.position = position; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java index 6924a881bf5..71cc8011269 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java @@ -33,14 +33,12 @@ import org.apache.flink.testutils.serialization.types.SerializationTestType; import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory; import org.apache.flink.testutils.serialization.types.Util; import org.apache.flink.util.CloseableIterator; -import org.apache.flink.util.TestLogger; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -53,15 +51,16 @@ import java.util.Random; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link SpillingAdaptiveSpanningRecordDeserializer}. */ -public class SpanningRecordSerializationTest extends TestLogger { +class SpanningRecordSerializationTest { private static final Random RANDOM = new Random(42); - @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + @TempDir private File tempFolder; @Test - public void testIntRecordsSpanningMultipleSegments() throws Exception { + void testIntRecordsSpanningMultipleSegments() throws Exception { final int segmentSize = 1; final int numValues = 10; @@ -70,7 +69,7 @@ public class SpanningRecordSerializationTest extends TestLogger { } @Test - public void testIntRecordsWithAlignedBuffers() throws Exception { + void testIntRecordsWithAlignedBuffers() throws Exception { final int segmentSize = 64; final int numValues = 64; @@ -79,7 +78,7 @@ public class SpanningRecordSerializationTest extends TestLogger { } @Test - public void testIntRecordsWithUnalignedBuffers() throws Exception { + void testIntRecordsWithUnalignedBuffers() throws Exception { final int segmentSize = 31; final int numValues = 248; @@ -88,7 +87,7 @@ public class SpanningRecordSerializationTest extends TestLogger { } @Test - public void testRandomRecords() throws Exception { + void testRandomRecords() throws Exception { final int segmentSize = 127; final int numValues = 10000; @@ -96,7 +95,7 @@ public class SpanningRecordSerializationTest extends TestLogger { } @Test - public void testHandleMixedLargeRecords() throws Exception { + void testHandleMixedLargeRecords() throws Exception { final int numValues = 99; final int segmentSize = 32 * 1024; @@ -121,7 +120,7 @@ public class SpanningRecordSerializationTest extends TestLogger { Iterable<SerializationTestType> records, int segmentSize) throws Exception { RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( - new String[] {tempFolder.getRoot().getAbsolutePath()}); + new String[] {tempFolder.getAbsolutePath()}); testSerializationRoundTrip(records, segmentSize, deserializer); } @@ -173,7 +172,7 @@ public class SpanningRecordSerializationTest extends TestLogger { deserializer.setNextBuffer(serializationResult.buildBuffer()); } } - Assert.assertFalse(serializedRecord.hasRemaining()); + assertThat(serializedRecord.hasRemaining()).isFalse(); } // deserialize left over records @@ -185,20 +184,20 @@ public class SpanningRecordSerializationTest extends TestLogger { SerializationTestType actual = expected.getClass().newInstance(); RecordDeserializer.DeserializationResult result = deserializer.getNextRecord(actual); - Assert.assertTrue(result.isFullRecord()); - Assert.assertEquals(expected, actual); + assertThat(result.isFullRecord()).isTrue(); + assertThat(actual).isEqualTo(expected); numRecords--; } // assert that all records have been serialized and deserialized - Assert.assertEquals(0, numRecords); + assertThat(numRecords).isZero(); } @Test - public void testSmallRecordUnconsumedBuffer() throws Exception { + void testSmallRecordUnconsumedBuffer() throws Exception { RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( - new String[] {tempFolder.getRoot().getAbsolutePath()}); + new String[] {tempFolder.getAbsolutePath()}); testUnconsumedBuffer( deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1024); @@ -209,29 +208,29 @@ public class SpanningRecordSerializationTest extends TestLogger { * by byte. */ @Test - public void testSpanningRecordUnconsumedBuffer() throws Exception { + void testSpanningRecordUnconsumedBuffer() throws Exception { RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( - new String[] {tempFolder.getRoot().getAbsolutePath()}); + new String[] {tempFolder.getAbsolutePath()}); testUnconsumedBuffer(deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1); } @Test - public void testLargeSpanningRecordUnconsumedBuffer() throws Exception { + void testLargeSpanningRecordUnconsumedBuffer() throws Exception { RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( - new String[] {tempFolder.getRoot().getAbsolutePath()}); + new String[] {tempFolder.getAbsolutePath()}); testUnconsumedBuffer( deserializer, Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY), 1); } @Test - public void testLargeSpanningRecordUnconsumedBufferWithLeftOverBytes() throws Exception { + void testLargeSpanningRecordUnconsumedBufferWithLeftOverBytes() throws Exception { RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( - new String[] {tempFolder.getRoot().getAbsolutePath()}); + new String[] {tempFolder.getAbsolutePath()}); testUnconsumedBuffer( deserializer, @@ -248,7 +247,7 @@ public class SpanningRecordSerializationTest extends TestLogger { new byte[] {42, 43, 44}); } - public void testUnconsumedBuffer( + private static void testUnconsumedBuffer( RecordDeserializer<SerializationTestType> deserializer, SerializationTestType record, int segmentSize, @@ -308,12 +307,12 @@ public class SpanningRecordSerializationTest extends TestLogger { private static void assertUnconsumedBuffer( ByteArrayOutputStream expected, CloseableIterator<Buffer> actual) throws Exception { if (!actual.hasNext()) { - Assert.assertEquals(expected.size(), 0); + assertThat(expected.size()).isZero(); } ByteBuffer expectedByteBuffer = ByteBuffer.wrap(expected.toByteArray()); ByteBuffer actualByteBuffer = actual.next().getNioBufferReadable(); - Assert.assertEquals(expectedByteBuffer, actualByteBuffer); + assertThat(actualByteBuffer).isEqualTo(expectedByteBuffer); actual.close(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java index 46719d1c3bd..7436ed1adab 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java @@ -19,30 +19,31 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.CloseableIterator; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import java.io.File; +import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Random; import static org.apache.flink.core.memory.MemorySegmentFactory.wrap; import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES; -import static org.junit.Assert.assertArrayEquals; +import static org.assertj.core.api.Assertions.assertThat; /** {@link SpanningWrapper} test. */ -public class SpanningWrapperTest { +class SpanningWrapperTest { private static final Random random = new Random(); - @Rule public TemporaryFolder folder = new TemporaryFolder(); + @TempDir private Path folder; @Test - public void testLargeUnconsumedSegment() throws Exception { + void testLargeUnconsumedSegment() throws Exception { int recordLen = 100; int firstChunk = (int) (recordLen * .9); int spillingThreshold = (int) (firstChunk * .9); @@ -50,14 +51,14 @@ public class SpanningWrapperTest { byte[] record1 = recordBytes(recordLen); byte[] record2 = recordBytes(recordLen * 2); - File canNotEecutableFile = folder.newFolder(); + File canNotEecutableFile = TempDirUtils.newFolder(folder); canNotEecutableFile.setExecutable(false); // Always pick 'canNotEecutableFile' first as the Spilling Channel TmpDir. Thus trigger an // IOException. SpanningWrapper spanningWrapper = new SpanningWrapper( new String[] { - folder.newFolder().getAbsolutePath(), + TempDirUtils.newFolder(folder).getAbsolutePath(), canNotEecutableFile.getAbsolutePath() + File.separator + "pathdonotexit" }, spillingThreshold, @@ -79,7 +80,7 @@ public class SpanningWrapperTest { canNotEecutableFile.setExecutable(true); - assertArrayEquals(concat(record1, record2), toByteArray(unconsumedSegment)); + assertThat(concat(record1, record2)).isEqualTo(toByteArray(unconsumedSegment)); } private byte[] recordBytes(int recordLen) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java index b2c3fa6cf42..26dc18ceba5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java @@ -157,7 +157,8 @@ class BroadcastRecordWriterTest { assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isEqualTo(2); } - public void closeConsumer(ResultSubpartitionView view, int expectedSize) throws IOException { + private static void closeConsumer(ResultSubpartitionView view, int expectedSize) + throws IOException { Buffer buffer = view.getNextBuffer().buffer(); assertThat(buffer.getSize()).isEqualTo(expectedSize); buffer.recycleBuffer(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java index 4d6d1260d1d..b717a5f908f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java @@ -28,23 +28,20 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.types.IntValue; -import org.apache.flink.util.TestLogger; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link SingleRecordWriter} and {@link MultipleRecordWriters}. */ -public class RecordWriterDelegateTest extends TestLogger { +class RecordWriterDelegateTest { private static final int recordSize = 8; @@ -54,31 +51,31 @@ public class RecordWriterDelegateTest extends TestLogger { private NetworkBufferPool globalPool; - @Before - public void setup() { - assertEquals("Illegal memory segment size,", 0, memorySegmentSize % recordSize); + @BeforeEach + void setup() { + assertThat(memorySegmentSize % recordSize).as("Illegal memory segment size").isZero(); globalPool = new NetworkBufferPool(numberOfBuffers, memorySegmentSize); } - @After - public void teardown() { + @AfterEach + void teardown() { globalPool.destroyAllBufferPools(); globalPool.destroy(); } @Test @SuppressWarnings("unchecked") - public void testSingleRecordWriterAvailability() throws Exception { + void testSingleRecordWriterAvailability() throws Exception { final RecordWriter recordWriter = createRecordWriter(globalPool); final RecordWriterDelegate writerDelegate = new SingleRecordWriter(recordWriter); - assertEquals(recordWriter, writerDelegate.getRecordWriter(0)); + assertThat(writerDelegate.getRecordWriter(0)).isEqualTo(recordWriter); verifyAvailability(writerDelegate); } @Test @SuppressWarnings("unchecked") - public void testMultipleRecordWritersAvailability() throws Exception { + void testMultipleRecordWritersAvailability() throws Exception { // setup final int numRecordWriters = 2; final List<RecordWriter> recordWriters = new ArrayList<>(numRecordWriters); @@ -89,7 +86,7 @@ public class RecordWriterDelegateTest extends TestLogger { RecordWriterDelegate writerDelegate = new MultipleRecordWriters(recordWriters); for (int i = 0; i < numRecordWriters; i++) { - assertEquals(recordWriters.get(i), writerDelegate.getRecordWriter(i)); + assertThat(writerDelegate.getRecordWriter(i)).isEqualTo(recordWriters.get(i)); } verifyAvailability(writerDelegate); @@ -97,7 +94,7 @@ public class RecordWriterDelegateTest extends TestLogger { @Test @SuppressWarnings("unchecked") - public void testSingleRecordWriterBroadcastEvent() throws Exception { + void testSingleRecordWriterBroadcastEvent() throws Exception { // setup final ResultPartition partition = RecordWriterTest.createResultPartition(memorySegmentSize, 2); @@ -109,7 +106,7 @@ public class RecordWriterDelegateTest extends TestLogger { @Test @SuppressWarnings("unchecked") - public void testMultipleRecordWritersBroadcastEvent() throws Exception { + void testMultipleRecordWritersBroadcastEvent() throws Exception { // setup final int numRecordWriters = 2; final List<RecordWriter> recordWriters = new ArrayList<>(numRecordWriters); @@ -126,7 +123,7 @@ public class RecordWriterDelegateTest extends TestLogger { verifyBroadcastEvent(writerDelegate, partitions); } - private RecordWriter createRecordWriter(NetworkBufferPool globalPool) throws Exception { + private static RecordWriter createRecordWriter(NetworkBufferPool globalPool) throws Exception { final BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE, 0); final ResultPartitionWriter partition = new ResultPartitionBuilder().setBufferPoolFactory(() -> localPool).build(); @@ -135,19 +132,19 @@ public class RecordWriterDelegateTest extends TestLogger { return new RecordWriterBuilder().build(partition); } - private void verifyAvailability(RecordWriterDelegate writerDelegate) throws Exception { + private static void verifyAvailability(RecordWriterDelegate writerDelegate) throws Exception { // writer is available at the beginning - assertTrue(writerDelegate.isAvailable()); - assertTrue(writerDelegate.getAvailableFuture().isDone()); + assertThat(writerDelegate.isAvailable()).isTrue(); + assertThat(writerDelegate.getAvailableFuture()).isDone(); // request one buffer from the local pool to make it unavailable RecordWriter recordWriter = writerDelegate.getRecordWriter(0); for (int i = 0; i < memorySegmentSize / recordSize; ++i) { recordWriter.emit(new IntValue(i)); } - assertFalse(writerDelegate.isAvailable()); + assertThat(writerDelegate.isAvailable()).isFalse(); CompletableFuture future = writerDelegate.getAvailableFuture(); - assertFalse(future.isDone()); + assertThat(future).isNotDone(); // recycle the buffer to make the local pool available again ResultSubpartitionView readView = @@ -157,12 +154,12 @@ public class RecordWriterDelegateTest extends TestLogger { Buffer buffer = readView.getNextBuffer().buffer(); buffer.recycleBuffer(); - assertTrue(future.isDone()); - assertTrue(writerDelegate.isAvailable()); - assertTrue(writerDelegate.getAvailableFuture().isDone()); + assertThat(future).isDone(); + assertThat(writerDelegate.isAvailable()).isTrue(); + assertThat(writerDelegate.getAvailableFuture()).isDone(); } - private void verifyBroadcastEvent( + private static void verifyBroadcastEvent( RecordWriterDelegate writerDelegate, List<ResultPartition> partitions) throws Exception { @@ -172,13 +169,13 @@ public class RecordWriterDelegateTest extends TestLogger { // verify the added messages in all the queues for (ResultPartition partition : partitions) { for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) { - assertEquals(1, partition.getNumberOfQueuedBuffers(i)); + assertThat(partition.getNumberOfQueuedBuffers(i)).isOne(); ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener()); BufferOrEvent boe = RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), i); - assertTrue(boe.isEvent()); - assertEquals(message, boe.getEvent()); + assertThat(boe.isEvent()).isTrue(); + assertThat(boe.getEvent()).isEqualTo(message); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/SubtaskStateMapperTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/SubtaskStateMapperTest.java index a1ae90a4164..5fb3ae8d3aa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/SubtaskStateMapperTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/SubtaskStateMapperTest.java @@ -17,146 +17,125 @@ package org.apache.flink.runtime.io.network.api.writer; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ErrorCollector; +import org.junit.jupiter.api.Test; import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.mappings; import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.to; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; /** Tests to(@link SubtaskStateMapper). */ -public class SubtaskStateMapperTest { - @Rule public ErrorCollector collector = new ErrorCollector(); +class SubtaskStateMapperTest { @Test - public void testFirstTaskMappingOnScaleDown() { - assertEquals( - mappings(to(0, 1, 2), to()), - SubtaskStateMapper.FIRST.getNewToOldSubtasksMapping(3, 2)); + void testFirstTaskMappingOnScaleDown() { + assertThat(SubtaskStateMapper.FIRST.getNewToOldSubtasksMapping(3, 2)) + .isEqualTo(mappings(to(0, 1, 2), to())); } @Test - public void testFirstTaskMappingOnNoScale() { + void testFirstTaskMappingOnNoScale() { // this may be a bit surprising, but the optimization should be done on call-site - assertEquals( - mappings(to(0, 1, 2), to(), to()), - SubtaskStateMapper.FIRST.getNewToOldSubtasksMapping(3, 3)); + assertThat(SubtaskStateMapper.FIRST.getNewToOldSubtasksMapping(3, 3)) + .isEqualTo(mappings(to(0, 1, 2), to(), to())); } @Test - public void testFirstTaskMappingOnScaleUp() { - assertEquals( - mappings(to(0, 1, 2), to(), to(), to()), - SubtaskStateMapper.FIRST.getNewToOldSubtasksMapping(3, 4)); + void testFirstTaskMappingOnScaleUp() { + assertThat(SubtaskStateMapper.FIRST.getNewToOldSubtasksMapping(3, 4)) + .isEqualTo(mappings(to(0, 1, 2), to(), to(), to())); } @Test - public void testFullTaskMappingOnScaleDown() { - assertEquals( - mappings(to(0, 1, 2), to(0, 1, 2)), - SubtaskStateMapper.FULL.getNewToOldSubtasksMapping(3, 2)); + void testFullTaskMappingOnScaleDown() { + assertThat(SubtaskStateMapper.FULL.getNewToOldSubtasksMapping(3, 2)) + .isEqualTo(mappings(to(0, 1, 2), to(0, 1, 2))); } @Test - public void testFullTaskMappingOnNoScale() { + void testFullTaskMappingOnNoScale() { // this may be a bit surprising, but the optimization should be done on call-site - assertEquals( - mappings(to(0, 1, 2), to(0, 1, 2), to(0, 1, 2)), - SubtaskStateMapper.FULL.getNewToOldSubtasksMapping(3, 3)); + assertThat(SubtaskStateMapper.FULL.getNewToOldSubtasksMapping(3, 3)) + .isEqualTo(mappings(to(0, 1, 2), to(0, 1, 2), to(0, 1, 2))); } @Test - public void testFullTaskMappingOnScaleUp() { - assertEquals( - mappings(to(0, 1, 2), to(0, 1, 2), to(0, 1, 2), to(0, 1, 2)), - SubtaskStateMapper.FULL.getNewToOldSubtasksMapping(3, 4)); + void testFullTaskMappingOnScaleUp() { + assertThat(SubtaskStateMapper.FULL.getNewToOldSubtasksMapping(3, 4)) + .isEqualTo(mappings(to(0, 1, 2), to(0, 1, 2), to(0, 1, 2), to(0, 1, 2))); } @Test - public void testRangeSelectorTaskMappingOnScaleDown() { + void testRangeSelectorTaskMappingOnScaleDown() { // 3 partitions: [0; 43) [43; 87) [87; 128) // 2 partitions: [0; 64) [64; 128) - assertEquals( - mappings(to(0, 1), to(1, 2)), - SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 2)); + assertThat(SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 2)) + .isEqualTo(mappings(to(0, 1), to(1, 2))); - assertEquals( - mappings(to(0, 1, 2, 3, 4), to(5, 6, 7, 8, 9)), - SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(10, 2)); + assertThat(SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(10, 2)) + .isEqualTo(mappings(to(0, 1, 2, 3, 4), to(5, 6, 7, 8, 9))); - assertEquals( - mappings(to(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)), - SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(10, 1)); + assertThat(SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(10, 1)) + .isEqualTo(mappings(to(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))); } @Test - public void testRangeSelectorTaskMappingOnNoScale() { - assertEquals( - mappings(to(0), to(1), to(2)), - SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 3)); + void testRangeSelectorTaskMappingOnNoScale() { + assertThat(SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 3)) + .isEqualTo(mappings(to(0), to(1), to(2))); } @Test - public void testRangeSelectorTaskMappingOnScaleUp() { - assertEquals( - mappings(to(0), to(0, 1), to(1, 2), to(2)), - SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 4)); - - assertEquals( - mappings(to(0), to(0), to(0, 1), to(1), to(1, 2), to(2), to(2)), - SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 7)); - } + void testRangeSelectorTaskMappingOnScaleUp() { + assertThat(SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 4)) + .isEqualTo(mappings(to(0), to(0, 1), to(1, 2), to(2))); - @Test - public void testRoundRobinTaskMappingOnScaleDown() { - assertEquals( - mappings(to(0, 4, 8), to(1, 5, 9), to(2, 6), to(3, 7)), - SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(10, 4)); - - assertEquals( - mappings(to(0, 4), to(1), to(2), to(3)), - SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 4)); - - assertEquals( - mappings(to(0, 2, 4), to(1, 3)), - SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 2)); - - assertEquals( - mappings(to(0, 1, 2, 3, 4)), - SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 1)); + assertThat(SubtaskStateMapper.RANGE.getNewToOldSubtasksMapping(3, 7)) + .isEqualTo(mappings(to(0), to(0), to(0, 1), to(1), to(1, 2), to(2), to(2))); } @Test - public void testRoundRobinTaskMappingOnNoScale() { - assertEquals( - mappings(to(0), to(1), to(2), to(3), to(4), to(5), to(6), to(7), to(8), to(9)), - SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(10, 10)); + void testRoundRobinTaskMappingOnScaleDown() { + assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(10, 4)) + .isEqualTo(mappings(to(0, 4, 8), to(1, 5, 9), to(2, 6), to(3, 7))); + + assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 4)) + .isEqualTo(mappings(to(0, 4), to(1), to(2), to(3))); - assertEquals( - mappings(to(0), to(1), to(2), to(3), to(4)), - SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 5)); + assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 2)) + .isEqualTo(mappings(to(0, 2, 4), to(1, 3))); - assertEquals( - mappings(to(0)), SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(1, 1)); + assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 1)) + .isEqualTo(mappings(to(0, 1, 2, 3, 4))); } @Test - public void testRoundRobinTaskMappingOnScaleUp() { - assertEquals( - mappings(to(0), to(1), to(2), to(3), to(), to(), to(), to(), to(), to()), - SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(4, 10)); - - assertEquals( - mappings(to(0), to(1), to(2), to(3), to()), - SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(4, 5)); - - assertEquals( - mappings(to(0), to(1), to(), to(), to()), - SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(2, 5)); - - assertEquals( - mappings(to(0), to(), to(), to(), to()), - SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(1, 5)); + void testRoundRobinTaskMappingOnNoScale() { + assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(10, 10)) + .isEqualTo( + mappings( + to(0), to(1), to(2), to(3), to(4), to(5), to(6), to(7), to(8), + to(9))); + + assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(5, 5)) + .isEqualTo(mappings(to(0), to(1), to(2), to(3), to(4))); + + assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(1, 1)) + .isEqualTo(mappings(to(0))); + } + + @Test + void testRoundRobinTaskMappingOnScaleUp() { + assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(4, 10)) + .isEqualTo( + mappings(to(0), to(1), to(2), to(3), to(), to(), to(), to(), to(), to())); + + assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(4, 5)) + .isEqualTo(mappings(to(0), to(1), to(2), to(3), to())); + + assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(2, 5)) + .isEqualTo(mappings(to(0), to(1), to(), to(), to())); + + assertThat(SubtaskStateMapper.ROUND_ROBIN.getNewToOldSubtasksMapping(1, 5)) + .isEqualTo(mappings(to(0), to(), to(), to(), to())); } }