This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 71ee033865a3adc3247c47d8636b16e99b924470 Author: Jing Ge <[email protected]> AuthorDate: Tue Mar 15 18:38:56 2022 +0100 [FLINK-26420][test] migrate AsyncSinkWriterTest to AssertJ. (cherry picked from commit a2df2665b6ff411a2aeb9b204fd9d46a2af0ecfa) --- .../base/sink/writer/AsyncSinkWriterTest.java | 224 ++++++++++++--------- 1 file changed, 130 insertions(+), 94 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java index c6e95c7..47c437e 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java @@ -21,8 +21,8 @@ import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.ArrayList; @@ -41,11 +41,8 @@ import java.util.stream.Collectors; import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; /** * Unit Tests the functionality of AsyncSinkWriter without any assumptions of what a concrete @@ -57,7 +54,7 @@ public class AsyncSinkWriterTest { private TestSinkInitContext sinkInitContext; private TestSinkInitContextAnyThreadMailbox sinkInitContextAnyThreadMailbox; - @Before + @BeforeEach public void before() { res.clear(); sinkInitContext = new TestSinkInitContext(); @@ -77,17 +74,20 @@ public class AsyncSinkWriterTest { public void testNumberOfRecordsIsAMultipleOfBatchSizeResultsInThatNumberOfRecordsBeingWritten() throws IOException, InterruptedException { performNormalWriteOfEightyRecordsToMock(); - assertEquals(80, res.size()); + + assertThat(res.size()).isEqualTo(80); } @Test public void testMetricsGroupHasLoggedNumberOfRecordsAndNumberOfBytesCorrectly() throws IOException, InterruptedException { performNormalWriteOfEightyRecordsToMock(); - assertEquals(80, sinkInitContext.getNumRecordsOutCounter().getCount()); - assertEquals(320, sinkInitContext.getNumBytesOutCounter().getCount()); - assertTrue(sinkInitContext.getCurrentSendTimeGauge().get().getValue() >= 0); - assertTrue(sinkInitContext.getCurrentSendTimeGauge().get().getValue() < 1000); + + assertThat(sinkInitContext.getNumRecordsOutCounter().getCount()).isEqualTo(80); + assertThat(sinkInitContext.getNumBytesOutCounter().getCount()).isEqualTo(320); + assertThat(sinkInitContext.getCurrentSendTimeGauge().get().getValue()) + .isGreaterThanOrEqualTo(0); + assertThat(sinkInitContext.getCurrentSendTimeGauge().get().getValue()).isLessThan(1000); } @Test @@ -101,8 +101,10 @@ public class AsyncSinkWriterTest { for (int i = 0; i < 4; i++) { sink.write(String.valueOf(i)); } - assertTrue(sinkInitContext.getCurrentSendTimeGauge().get().getValue() >= 99); - assertTrue(sinkInitContext.getCurrentSendTimeGauge().get().getValue() < 110); + + assertThat(sinkInitContext.getCurrentSendTimeGauge().get().getValue()) + .isGreaterThanOrEqualTo(99); + assertThat(sinkInitContext.getCurrentSendTimeGauge().get().getValue()).isLessThan(110); } @Test @@ -113,7 +115,8 @@ public class AsyncSinkWriterTest { for (int i = 0; i < 23; i++) { sink.write(String.valueOf(i)); } - assertEquals(20, res.size()); + + assertThat(res.size()).isEqualTo(20); assertThatBufferStatesAreEqual(sink.wrapRequests(20, 21, 22), getWriterState(sink)); } @@ -129,7 +132,8 @@ public class AsyncSinkWriterTest { sink.write("1"); // 4 bytes per record sink.write("2"); // to give 12 bytes in final flush sink.write("3"); - assertEquals(3, res.size()); + + assertThat(res.size()).isEqualTo(3); } @Test @@ -141,7 +145,8 @@ public class AsyncSinkWriterTest { sink.write(String.valueOf(i)); } sink.flush(true); - assertEquals(23, res.size()); + + assertThat(res.size()).isEqualTo(23); } @Test @@ -151,7 +156,8 @@ public class AsyncSinkWriterTest { new AsyncSinkWriterImplBuilder().context(sinkInitContext).build(); sink.write(String.valueOf(0)); sink.flush(true); - assertEquals(1, res.size()); + + assertThat(res.size()).isEqualTo(1); } @Test @@ -162,12 +168,14 @@ public class AsyncSinkWriterTest { sink.write("25"); sink.write("55"); + assertThatBufferStatesAreEqual(sink.wrapRequests(25, 55), getWriterState(sink)); - assertEquals(0, res.size()); + assertThat(res.size()).isEqualTo(0); sink.write("75"); + assertThatBufferStatesAreEqual(BufferedRequestState.emptyState(), getWriterState(sink)); - assertEquals(3, res.size()); + assertThat(res.size()).isEqualTo(3); } public void writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing() @@ -180,6 +188,7 @@ public class AsyncSinkWriterTest { sink.write("75"); sink.write("95"); sink.write("955"); + assertThatBufferStatesAreEqual(sink.wrapRequests(95, 955), getWriterState(sink)); sink.flush(true); assertThatBufferStatesAreEqual(BufferedRequestState.emptyState(), getWriterState(sink)); @@ -189,15 +198,17 @@ public class AsyncSinkWriterTest { public void testThatSnapshotsAreTakenOfBufferCorrectlyBeforeAndAfterManualFlush() throws IOException, InterruptedException { writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing(); - assertEquals(5, res.size()); + + assertThat(res.size()).isEqualTo(5); } @Test public void metricsAreLoggedEachTimeSubmitRequestEntriesIsCalled() throws IOException, InterruptedException { writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing(); - assertEquals(5, sinkInitContext.getNumRecordsOutCounter().getCount()); - assertEquals(20, sinkInitContext.getNumBytesOutCounter().getCount()); + + assertThat(sinkInitContext.getNumRecordsOutCounter().getCount()).isEqualTo(5); + assertThat(sinkInitContext.getNumBytesOutCounter().getCount()).isEqualTo(20); } @Test @@ -215,11 +226,11 @@ public class AsyncSinkWriterTest { sink.write("75"); sink.write("95"); sink.write("35"); - Exception e = assertThrows(RuntimeException.class, () -> sink.write("135")); - assertEquals( - "Deliberate runtime exception occurred in SinkWriterImplementation.", - e.getMessage()); - assertEquals(3, res.size()); + + assertThatThrownBy(() -> sink.write("135")) + .isInstanceOf(RuntimeException.class) + .hasMessage("Deliberate runtime exception occurred in SinkWriterImplementation."); + assertThat(res.size()).isEqualTo(3); } @Test @@ -292,8 +303,8 @@ public class AsyncSinkWriterTest { sink.flush(true); // Everything is saved - assertEquals(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35, 535), res); - assertEquals(0, getWriterState(sink).getStateSize()); + assertThat(res).isEqualTo(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35, 535)); + assertThat(getWriterState(sink).getStateSize()).isEqualTo(0); } @Test @@ -342,40 +353,39 @@ public class AsyncSinkWriterTest { // Buffer continues to fill up without blocking on write, until eventually yield is called // on the mailbox thread during the prepare commit sink.flush(true); - assertEquals(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 645, 545, 535, 515, 505), res); + + assertThat(res) + .isEqualTo(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 645, 545, 535, 515, 505)); } @Test public void testThatMaxBufferSizeOfSinkShouldBeStrictlyGreaterThanMaxSizeOfEachBatch() { - Exception e = - assertThrows( - IllegalArgumentException.class, + assertThatThrownBy( () -> new AsyncSinkWriterImplBuilder() .context(sinkInitContext) .maxBufferedRequests(10) - .build()); - assertEquals( - "The maximum number of requests that may be buffered should be " - + "strictly greater than the maximum number of requests per batch.", - e.getMessage()); + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "The maximum number of requests that may be buffered should be " + + "strictly greater than the maximum number of requests per batch."); } @Test public void maxRecordSizeSetMustBeSmallerThanOrEqualToMaxBatchSize() { - Exception e = - assertThrows( - IllegalArgumentException.class, + assertThatThrownBy( () -> new AsyncSinkWriterImplBuilder() .context(sinkInitContext) .maxBufferedRequests(11) .maxBatchSizeInBytes(10_000) .maxRecordSizeInBytes(10_001) - .build()); - assertEquals( - "The maximum allowed size in bytes per flush must be greater than or equal to the maximum allowed size in bytes of a single record.", - e.getMessage()); + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "The maximum allowed size in bytes per flush must be greater than or equal to" + + " the maximum allowed size in bytes of a single record."); } @Test @@ -388,17 +398,20 @@ public class AsyncSinkWriterTest { .maxBatchSizeInBytes(10_000) .maxRecordSizeInBytes(3) .build(); - Exception e = assertThrows(IllegalArgumentException.class, () -> sink.write("3")); - assertEquals( - "The request entry sent to the buffer was of size [4], when the maxRecordSizeInBytes was set to [3].", - e.getMessage()); + + assertThatThrownBy(() -> sink.write("3")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "The request entry sent to the buffer was of size [4], when " + + "the maxRecordSizeInBytes was set to [3]."); } private void writeXToSinkAssertDestinationIsInStateYAndBufferHasZ( AsyncSinkWriterImpl sink, String x, List<Integer> y, List<Integer> z) throws IOException, InterruptedException { sink.write(x); - assertEquals(y, res); + + assertThat(res).isEqualTo(y); assertThatBufferStatesAreEqual(sink.wrapRequests(z), getWriterState(sink)); } @@ -418,7 +431,8 @@ public class AsyncSinkWriterTest { */ for (int i = 0; i < 100; i++) { sink.write(String.valueOf(i)); - assertEquals((i / 7) * 7, res.size()); + + assertThat(res.size()).isEqualTo((i / 7) * 7); } } @@ -430,7 +444,8 @@ public class AsyncSinkWriterTest { sink.write(String.valueOf(1)); sink.write(String.valueOf(2)); sink.flush(false); - assertEquals(0, res.size()); + + assertThat(res.size()).isEqualTo(0); } @Test @@ -447,11 +462,14 @@ public class AsyncSinkWriterTest { for (int i = 0; i < 7; i++) { sink.write(String.valueOf(i)); } - assertEquals(7, res.size()); + + assertThat(res.size()).isEqualTo(7); + for (int i = 7; i < 14; i++) { sink.write(String.valueOf(i)); } - assertEquals(14, res.size()); + + assertThat(res.size()).isEqualTo(14); } @Test @@ -468,13 +486,17 @@ public class AsyncSinkWriterTest { sink.write(String.valueOf(1)); // Buffer: 104/110B; 2/10 elements; 0 inflight sink.write(String.valueOf(2)); // Buffer: 108/110B; 3/10 elements; 0 inflight sink.write(String.valueOf(3)); // Buffer: 112/110B; 4/10 elements; 0 inflight -- flushing - assertEquals(2, res.size()); // Request was [225, 1, 2], element 225 failed on first attempt + + assertThat(res.size()) + .isEqualTo(2); // Request was [225, 1, 2], element 225 failed on first attempt + sink.write(String.valueOf(4)); // Buffer: 8/110B; 2/10 elements; 1 inflight sink.write(String.valueOf(5)); // Buffer: 12/110B; 3/10 elements; 1 inflight sink.write(String.valueOf(6)); // Buffer: 16/110B; 4/10 elements; 1 inflight sink.write(String.valueOf(325)); // Buffer: 116/110B; 5/10 elements; 1 inflight -- flushing + // inflight request is processed, buffer: [225, 3, 4, 5, 6, 325] - assertEquals(Arrays.asList(1, 2, 225, 3, 4), res); + assertThat(res).isEqualTo(Arrays.asList(1, 2, 225, 3, 4)); // Buffer: [5, 6, 325]; 0 inflight } @@ -494,14 +516,18 @@ public class AsyncSinkWriterTest { sink.write(String.valueOf(1)); // Buffer: 204/210B; 3/10 elements; 0 inflight sink.write(String.valueOf(2)); // Buffer: 208/210B; 4/10 elements; 0 inflight sink.write(String.valueOf(3)); // Buffer: 212/210B; 5/10 elements; 0 inflight -- flushing - assertEquals(2, res.size()); // Request was [228, 225, 1, 2], element 228, 225 failed + + assertThat(res.size()) + .isEqualTo(2); // Request was [228, 225, 1, 2], element 228, 225 failed + sink.write(String.valueOf(4)); // Buffer: 8/210B; 2/10 elements; 2 inflight sink.write(String.valueOf(5)); // Buffer: 12/210B; 3/10 elements; 2 inflight sink.write(String.valueOf(6)); // Buffer: 16/210B; 4/10 elements; 2 inflight sink.write(String.valueOf(328)); // Buffer: 116/210B; 5/10 elements; 2 inflight sink.write(String.valueOf(325)); // Buffer: 216/210B; 6/10 elements; 2 inflight -- flushing + // inflight request is processed, buffer: [228, 225, 3, 4, 5, 6, 328, 325] - assertEquals(Arrays.asList(1, 2, 228, 225, 3, 4), res); + assertThat(res).isEqualTo(Arrays.asList(1, 2, 228, 225, 3, 4)); // Buffer: [5, 6, 328, 325]; 0 inflight } @@ -526,9 +552,12 @@ public class AsyncSinkWriterTest { } tpts.setCurrentTime(99L); - assertEquals(0, res.size()); + + assertThat(res.size()).isEqualTo(0); + tpts.setCurrentTime(100L); - assertEquals(8, res.size()); + + assertThat(res.size()).isEqualTo(8); } @Test @@ -551,9 +580,9 @@ public class AsyncSinkWriterTest { sink.write(String.valueOf(i)); } tpts.setCurrentTime(99L); - assertEquals(90, res.size()); + assertThat(res.size()).isEqualTo(90); tpts.setCurrentTime(100L); - assertEquals(98, res.size()); + assertThat(res.size()).isEqualTo(98); } @Test @@ -571,15 +600,17 @@ public class AsyncSinkWriterTest { sink.write(String.valueOf(1)); // buffer: [225, 0, 1] sink.write(String.valueOf(2)); // buffer: [2], inflight: [225], destination: [0, 1] - assertEquals(Arrays.asList(0, 1), res); + assertThat(res).isEqualTo(Arrays.asList(0, 1)); assertThatBufferStatesAreEqual(sink.wrapRequests(2), getWriterState(sink)); sink.flush(false); // buffer: [225, 2], inflight: [], destination: [0, 1] - assertEquals(Arrays.asList(0, 1), res); + + assertThat(res).isEqualTo(Arrays.asList(0, 1)); assertThatBufferStatesAreEqual(sink.wrapRequests(225, 2), getWriterState(sink)); sink.flush(true); // buffer: [], inflight: [], destination: [0, 1, 225, 2] - assertEquals(Arrays.asList(0, 1, 225, 2), res); + + assertThat(res).isEqualTo(Arrays.asList(0, 1, 225, 2)); } @Test @@ -596,11 +627,14 @@ public class AsyncSinkWriterTest { sink.write(String.valueOf(1)); // Buffer: 104/110B; 3/10 elements; 0 inflight sink.write(String.valueOf(2)); // Buffer: 108/110B; 4/10 elements; 0 inflight sink.write(String.valueOf(3)); // Buffer: 112/110B; 5/10 elements; 0 inflight -- flushing - assertEquals(2, res.size()); // Request was [225, 1, 2], element 225 failed + + assertThat(res.size()).isEqualTo(2); // Request was [225, 1, 2], element 225 failed // buffer should be [3] with [225] inflight sink.flush(false); // Buffer: [225,3] - > 8/110; 2/10 elements; 0 inflight - assertEquals(2, res.size()); // + + assertThat(res.size()).isEqualTo(2); // + List<BufferedRequestState<Integer>> states = sink.snapshotState(1); AsyncSinkWriterImpl newSink = new AsyncSinkWriterImplBuilder() @@ -610,7 +644,8 @@ public class AsyncSinkWriterTest { newSink.write(String.valueOf(4)); // Buffer: 12/15B; 3/10 elements; 0 inflight newSink.write(String.valueOf(5)); // Buffer: 16/15B; 4/10 elements; 0 inflight --flushing - assertEquals(Arrays.asList(1, 2, 225, 3, 4), res); + + assertThat(res).isEqualTo(Arrays.asList(1, 2, 225, 3, 4)); // Buffer: [5]; 0 inflight } @@ -687,20 +722,20 @@ public class AsyncSinkWriterTest { TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService(); tpts.setCurrentTime(0L); sink.write("1"); // A timer is registered here to elapse at t=100 - assertEquals(0, res.size()); + assertThat(res.size()).isEqualTo(0); tpts.setCurrentTime(10L); sink.flush(true); - assertEquals(1, res.size()); + assertThat(res.size()).isEqualTo(1); tpts.setCurrentTime(20L); // At t=20, we write a new element that should not trigger another sink.write("2"); // timer to be registered. If it is, it should elapse at t=120s. - assertEquals(1, res.size()); + assertThat(res.size()).isEqualTo(1); tpts.setCurrentTime(100L); - assertEquals(2, res.size()); + assertThat(res.size()).isEqualTo(2); sink.write("3"); tpts.setCurrentTime(199L); // At t=199s, our third element has not been written - assertEquals(2, res.size()); // therefore, no timer fired at 120s. + assertThat(res.size()).isEqualTo(2); // therefore, no timer fired at 120s. tpts.setCurrentTime(200L); - assertEquals(3, res.size()); + assertThat(res.size()).isEqualTo(3); } @Test @@ -721,13 +756,13 @@ public class AsyncSinkWriterTest { sink.write("2"); sink.write("225"); tpts.setCurrentTime(100L); - assertEquals(2, res.size()); + assertThat(res.size()).isEqualTo(2); sink.write("3"); sink.write("4"); tpts.setCurrentTime(199L); - assertEquals(2, res.size()); + assertThat(res.size()).isEqualTo(2); tpts.setCurrentTime(200L); - assertEquals(5, res.size()); + assertThat(res.size()).isEqualTo(5); } @Test @@ -748,7 +783,7 @@ public class AsyncSinkWriterTest { sink.write("1"); tpts.setCurrentTime(50L); sink.flush(true); - assertEquals(1, res.size()); + assertThat(res.size()).isEqualTo(1); tpts.setCurrentTime(200L); } @@ -769,10 +804,10 @@ public class AsyncSinkWriterTest { tpts.setCurrentTime(0L); sink.write("1"); tpts.setCurrentTime(100L); - assertEquals(1, res.size()); + assertThat(res.size()).isEqualTo(1); sink.write("2"); tpts.setCurrentTime(200L); - assertEquals(2, res.size()); + assertThat(res.size()).isEqualTo(2); } /** @@ -806,7 +841,7 @@ public class AsyncSinkWriterTest { true); writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch); - assertEquals(Arrays.asList(1, 2, 3, 4), res); + assertThat(res).isEqualTo(Arrays.asList(1, 2, 3, 4)); } /** @@ -837,7 +872,7 @@ public class AsyncSinkWriterTest { false); writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch); - assertEquals(Arrays.asList(4, 1, 2, 3), res); + assertThat(res).isEqualTo(Arrays.asList(4, 1, 2, 3)); } private void writeTwoElementsAndInterleaveTheNextTwoElements( @@ -866,9 +901,9 @@ public class AsyncSinkWriterTest { tpts.setCurrentTime(100L); blockedWriteLatch.countDown(); es.shutdown(); - assertTrue( - es.awaitTermination(500, TimeUnit.MILLISECONDS), - "Executor Service stuck at termination, not terminated after 500ms!"); + assertThat(es.awaitTermination(500, TimeUnit.MILLISECONDS)) + .as("Executor Service stuck at termination, not terminated after 500ms!") + .isTrue(); } /** @@ -924,19 +959,19 @@ public class AsyncSinkWriterTest { } }); Thread.sleep(300); - assertFalse(s.isInterrupted()); + assertThat(s.isInterrupted()).isFalse(); s.interrupt(); blockedWriteLatch.countDown(); t.join(); - assertEquals(Arrays.asList(1, 2, 3), res); + assertThat(res).isEqualTo(Arrays.asList(1, 2, 3)); } private BufferedRequestState<Integer> getWriterState( AsyncSinkWriter<String, Integer> sinkWriter) { List<BufferedRequestState<Integer>> states = sinkWriter.snapshotState(1); - assertEquals(states.size(), 1); + assertThat(states.size()).isEqualTo(1); return states.get(0); } @@ -1205,10 +1240,11 @@ public class AsyncSinkWriterTest { try { delayedStartLatch.countDown(); if (blockForLimitedTime) { - assertFalse( - blockedThreadLatch.await(500, TimeUnit.MILLISECONDS), - "The countdown latch was released before the full amount" - + "of time was reached."); + assertThat(blockedThreadLatch.await(500, TimeUnit.MILLISECONDS)) + .as( + "The countdown latch was released before the full amount" + + "of time was reached.") + .isFalse(); } else { blockedThreadLatch.await(); }
