This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9725933fc0a09274801d2acb52a6a5256afa10f6
Author: Jing Ge <[email protected]>
AuthorDate: Fri Mar 4 14:17:01 2022 +0100

    [FLINK-26126][test] migrate KafkaWriterITCase to AssertJ
---
 .../connector/kafka/sink/KafkaWriterITCase.java    | 93 ++++++++++------------
 1 file changed, 40 insertions(+), 53 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index ee21d04..f972de4 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -75,14 +75,7 @@ import java.util.stream.IntStream;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic;
 import static org.apache.flink.util.DockerImageVersions.KAFKA;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.sameInstance;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the standalone KafkaWriter. */
 @ExtendWith(TestLoggerExtension.class)
@@ -126,7 +119,7 @@ public class KafkaWriterITCase {
     public void testRegisterMetrics(DeliveryGuarantee guarantee) throws Exception {
         try (final KafkaWriter<Integer> ignored =
                 createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) {
-            assertTrue(metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent());
+            assertThat(metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent()).isTrue();
         }
     }
 
@@ -150,15 +143,15 @@ public class KafkaWriterITCase {
             final Counter numBytesSend = metricGroup.getNumBytesSendCounter();
             final Counter numRecordsSend = metricGroup.getNumRecordsSendCounter();
             final Counter numRecordsWrittenErrors = metricGroup.getNumRecordsOutErrorsCounter();
-            assertEquals(numBytesSend.getCount(), 0L);
-            assertEquals(numRecordsSend.getCount(), 0);
-            assertEquals(numRecordsWrittenErrors.getCount(), 0);
+            assertThat(numBytesSend.getCount()).isEqualTo(0L);
+            assertThat(numRecordsSend.getCount()).isEqualTo(0);
+            assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0);
 
             writer.write(1, SINK_WRITER_CONTEXT);
             timeService.trigger();
-            assertEquals(numRecordsSend.getCount(), 1);
-            assertEquals(numRecordsWrittenErrors.getCount(), 0);
-            assertThat(numBytesSend.getCount(), greaterThan(0L));
+            assertThat(numRecordsSend.getCount()).isEqualTo(1);
+            assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0);
+            assertThat(numBytesSend.getCount()).isGreaterThan(0L);
         }
     }
 
@@ -173,8 +166,8 @@ public class KafkaWriterITCase {
                         metricGroup)) {
             final Optional<Gauge<Long>> currentSendTime =
                     metricListener.getGauge("currentSendTime");
-            assertTrue(currentSendTime.isPresent());
-            assertEquals(currentSendTime.get().getValue(), 0L);
+            assertThat(currentSendTime.isPresent()).isTrue();
+            assertThat(currentSendTime.get().getValue()).isEqualTo(0L);
             IntStream.range(0, 100)
                     .forEach(
                             (run) -> {
@@ -188,7 +181,7 @@ public class KafkaWriterITCase {
                                     throw new RuntimeException("Failed writing Kafka record.");
                                 }
                             });
-            assertThat(currentSendTime.get().getValue(), greaterThan(0L));
+            assertThat(currentSendTime.get().getValue()).isGreaterThan(0L);
         }
     }
 
@@ -202,12 +195,10 @@ public class KafkaWriterITCase {
                 createWriterWithConfiguration(
                         properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) {
             final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
-            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
-                    .isEqualTo(0L);
+            assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
 
             writer.write(1, SINK_WRITER_CONTEXT);
-            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
-                    .isEqualTo(0L);
+            assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
 
             final String transactionalId = writer.getCurrentProducer().getTransactionalId();
 
@@ -223,8 +214,7 @@ public class KafkaWriterITCase {
             writer.write(3, SINK_WRITER_CONTEXT);
             writer.flush(false);
             writer.prepareCommit();
-            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
-                    .isEqualTo(1L);
+            assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
         }
     }
 
@@ -243,7 +233,7 @@ public class KafkaWriterITCase {
                 expected.add("testMetadataPublisher-0@" + i);
             }
             writer.prepareCommit();
-            org.assertj.core.api.Assertions.assertThat(metadataList).isEqualTo(expected);
+            assertThat(metadataList).usingRecursiveComparison().isEqualTo(expected);
         }
     }
 
@@ -270,15 +260,15 @@ public class KafkaWriterITCase {
             recoveredWriter.flush(false);
             Collection<KafkaCommittable> committables = recoveredWriter.prepareCommit();
             recoveredWriter.snapshotState(1);
-            assertThat(committables, hasSize(1));
+            assertThat(committables).hasSize(1);
             final KafkaCommittable committable = committables.stream().findFirst().get();
-            assertThat(committable.getProducer().isPresent(), equalTo(true));
+            assertThat(committable.getProducer().isPresent()).isTrue();
 
             committable.getProducer().get().getObject().commitTransaction();
 
             List<ConsumerRecord<byte[], byte[]>> records =
                     drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true);
-            assertThat(records, hasSize(1));
+            assertThat(records).hasSize(1);
         }
 
         failedWriter.close();
@@ -293,19 +283,18 @@ public class KafkaWriterITCase {
     void useSameProducerForNonTransactional(DeliveryGuarantee guarantee) throws Exception {
         try (final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) {
-            assertThat(writer.getProducerPool(), hasSize(0));
+            assertThat(writer.getProducerPool()).hasSize(0);
 
             FlinkKafkaInternalProducer<byte[], byte[]> firstProducer = writer.getCurrentProducer();
             writer.flush(false);
             Collection<KafkaCommittable> committables = writer.prepareCommit();
             writer.snapshotState(0);
-            assertThat(committables, hasSize(0));
+            assertThat(committables).hasSize(0);
 
-            assertThat(
-                    "Expected same producer",
-                    writer.getCurrentProducer(),
-                    sameInstance(firstProducer));
-            assertThat(writer.getProducerPool(), hasSize(0));
+            assertThat(writer.getCurrentProducer() == firstProducer)
+                    .as("Expected same producer")
+                    .isTrue();
+            assertThat(writer.getProducerPool()).hasSize(0);
         }
     }
 
@@ -315,39 +304,37 @@ public class KafkaWriterITCase {
         try (final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
                         getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
-            assertThat(writer.getProducerPool(), hasSize(0));
+            assertThat(writer.getProducerPool()).hasSize(0);
 
             writer.flush(false);
             Collection<KafkaCommittable> committables0 = writer.prepareCommit();
             writer.snapshotState(1);
-            assertThat(committables0, hasSize(1));
+            assertThat(committables0).hasSize(1);
             final KafkaCommittable committable = committables0.stream().findFirst().get();
-            assertThat(committable.getProducer().isPresent(), equalTo(true));
+            assertThat(committable.getProducer().isPresent()).isTrue();
 
             FlinkKafkaInternalProducer<?, ?> firstProducer =
                     committable.getProducer().get().getObject();
-            assertThat(
-                    "Expected different producer",
-                    firstProducer,
-                    not(sameInstance(writer.getCurrentProducer())));
+            assertThat(firstProducer != writer.getCurrentProducer())
+                    .as("Expected different producer")
+                    .isTrue();
 
             // recycle first producer, KafkaCommitter would commit it and then return it
-            assertThat(writer.getProducerPool(), hasSize(0));
+            assertThat(writer.getProducerPool()).hasSize(0);
             firstProducer.commitTransaction();
             committable.getProducer().get().close();
-            assertThat(writer.getProducerPool(), hasSize(1));
+            assertThat(writer.getProducerPool()).hasSize(1);
 
             writer.flush(false);
             Collection<KafkaCommittable> committables1 = writer.prepareCommit();
             writer.snapshotState(2);
-            assertThat(committables1, hasSize(1));
+            assertThat(committables1).hasSize(1);
             final KafkaCommittable committable1 = committables1.stream().findFirst().get();
-            assertThat(committable1.getProducer().isPresent(), equalTo(true));
+            assertThat(committable1.getProducer().isPresent()).isTrue();
 
-            assertThat(
-                    "Expected recycled producer",
-                    firstProducer,
-                    sameInstance(writer.getCurrentProducer()));
+            assertThat(firstProducer == writer.getCurrentProducer())
+                    .as("Expected recycled producer")
+                    .isTrue();
         }
     }
 
@@ -361,7 +348,7 @@ public class KafkaWriterITCase {
         try (final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(properties, DeliveryGuarantee.EXACTLY_ONCE)) {
             writer.write(1, SINK_WRITER_CONTEXT);
-            assertThat(drainAllRecordsFromTopic(topic, properties, true), hasSize(0));
+            assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(0);
         }
 
         try (final KafkaWriter<Integer> writer =
@@ -372,7 +359,7 @@ public class KafkaWriterITCase {
             writer.snapshotState(1L);
 
             // manually commit here, which would only succeed if the first transaction was aborted
-            assertThat(committables, hasSize(1));
+            assertThat(committables).hasSize(1);
             final KafkaCommittable committable = committables.stream().findFirst().get();
             String transactionalId = committable.getTransactionalId();
             try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
@@ -381,7 +368,7 @@ public class KafkaWriterITCase {
                 producer.commitTransaction();
             }
 
-            assertThat(drainAllRecordsFromTopic(topic, properties, true), hasSize(1));
+            assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(1);
         }
     }
 

Reply via email to