This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9725933fc0a09274801d2acb52a6a5256afa10f6 Author: Jing Ge <[email protected]> AuthorDate: Fri Mar 4 14:17:01 2022 +0100 [FLINK-26126][test] migrate KafkaWriterITCase to AssertJ --- .../connector/kafka/sink/KafkaWriterITCase.java | 93 ++++++++++------------ 1 file changed, 40 insertions(+), 53 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java index ee21d04..f972de4 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java @@ -75,14 +75,7 @@ import java.util.stream.IntStream; import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer; import static org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic; import static org.apache.flink.util.DockerImageVersions.KAFKA; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.sameInstance; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the standalone KafkaWriter. */ @ExtendWith(TestLoggerExtension.class) @@ -126,7 +119,7 @@ public class KafkaWriterITCase { public void testRegisterMetrics(DeliveryGuarantee guarantee) throws Exception { try (final KafkaWriter<Integer> ignored = createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) { - assertTrue(metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent()); + assertThat(metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent()).isTrue(); } } @@ -150,15 +143,15 @@ public class KafkaWriterITCase { final Counter numBytesSend = metricGroup.getNumBytesSendCounter(); final Counter numRecordsSend = metricGroup.getNumRecordsSendCounter(); final Counter numRecordsWrittenErrors = metricGroup.getNumRecordsOutErrorsCounter(); - assertEquals(numBytesSend.getCount(), 0L); - assertEquals(numRecordsSend.getCount(), 0); - assertEquals(numRecordsWrittenErrors.getCount(), 0); + assertThat(numBytesSend.getCount()).isEqualTo(0L); + assertThat(numRecordsSend.getCount()).isEqualTo(0); + assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0); writer.write(1, SINK_WRITER_CONTEXT); timeService.trigger(); - assertEquals(numRecordsSend.getCount(), 1); - assertEquals(numRecordsWrittenErrors.getCount(), 0); - assertThat(numBytesSend.getCount(), greaterThan(0L)); + assertThat(numRecordsSend.getCount()).isEqualTo(1); + assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0); + assertThat(numBytesSend.getCount()).isGreaterThan(0L); } } @@ -173,8 +166,8 @@ public class KafkaWriterITCase { metricGroup)) { final Optional<Gauge<Long>> currentSendTime = metricListener.getGauge("currentSendTime"); - assertTrue(currentSendTime.isPresent()); - assertEquals(currentSendTime.get().getValue(), 0L); + assertThat(currentSendTime.isPresent()).isTrue(); + assertThat(currentSendTime.get().getValue()).isEqualTo(0L); IntStream.range(0, 100) .forEach( (run) -> { @@ -188,7 +181,7 @@ public class KafkaWriterITCase { throw new RuntimeException("Failed writing Kafka record."); } }); - assertThat(currentSendTime.get().getValue(), greaterThan(0L)); + assertThat(currentSendTime.get().getValue()).isGreaterThan(0L); } } @@ -202,12 +195,10 @@ public class KafkaWriterITCase { createWriterWithConfiguration( properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) { final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); - org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount()) - .isEqualTo(0L); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); writer.write(1, SINK_WRITER_CONTEXT); - org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount()) - .isEqualTo(0L); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); final String transactionalId = writer.getCurrentProducer().getTransactionalId(); @@ -223,8 +214,7 @@ public class KafkaWriterITCase { writer.write(3, SINK_WRITER_CONTEXT); writer.flush(false); writer.prepareCommit(); - org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount()) - .isEqualTo(1L); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); } } @@ -243,7 +233,7 @@ public class KafkaWriterITCase { expected.add("testMetadataPublisher-0@" + i); } writer.prepareCommit(); - org.assertj.core.api.Assertions.assertThat(metadataList).isEqualTo(expected); + assertThat(metadataList).usingRecursiveComparison().isEqualTo(expected); } } @@ -270,15 +260,15 @@ public class KafkaWriterITCase { recoveredWriter.flush(false); Collection<KafkaCommittable> committables = recoveredWriter.prepareCommit(); recoveredWriter.snapshotState(1); - assertThat(committables, hasSize(1)); + assertThat(committables).hasSize(1); final KafkaCommittable committable = committables.stream().findFirst().get(); - assertThat(committable.getProducer().isPresent(), equalTo(true)); + assertThat(committable.getProducer().isPresent()).isTrue(); committable.getProducer().get().getObject().commitTransaction(); List<ConsumerRecord<byte[], byte[]>> records = drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true); - assertThat(records, hasSize(1)); + assertThat(records).hasSize(1); } failedWriter.close(); @@ -293,19 +283,18 @@ public class KafkaWriterITCase { void useSameProducerForNonTransactional(DeliveryGuarantee guarantee) throws Exception { try (final KafkaWriter<Integer> writer = createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) { - assertThat(writer.getProducerPool(), hasSize(0)); + assertThat(writer.getProducerPool()).hasSize(0); FlinkKafkaInternalProducer<byte[], byte[]> firstProducer = writer.getCurrentProducer(); writer.flush(false); Collection<KafkaCommittable> committables = writer.prepareCommit(); writer.snapshotState(0); - assertThat(committables, hasSize(0)); + assertThat(committables).hasSize(0); - assertThat( - "Expected same producer", - writer.getCurrentProducer(), - sameInstance(firstProducer)); - assertThat(writer.getProducerPool(), hasSize(0)); + assertThat(writer.getCurrentProducer() == firstProducer) + .as("Expected same producer") + .isTrue(); + assertThat(writer.getProducerPool()).hasSize(0); } } @@ -315,39 +304,37 @@ public class KafkaWriterITCase { try (final KafkaWriter<Integer> writer = createWriterWithConfiguration( getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) { - assertThat(writer.getProducerPool(), hasSize(0)); + assertThat(writer.getProducerPool()).hasSize(0); writer.flush(false); Collection<KafkaCommittable> committables0 = writer.prepareCommit(); writer.snapshotState(1); - assertThat(committables0, hasSize(1)); + assertThat(committables0).hasSize(1); final KafkaCommittable committable = committables0.stream().findFirst().get(); - assertThat(committable.getProducer().isPresent(), equalTo(true)); + assertThat(committable.getProducer().isPresent()).isTrue(); FlinkKafkaInternalProducer<?, ?> firstProducer = committable.getProducer().get().getObject(); - assertThat( - "Expected different producer", - firstProducer, - not(sameInstance(writer.getCurrentProducer()))); + assertThat(firstProducer != writer.getCurrentProducer()) + .as("Expected different producer") + .isTrue(); // recycle first producer, KafkaCommitter would commit it and then return it - assertThat(writer.getProducerPool(), hasSize(0)); + assertThat(writer.getProducerPool()).hasSize(0); firstProducer.commitTransaction(); committable.getProducer().get().close(); - assertThat(writer.getProducerPool(), hasSize(1)); + assertThat(writer.getProducerPool()).hasSize(1); writer.flush(false); Collection<KafkaCommittable> committables1 = writer.prepareCommit(); writer.snapshotState(2); - assertThat(committables1, hasSize(1)); + assertThat(committables1).hasSize(1); final KafkaCommittable committable1 = committables1.stream().findFirst().get(); - assertThat(committable1.getProducer().isPresent(), equalTo(true)); + assertThat(committable1.getProducer().isPresent()).isTrue(); - assertThat( - "Expected recycled producer", - firstProducer, - sameInstance(writer.getCurrentProducer())); + assertThat(firstProducer == writer.getCurrentProducer()) + .as("Expected recycled producer") + .isTrue(); } } @@ -361,7 +348,7 @@ public class KafkaWriterITCase { try (final KafkaWriter<Integer> writer = createWriterWithConfiguration(properties, DeliveryGuarantee.EXACTLY_ONCE)) { writer.write(1, SINK_WRITER_CONTEXT); - assertThat(drainAllRecordsFromTopic(topic, properties, true), hasSize(0)); + assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(0); } try (final KafkaWriter<Integer> writer = @@ -372,7 +359,7 @@ public class KafkaWriterITCase { writer.snapshotState(1L); // manually commit here, which would only succeed if the first transaction was aborted - assertThat(committables, hasSize(1)); + assertThat(committables).hasSize(1); final KafkaCommittable committable = committables.stream().findFirst().get(); String transactionalId = committable.getTransactionalId(); try (FlinkKafkaInternalProducer<byte[], byte[]> producer = @@ -381,7 +368,7 @@ public class KafkaWriterITCase { producer.commitTransaction(); } - assertThat(drainAllRecordsFromTopic(topic, properties, true), hasSize(1)); + assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(1); } }
