This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ad01d71f871c34cc12798babd231592d22b6f853
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Sep 5 13:28:51 2024 +0200

    [FLINK-25920] Improve sink test assertions
    
    Use the proper ObjectAssert as the base for CommittableSummaryAssert and
    CommittableWithLinageAssert.
---
 .../connector/sink2/CommittableSummaryAssert.java  | 43 ++++------------------
 .../sink2/CommittableWithLinageAssert.java         | 27 +++-----------
 .../SubtaskCommittableManagerTest.java             | 24 +++---------
 3 files changed, 19 insertions(+), 75 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
index e02e838aeb3..ed6ea6440ee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
@@ -18,64 +18,37 @@
 
 package org.apache.flink.streaming.api.connector.sink2;
 
-import org.assertj.core.api.AbstractAssert;
-
-import static org.assertj.core.api.Assertions.assertThat;
+import org.assertj.core.api.AbstractObjectAssert;
 
 /** Custom assertions for {@link CommittableSummary}. */
 public class CommittableSummaryAssert
-        extends AbstractAssert<CommittableSummaryAssert, 
CommittableSummary<?>> {
+        extends AbstractObjectAssert<CommittableSummaryAssert, 
CommittableSummary<?>> {
 
     public CommittableSummaryAssert(CommittableSummary<?> summary) {
         super(summary, CommittableSummaryAssert.class);
     }
 
-    public CommittableSummaryAssert isEqualTo(CommittableSummary<?> summary) {
-        isNotNull();
-        assertThat(actual.getSubtaskId()).isEqualTo(summary.getSubtaskId());
-        
assertThat(actual.getCheckpointId()).isEqualTo(summary.getCheckpointId());
-        
assertThat(actual.getNumberOfSubtasks()).isEqualTo(summary.getNumberOfSubtasks());
-        
assertThat(actual.getNumberOfCommittables()).isEqualTo(summary.getNumberOfCommittables());
-        assertThat(actual.getNumberOfPendingCommittables())
-                .isEqualTo(summary.getNumberOfPendingCommittables());
-        assertThat(actual.getNumberOfFailedCommittables())
-                .isEqualTo(summary.getNumberOfFailedCommittables());
-        return this;
-    }
-
     public CommittableSummaryAssert hasSubtaskId(int subtaskId) {
-        isNotNull();
-        assertThat(actual.getSubtaskId()).isEqualTo(subtaskId);
-        return this;
+        return returns(subtaskId, CommittableSummary::getSubtaskId);
     }
 
     public CommittableSummaryAssert hasNumberOfSubtasks(int numberOfSubtasks) {
-        isNotNull();
-        assertThat(actual.getNumberOfSubtasks()).isEqualTo(numberOfSubtasks);
-        return this;
+        return returns(numberOfSubtasks, 
CommittableSummary::getNumberOfSubtasks);
     }
 
     public CommittableSummaryAssert hasOverallCommittables(int 
committableNumber) {
-        isNotNull();
-        
assertThat(actual.getNumberOfCommittables()).isEqualTo(committableNumber);
-        return this;
+        return returns(committableNumber, 
CommittableSummary::getNumberOfCommittables);
     }
 
     public CommittableSummaryAssert hasPendingCommittables(int 
committableNumber) {
-        isNotNull();
-        
assertThat(actual.getNumberOfPendingCommittables()).isEqualTo(committableNumber);
-        return this;
+        return returns(committableNumber, 
CommittableSummary::getNumberOfPendingCommittables);
     }
 
     public CommittableSummaryAssert hasFailedCommittables(int 
committableNumber) {
-        isNotNull();
-        
assertThat(actual.getNumberOfFailedCommittables()).isEqualTo(committableNumber);
-        return this;
+        return returns(committableNumber, 
CommittableSummary::getNumberOfFailedCommittables);
     }
 
     public CommittableSummaryAssert hasCheckpointId(long checkpointId) {
-        isNotNull();
-        assertThat(actual.getCheckpointIdOrEOI()).isEqualTo(checkpointId);
-        return this;
+        return returns(checkpointId, CommittableSummary::getCheckpointIdOrEOI);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java
index 0937c7454d9..853fe6235d7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java
@@ -18,45 +18,28 @@
 
 package org.apache.flink.streaming.api.connector.sink2;
 
-import org.assertj.core.api.AbstractAssert;
-
-import static org.assertj.core.api.Assertions.assertThat;
+import org.assertj.core.api.AbstractObjectAssert;
 
 /**
  * Custom assertions for {@link
  * org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage}.
  */
 public class CommittableWithLinageAssert
-        extends AbstractAssert<CommittableWithLinageAssert, 
CommittableWithLineage<?>> {
+        extends AbstractObjectAssert<CommittableWithLinageAssert, 
CommittableWithLineage<?>> {
 
     public CommittableWithLinageAssert(CommittableWithLineage<?> summary) {
         super(summary, CommittableWithLinageAssert.class);
     }
 
-    public CommittableWithLinageAssert isEqualTo(CommittableWithLineage<?> 
committableWithLineage) {
-        isNotNull();
-        
assertThat(actual.getSubtaskId()).isEqualTo(committableWithLineage.getSubtaskId());
-        assertThat(actual.getCheckpointIdOrEOI())
-                .isEqualTo(committableWithLineage.getCheckpointIdOrEOI());
-        
assertThat(actual.getCommittable()).isEqualTo(committableWithLineage.getCommittable());
-        return this;
-    }
-
     public CommittableWithLinageAssert hasCommittable(Object committable) {
-        isNotNull();
-        assertThat(actual.getCommittable()).isEqualTo(committable);
-        return this;
+        return returns(committable, CommittableWithLineage::getCommittable);
     }
 
     public CommittableWithLinageAssert hasCheckpointId(long checkpointId) {
-        isNotNull();
-        assertThat(actual.getCheckpointIdOrEOI()).isEqualTo(checkpointId);
-        return this;
+        return returns(checkpointId, 
CommittableWithLineage::getCheckpointIdOrEOI);
     }
 
     public CommittableWithLinageAssert hasSubtaskId(int subtaskId) {
-        isNotNull();
-        assertThat(actual.getSubtaskId()).isEqualTo(subtaskId);
-        return this;
+        return returns(subtaskId, CommittableWithLineage::getSubtaskId);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java
index dd4fdf91d6e..7c252e9dee3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class SubtaskCommittableManagerTest {
@@ -40,10 +41,9 @@ class SubtaskCommittableManagerTest {
     void testDrainCommittables() {
         final SubtaskCommittableManager<Integer> subtaskCommittableManager =
                 new SubtaskCommittableManager<>(3, 1, 1L, METRIC_GROUP);
-        final CommittableWithLineage<Integer> first = new 
CommittableWithLineage<Integer>(1, 1L, 1);
-        final CommittableWithLineage<Integer> second =
-                new CommittableWithLineage<Integer>(2, 1L, 1);
-        final CommittableWithLineage<Integer> third = new 
CommittableWithLineage<Integer>(3, 1L, 1);
+        final CommittableWithLineage<Integer> first = new 
CommittableWithLineage<>(1, 1L, 1);
+        final CommittableWithLineage<Integer> second = new 
CommittableWithLineage<>(2, 1L, 1);
+        final CommittableWithLineage<Integer> third = new 
CommittableWithLineage<>(3, 1L, 1);
 
         assertThat(subtaskCommittableManager.getPendingRequests()).hasSize(0);
 
@@ -68,20 +68,8 @@ class SubtaskCommittableManagerTest {
         final List<CommittableWithLineage<Integer>> committables =
                 subtaskCommittableManager.drainCommitted();
         assertThat(committables).hasSize(2);
-        assertThat(committables.get(0))
-                .satisfies(
-                        c -> {
-                            assertThat(c.getSubtaskId()).isEqualTo(1);
-                            assertThat(c.getCommittable()).isEqualTo(1);
-                            assertThat(c.getCheckpointId()).hasValue(1L);
-                        });
-        assertThat(committables.get(1))
-                .satisfies(
-                        c -> {
-                            assertThat(c.getSubtaskId()).isEqualTo(1);
-                            assertThat(c.getCommittable()).isEqualTo(2);
-                            assertThat(c.getCheckpointId()).hasValue(1L);
-                        });
+        
assertThat(committables.get(0)).hasSubtaskId(1).hasCommittable(1).hasCheckpointId(1);
+        
assertThat(committables.get(1)).hasSubtaskId(1).hasCommittable(2).hasCheckpointId(1);
         assertThat(subtaskCommittableManager.getNumFailed()).isEqualTo(0);
 
         // Drain again should not yield anything

Reply via email to