This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5f229e9e28ed5e2a756ea6c3c3aacb4900fbad19
Author: 1996fanrui <[email protected]>
AuthorDate: Fri Apr 28 11:25:06 2023 +0800

    [hotfix] Migrate checkpoint statistics related tests to JUnit5 and AssertJ
---
 .../CompletedCheckpointStatsSummaryTest.java       |  55 +++----
 .../checkpoint/CompletedCheckpointTest.java        |  78 +++++-----
 .../checkpoint/FailedCheckpointStatsTest.java      |  39 +++--
 .../checkpoint/PendingCheckpointStatsTest.java     | 160 ++++++++++-----------
 4 files changed, 159 insertions(+), 173 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
index da8a37e30ed..4ae0e14163b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
@@ -21,13 +21,14 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
 import static java.util.Collections.singletonMap;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.data.Offset.offset;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -35,7 +36,7 @@ public class CompletedCheckpointStatsSummaryTest {
 
     /** Tests simple updates of the completed checkpoint stats. */
     @Test
-    public void testSimpleUpdates() throws Exception {
+    void testSimpleUpdates() {
         long triggerTimestamp = 123123L;
         long ackTimestamp = 123123 + 1212312399L;
         long stateSize = Integer.MAX_VALUE + 17787L;
@@ -43,10 +44,10 @@ public class CompletedCheckpointStatsSummaryTest {
         long persistedData = Integer.MAX_VALUE + 42L;
 
         CompletedCheckpointStatsSummary summary = new 
CompletedCheckpointStatsSummary();
-        assertEquals(0, summary.getStateSizeStats().getCount());
-        assertEquals(0, summary.getEndToEndDurationStats().getCount());
-        assertEquals(0, summary.getProcessedDataStats().getCount());
-        assertEquals(0, summary.getPersistedDataStats().getCount());
+        assertThat(summary.getStateSizeStats().getCount()).isZero();
+        assertThat(summary.getEndToEndDurationStats().getCount()).isZero();
+        assertThat(summary.getProcessedDataStats().getCount()).isZero();
+        assertThat(summary.getPersistedDataStats().getCount()).isZero();
 
         int numCheckpoints = 10;
 
@@ -62,28 +63,28 @@ public class CompletedCheckpointStatsSummaryTest {
 
             summary.updateSummary(completed);
 
-            assertEquals(i + 1, summary.getStateSizeStats().getCount());
-            assertEquals(i + 1, summary.getEndToEndDurationStats().getCount());
-            assertEquals(i + 1, summary.getProcessedDataStats().getCount());
-            assertEquals(i + 1, summary.getPersistedDataStats().getCount());
+            assertThat(summary.getStateSizeStats().getCount()).isEqualTo(i + 
1);
+            
assertThat(summary.getEndToEndDurationStats().getCount()).isEqualTo(i + 1);
+            assertThat(summary.getProcessedDataStats().getCount()).isEqualTo(i 
+ 1);
+            assertThat(summary.getPersistedDataStats().getCount()).isEqualTo(i 
+ 1);
         }
 
         StatsSummary stateSizeStats = summary.getStateSizeStats();
-        assertEquals(stateSize, stateSizeStats.getMinimum());
-        assertEquals(stateSize + numCheckpoints - 1, 
stateSizeStats.getMaximum());
+        assertThat(stateSizeStats.getMinimum()).isEqualTo(stateSize);
+        assertThat(stateSizeStats.getMaximum()).isEqualTo(stateSize + 
numCheckpoints - 1);
 
         StatsSummary durationStats = summary.getEndToEndDurationStats();
-        assertEquals(ackTimestamp - triggerTimestamp, 
durationStats.getMinimum());
-        assertEquals(
-                ackTimestamp - triggerTimestamp + numCheckpoints - 1, 
durationStats.getMaximum());
+        assertThat(durationStats.getMinimum()).isEqualTo(ackTimestamp - 
triggerTimestamp);
+        assertThat(durationStats.getMaximum())
+                .isEqualTo(ackTimestamp - triggerTimestamp + numCheckpoints - 
1);
 
         StatsSummary processedDataStats = summary.getProcessedDataStats();
-        assertEquals(processedData, processedDataStats.getMinimum());
-        assertEquals(processedData + numCheckpoints - 1, 
processedDataStats.getMaximum());
+        assertThat(processedDataStats.getMinimum()).isEqualTo(processedData);
+        assertThat(processedDataStats.getMaximum()).isEqualTo(processedData + 
numCheckpoints - 1);
 
         StatsSummary persistedDataStats = summary.getPersistedDataStats();
-        assertEquals(persistedData, persistedDataStats.getMinimum());
-        assertEquals(persistedData + numCheckpoints - 1, 
persistedDataStats.getMaximum());
+        assertThat(persistedDataStats.getMinimum()).isEqualTo(persistedData);
+        assertThat(persistedDataStats.getMaximum()).isEqualTo(persistedData + 
numCheckpoints - 1);
     }
 
     private CompletedCheckpointStats createCompletedCheckpoint(
@@ -118,7 +119,7 @@ public class CompletedCheckpointStatsSummaryTest {
 
     /** Simply test that quantiles can be computed and fields are not 
permuted. */
     @Test
-    public void testQuantiles() {
+    void testQuantiles() {
         int stateSize = 100;
         int processedData = 200;
         int persistedData = 300;
@@ -140,10 +141,12 @@ public class CompletedCheckpointStatsSummaryTest {
                         new SubtaskStateStats(0, lastAck),
                         ""));
         CompletedCheckpointStatsSummarySnapshot snapshot = 
summary.createSnapshot();
-        assertEquals(stateSize, snapshot.getStateSizeStats().getQuantile(1), 
0);
-        assertEquals(processedData, 
snapshot.getProcessedDataStats().getQuantile(1), 0);
-        assertEquals(persistedData, 
snapshot.getPersistedDataStats().getQuantile(1), 0);
-        assertEquals(
-                lastAck - triggerTimestamp, 
snapshot.getEndToEndDurationStats().getQuantile(1), 0);
+        
assertThat(snapshot.getStateSizeStats().getQuantile(1)).isCloseTo(stateSize, 
offset(0d));
+        assertThat(snapshot.getProcessedDataStats().getQuantile(1))
+                .isCloseTo(processedData, offset(0d));
+        assertThat(snapshot.getPersistedDataStats().getQuantile(1))
+                .isCloseTo(persistedData, offset(0d));
+        assertThat(snapshot.getEndToEndDurationStats().getQuantile(1))
+                .isCloseTo(lastAck - triggerTimestamp, offset(0d));
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index a0b67cd0dfa..2f6b7259cf2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -29,9 +29,7 @@ import org.apache.flink.runtime.state.SharedStateRegistryImpl;
 import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -39,9 +37,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -49,10 +45,8 @@ import static org.mockito.Mockito.verify;
 /** Unit tests for the {@link CompletedCheckpoint}. */
 public class CompletedCheckpointTest {
 
-    @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder();
-
     @Test
-    public void testCompareCheckpointsWithDifferentOrder() {
+    void testCompareCheckpointsWithDifferentOrder() {
 
         CompletedCheckpoint checkpoint1 =
                 new CompletedCheckpoint(
@@ -90,11 +84,11 @@ public class CompletedCheckpointTest {
         checkpoints2.add(checkpoint1);
         checkpoints2.add(checkpoint2);
 
-        assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2));
+        assertThat(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2)).isFalse();
     }
 
     @Test
-    public void testCompareCheckpointsWithSameOrder() {
+    void testCompareCheckpointsWithSameOrder() {
 
         CompletedCheckpoint checkpoint1 =
                 new CompletedCheckpoint(
@@ -132,12 +126,12 @@ public class CompletedCheckpointTest {
         checkpoints2.add(checkpoint2);
         checkpoints2.add(checkpoint1);
 
-        assertTrue(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2));
+        assertThat(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2)).isTrue();
     }
 
     /** Verify that both JobID and checkpoint id are taken into account when 
comparing. */
     @Test
-    public void testCompareCheckpointsWithSameJobID() {
+    void testCompareCheckpointsWithSameJobID() {
         JobID jobID = new JobID();
 
         CompletedCheckpoint checkpoint1 =
@@ -172,12 +166,12 @@ public class CompletedCheckpointTest {
         List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
         checkpoints2.add(checkpoint2);
 
-        assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2));
+        assertThat(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2)).isFalse();
     }
 
     /** Verify that both JobID and checkpoint id are taken into account when 
comparing. */
     @Test
-    public void testCompareCheckpointsWithSameCheckpointId() {
+    void testCompareCheckpointsWithSameCheckpointId() {
         JobID jobID1 = new JobID();
         JobID jobID2 = new JobID();
 
@@ -213,11 +207,11 @@ public class CompletedCheckpointTest {
         List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
         checkpoints2.add(checkpoint2);
 
-        assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2));
+        assertThat(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2)).isFalse();
     }
 
     @Test
-    public void testRegisterStatesAtRegistry() {
+    void testRegisterStatesAtRegistry() {
         OperatorState state = mock(OperatorState.class);
         Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
         operatorStates.put(new OperatorID(), state);
@@ -242,7 +236,7 @@ public class CompletedCheckpointTest {
 
     /** Tests that the garbage collection properties are respected when 
subsuming checkpoints. */
     @Test
-    public void testCleanUpOnSubsume() throws Exception {
+    void testCleanUpOnSubsume() throws Exception {
         OperatorState state = mock(OperatorState.class);
         Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
         operatorStates.put(new OperatorID(), state);
@@ -275,13 +269,13 @@ public class CompletedCheckpointTest {
         checkpoint.markAsDiscardedOnSubsume().discard();
 
         verify(state, times(1)).discardState();
-        assertTrue(location.isDisposed());
-        assertTrue(metadata.isDisposed());
+        assertThat(location.isDisposed()).isTrue();
+        assertThat(metadata.isDisposed()).isTrue();
     }
 
     /** Tests that the garbage collection properties are respected when 
shutting down. */
     @Test
-    public void testCleanUpOnShutdown() throws Exception {
+    void testCleanUpOnShutdown() throws Exception {
         JobStatus[] terminalStates =
                 new JobStatus[] {
                     JobStatus.FINISHED, JobStatus.CANCELED, JobStatus.FAILED, 
JobStatus.SUSPENDED
@@ -323,8 +317,8 @@ public class CompletedCheckpointTest {
             checkpoint.markAsDiscardedOnShutdown(status).discard();
 
             verify(state, times(0)).discardState();
-            assertFalse(retainedLocation.isDisposed());
-            assertFalse(retainedHandle.isDisposed());
+            assertThat(retainedLocation.isDisposed()).isFalse();
+            assertThat(retainedHandle.isDisposed()).isFalse();
 
             // Discard
             EmptyStreamStateHandle discardHandle = new 
EmptyStreamStateHandle();
@@ -351,14 +345,14 @@ public class CompletedCheckpointTest {
             checkpoint.markAsDiscardedOnShutdown(status).discard();
 
             verify(state, times(1)).discardState();
-            assertTrue(discardLocation.isDisposed());
-            assertTrue(discardHandle.isDisposed());
+            assertThat(discardLocation.isDisposed()).isTrue();
+            assertThat(discardHandle.isDisposed()).isTrue();
         }
     }
 
     /** Tests that the stats callbacks happen if the callback is registered. */
     @Test
-    public void testCompletedCheckpointStatsCallbacks() throws Exception {
+    void testCompletedCheckpointStatsCallbacks() throws Exception {
         Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
         JobVertexID jobVertexId = new JobVertexID();
         taskStats.put(jobVertexId, new TaskStateStats(jobVertexId, 1));
@@ -390,11 +384,11 @@ public class CompletedCheckpointTest {
                         checkpointStats);
 
         completed.markAsDiscardedOnShutdown(JobStatus.FINISHED).discard();
-        assertTrue(checkpointStats.isDiscarded());
+        assertThat(checkpointStats.isDiscarded()).isTrue();
     }
 
     @Test
-    public void testIsJavaSerializable() throws Exception {
+    void testIsJavaSerializable() throws Exception {
         TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
         TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
 
@@ -420,20 +414,18 @@ public class CompletedCheckpointTest {
 
         CompletedCheckpointStats copy = 
CommonTestUtils.createCopySerializable(completed);
 
-        assertEquals(completed.getCheckpointId(), copy.getCheckpointId());
-        assertEquals(completed.getTriggerTimestamp(), 
copy.getTriggerTimestamp());
-        assertEquals(completed.getProperties(), copy.getProperties());
-        assertEquals(completed.getNumberOfSubtasks(), 
copy.getNumberOfSubtasks());
-        assertEquals(
-                completed.getNumberOfAcknowledgedSubtasks(),
-                copy.getNumberOfAcknowledgedSubtasks());
-        assertEquals(completed.getEndToEndDuration(), 
copy.getEndToEndDuration());
-        assertEquals(completed.getStateSize(), copy.getStateSize());
-        assertEquals(completed.getProcessedData(), copy.getProcessedData());
-        assertEquals(completed.getPersistedData(), copy.getPersistedData());
-        assertEquals(
-                
completed.getLatestAcknowledgedSubtaskStats().getSubtaskIndex(),
-                copy.getLatestAcknowledgedSubtaskStats().getSubtaskIndex());
-        assertEquals(completed.getStatus(), copy.getStatus());
+        
assertThat(copy.getCheckpointId()).isEqualTo(completed.getCheckpointId());
+        
assertThat(copy.getTriggerTimestamp()).isEqualTo(completed.getTriggerTimestamp());
+        assertThat(copy.getProperties()).isEqualTo(completed.getProperties());
+        
assertThat(copy.getNumberOfSubtasks()).isEqualTo(completed.getNumberOfSubtasks());
+        assertThat(copy.getNumberOfAcknowledgedSubtasks())
+                .isEqualTo(completed.getNumberOfAcknowledgedSubtasks());
+        
assertThat(copy.getEndToEndDuration()).isEqualTo(completed.getEndToEndDuration());
+        assertThat(copy.getStateSize()).isEqualTo(completed.getStateSize());
+        
assertThat(copy.getProcessedData()).isEqualTo(completed.getProcessedData());
+        
assertThat(copy.getPersistedData()).isEqualTo(completed.getPersistedData());
+        assertThat(copy.getLatestAcknowledgedSubtaskStats().getSubtaskIndex())
+                
.isEqualTo(completed.getLatestAcknowledgedSubtaskStats().getSubtaskIndex());
+        assertThat(copy.getStatus()).isEqualTo(completed.getStatus());
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
index 069ab2dd58d..28beabc3e05 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
@@ -21,13 +21,13 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.NotSerializableException;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 public class FailedCheckpointStatsTest {
 
@@ -35,7 +35,7 @@ public class FailedCheckpointStatsTest {
      * Tests that the end to end duration of a failed checkpoint is the 
duration until the failure.
      */
     @Test
-    public void testEndToEndDuration() throws Exception {
+    void testEndToEndDuration() {
         long duration = 123912931293L;
         long triggerTimestamp = 10123;
         long failureTimestamp = triggerTimestamp + duration;
@@ -61,11 +61,11 @@ public class FailedCheckpointStatsTest {
                         null,
                         null);
 
-        assertEquals(duration, failed.getEndToEndDuration());
+        assertThat(failed.getEndToEndDuration()).isEqualTo(duration);
     }
 
     @Test
-    public void testIsJavaSerializable() throws Exception {
+    void testIsJavaSerializable() throws Exception {
         long duration = 123912931293L;
         long triggerTimestamp = 10123;
         long failureTimestamp = triggerTimestamp + duration;
@@ -93,20 +93,19 @@ public class FailedCheckpointStatsTest {
 
         FailedCheckpointStats copy = 
CommonTestUtils.createCopySerializable(failed);
 
-        assertEquals(failed.getCheckpointId(), copy.getCheckpointId());
-        assertEquals(failed.getTriggerTimestamp(), copy.getTriggerTimestamp());
-        assertEquals(failed.getProperties(), copy.getProperties());
-        assertEquals(failed.getNumberOfSubtasks(), copy.getNumberOfSubtasks());
-        assertEquals(
-                failed.getNumberOfAcknowledgedSubtasks(), 
copy.getNumberOfAcknowledgedSubtasks());
-        assertEquals(failed.getEndToEndDuration(), copy.getEndToEndDuration());
-        assertEquals(failed.getStateSize(), copy.getStateSize());
-        assertEquals(failed.getProcessedData(), copy.getProcessedData());
-        assertEquals(failed.getPersistedData(), copy.getPersistedData());
-        assertEquals(
-                failed.getLatestAcknowledgedSubtaskStats(),
-                copy.getLatestAcknowledgedSubtaskStats());
-        assertEquals(failed.getStatus(), copy.getStatus());
-        assertEquals(failed.getFailureMessage(), copy.getFailureMessage());
+        assertThat(copy.getCheckpointId()).isEqualTo(failed.getCheckpointId());
+        
assertThat(copy.getTriggerTimestamp()).isEqualTo(failed.getTriggerTimestamp());
+        assertThat(copy.getProperties()).isEqualTo(failed.getProperties());
+        
assertThat(copy.getNumberOfSubtasks()).isEqualTo(failed.getNumberOfSubtasks());
+        assertThat(copy.getNumberOfAcknowledgedSubtasks())
+                .isEqualTo(failed.getNumberOfAcknowledgedSubtasks());
+        
assertThat(copy.getEndToEndDuration()).isEqualTo(failed.getEndToEndDuration());
+        assertThat(copy.getStateSize()).isEqualTo(failed.getStateSize());
+        
assertThat(copy.getProcessedData()).isEqualTo(failed.getProcessedData());
+        
assertThat(copy.getPersistedData()).isEqualTo(failed.getPersistedData());
+        assertThat(copy.getLatestAcknowledgedSubtaskStats())
+                .isEqualTo(failed.getLatestAcknowledgedSubtaskStats());
+        assertThat(copy.getStatus()).isEqualTo(failed.getStatus());
+        
assertThat(copy.getFailureMessage()).isEqualTo(failed.getFailureMessage());
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
index e46f6083b27..33d3313c767 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
@@ -21,16 +21,12 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.util.HashMap;
 
-import static junit.framework.TestCase.assertFalse;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
@@ -38,7 +34,7 @@ public class PendingCheckpointStatsTest {
 
     /** Tests reporting of subtask stats. */
     @Test
-    public void testReportSubtaskStats() throws Exception {
+    void testReportSubtaskStats() {
         long checkpointId = Integer.MAX_VALUE + 1222L;
         long triggerTimestamp = Integer.MAX_VALUE - 1239L;
         CheckpointProperties props =
@@ -57,22 +53,22 @@ public class PendingCheckpointStatsTest {
                         checkpointId, triggerTimestamp, props, 
totalSubtaskCount, taskStats);
 
         // Check initial state
-        assertEquals(checkpointId, pending.getCheckpointId());
-        assertEquals(triggerTimestamp, pending.getTriggerTimestamp());
-        assertEquals(props, pending.getProperties());
-        assertEquals(CheckpointStatsStatus.IN_PROGRESS, pending.getStatus());
-        assertEquals(0, pending.getNumberOfAcknowledgedSubtasks());
-        assertEquals(0, pending.getStateSize());
-        assertEquals(totalSubtaskCount, pending.getNumberOfSubtasks());
-        assertNull(pending.getLatestAcknowledgedSubtaskStats());
-        assertEquals(-1, pending.getLatestAckTimestamp());
-        assertEquals(-1, pending.getEndToEndDuration());
-        assertEquals(task1, pending.getTaskStateStats(task1.getJobVertexId()));
-        assertEquals(task2, pending.getTaskStateStats(task2.getJobVertexId()));
-        assertNull(pending.getTaskStateStats(new JobVertexID()));
+        assertThat(pending.getCheckpointId()).isEqualTo(checkpointId);
+        assertThat(pending.getTriggerTimestamp()).isEqualTo(triggerTimestamp);
+        assertThat(pending.getProperties()).isEqualTo(props);
+        
assertThat(pending.getStatus()).isEqualTo(CheckpointStatsStatus.IN_PROGRESS);
+        assertThat(pending.getNumberOfAcknowledgedSubtasks()).isZero();
+        assertThat(pending.getStateSize()).isZero();
+        assertThat(pending.getNumberOfSubtasks()).isEqualTo(totalSubtaskCount);
+        assertThat(pending.getLatestAcknowledgedSubtaskStats()).isNull();
+        assertThat(pending.getLatestAckTimestamp()).isEqualTo(-1);
+        assertThat(pending.getEndToEndDuration()).isEqualTo(-1);
+        
assertThat(pending.getTaskStateStats(task1.getJobVertexId())).isEqualTo(task1);
+        
assertThat(pending.getTaskStateStats(task2.getJobVertexId())).isEqualTo(task2);
+        assertThat(pending.getTaskStateStats(new JobVertexID())).isNull();
 
         // Report subtasks and check getters
-        assertFalse(pending.reportSubtaskStats(new JobVertexID(), 
createSubtaskStats(0)));
+        assertThat(pending.reportSubtaskStats(new JobVertexID(), 
createSubtaskStats(0))).isFalse();
 
         long stateSize = 0;
 
@@ -83,15 +79,16 @@ public class PendingCheckpointStatsTest {
 
             pending.reportSubtaskStats(task1.getJobVertexId(), subtask);
 
-            assertEquals(subtask, pending.getLatestAcknowledgedSubtaskStats());
-            assertEquals(subtask.getAckTimestamp(), 
pending.getLatestAckTimestamp());
-            assertEquals(
-                    subtask.getAckTimestamp() - triggerTimestamp, 
pending.getEndToEndDuration());
-            assertEquals(stateSize, pending.getStateSize());
+            
assertThat(pending.getLatestAcknowledgedSubtaskStats()).isEqualTo(subtask);
+            
assertThat(pending.getLatestAckTimestamp()).isEqualTo(subtask.getAckTimestamp());
+            assertThat(pending.getEndToEndDuration())
+                    .isEqualTo(subtask.getAckTimestamp() - triggerTimestamp);
+            assertThat(pending.getStateSize()).isEqualTo(stateSize);
         }
 
         // Don't allow overwrite
-        assertFalse(pending.reportSubtaskStats(task1.getJobVertexId(), 
task1.getSubtaskStats()[0]));
+        assertThat(pending.reportSubtaskStats(task1.getJobVertexId(), 
task1.getSubtaskStats()[0]))
+                .isFalse();
 
         // Report 2nd task
         for (int i = 0; i < task2.getNumberOfSubtasks(); i++) {
@@ -100,20 +97,20 @@ public class PendingCheckpointStatsTest {
 
             pending.reportSubtaskStats(task2.getJobVertexId(), subtask);
 
-            assertEquals(subtask, pending.getLatestAcknowledgedSubtaskStats());
-            assertEquals(subtask.getAckTimestamp(), 
pending.getLatestAckTimestamp());
-            assertEquals(
-                    subtask.getAckTimestamp() - triggerTimestamp, 
pending.getEndToEndDuration());
-            assertEquals(stateSize, pending.getStateSize());
+            
assertThat(pending.getLatestAcknowledgedSubtaskStats()).isEqualTo(subtask);
+            
assertThat(pending.getLatestAckTimestamp()).isEqualTo(subtask.getAckTimestamp());
+            assertThat(pending.getEndToEndDuration())
+                    .isEqualTo(subtask.getAckTimestamp() - triggerTimestamp);
+            assertThat(pending.getStateSize()).isEqualTo(stateSize);
         }
 
-        assertEquals(task1.getNumberOfSubtasks(), 
task1.getNumberOfAcknowledgedSubtasks());
-        assertEquals(task2.getNumberOfSubtasks(), 
task2.getNumberOfAcknowledgedSubtasks());
+        
assertThat(task1.getNumberOfAcknowledgedSubtasks()).isEqualTo(task1.getNumberOfSubtasks());
+        
assertThat(task2.getNumberOfAcknowledgedSubtasks()).isEqualTo(task2.getNumberOfSubtasks());
     }
 
     /** Test reporting of a completed checkpoint. */
     @Test
-    public void testReportCompletedCheckpoint() throws Exception {
+    void testReportCompletedCheckpoint() {
         TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
         TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
 
@@ -152,30 +149,28 @@ public class PendingCheckpointStatsTest {
 
         CompletedCheckpointStats completed = args.getValue();
 
-        assertNotNull(completed);
-        assertEquals(CheckpointStatsStatus.COMPLETED, completed.getStatus());
-        assertFalse(completed.isDiscarded());
+        assertThat(completed).isNotNull();
+        
assertThat(completed.getStatus()).isEqualTo(CheckpointStatsStatus.COMPLETED);
+        assertThat(completed.isDiscarded()).isFalse();
         completed.discard();
-        assertTrue(completed.isDiscarded());
-        assertEquals(externalPath, completed.getExternalPath());
-
-        assertEquals(pending.getCheckpointId(), completed.getCheckpointId());
-        assertEquals(
-                pending.getNumberOfAcknowledgedSubtasks(),
-                completed.getNumberOfAcknowledgedSubtasks());
-        assertEquals(
-                pending.getLatestAcknowledgedSubtaskStats(),
-                completed.getLatestAcknowledgedSubtaskStats());
-        assertEquals(pending.getLatestAckTimestamp(), 
completed.getLatestAckTimestamp());
-        assertEquals(pending.getEndToEndDuration(), 
completed.getEndToEndDuration());
-        assertEquals(pending.getStateSize(), completed.getStateSize());
-        assertEquals(task1, 
completed.getTaskStateStats(task1.getJobVertexId()));
-        assertEquals(task2, 
completed.getTaskStateStats(task2.getJobVertexId()));
+        assertThat(completed.isDiscarded()).isTrue();
+        assertThat(completed.getExternalPath()).isEqualTo(externalPath);
+
+        
assertThat(completed.getCheckpointId()).isEqualTo(pending.getCheckpointId());
+        assertThat(completed.getNumberOfAcknowledgedSubtasks())
+                .isEqualTo(pending.getNumberOfAcknowledgedSubtasks());
+        assertThat(completed.getLatestAcknowledgedSubtaskStats())
+                .isEqualTo(pending.getLatestAcknowledgedSubtaskStats());
+        
assertThat(completed.getLatestAckTimestamp()).isEqualTo(pending.getLatestAckTimestamp());
+        
assertThat(completed.getEndToEndDuration()).isEqualTo(pending.getEndToEndDuration());
+        assertThat(completed.getStateSize()).isEqualTo(pending.getStateSize());
+        
assertThat(completed.getTaskStateStats(task1.getJobVertexId())).isEqualTo(task1);
+        
assertThat(completed.getTaskStateStats(task2.getJobVertexId())).isEqualTo(task2);
     }
 
     /** Test reporting of a failed checkpoint. */
     @Test
-    public void testReportFailedCheckpoint() throws Exception {
+    void testReportFailedCheckpoint() {
         TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
         TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
 
@@ -215,27 +210,25 @@ public class PendingCheckpointStatsTest {
 
         FailedCheckpointStats failed = args.getValue();
 
-        assertNotNull(failed);
-        assertEquals(CheckpointStatsStatus.FAILED, failed.getStatus());
-        assertEquals(failureTimestamp, failed.getFailureTimestamp());
-        assertEquals(cause.getMessage(), failed.getFailureMessage());
-
-        assertEquals(pending.getCheckpointId(), failed.getCheckpointId());
-        assertEquals(
-                pending.getNumberOfAcknowledgedSubtasks(),
-                failed.getNumberOfAcknowledgedSubtasks());
-        assertEquals(
-                pending.getLatestAcknowledgedSubtaskStats(),
-                failed.getLatestAcknowledgedSubtaskStats());
-        assertEquals(pending.getLatestAckTimestamp(), 
failed.getLatestAckTimestamp());
-        assertEquals(failureTimestamp - triggerTimestamp, 
failed.getEndToEndDuration());
-        assertEquals(pending.getStateSize(), failed.getStateSize());
-        assertEquals(task1, failed.getTaskStateStats(task1.getJobVertexId()));
-        assertEquals(task2, failed.getTaskStateStats(task2.getJobVertexId()));
+        assertThat(failed).isNotNull();
+        assertThat(failed.getStatus()).isEqualTo(CheckpointStatsStatus.FAILED);
+        assertThat(failed.getFailureTimestamp()).isEqualTo(failureTimestamp);
+        assertThat(failed.getFailureMessage()).isEqualTo(cause.getMessage());
+
+        
assertThat(failed.getCheckpointId()).isEqualTo(pending.getCheckpointId());
+        assertThat(failed.getNumberOfAcknowledgedSubtasks())
+                .isEqualTo(pending.getNumberOfAcknowledgedSubtasks());
+        assertThat(failed.getLatestAcknowledgedSubtaskStats())
+                .isEqualTo(pending.getLatestAcknowledgedSubtaskStats());
+        
assertThat(failed.getLatestAckTimestamp()).isEqualTo(pending.getLatestAckTimestamp());
+        assertThat(failed.getEndToEndDuration()).isEqualTo(failureTimestamp - 
triggerTimestamp);
+        assertThat(failed.getStateSize()).isEqualTo(pending.getStateSize());
+        
assertThat(failed.getTaskStateStats(task1.getJobVertexId())).isEqualTo(task1);
+        
assertThat(failed.getTaskStateStats(task2.getJobVertexId())).isEqualTo(task2);
     }
 
     @Test
-    public void testIsJavaSerializable() throws Exception {
+    void testIsJavaSerializable() throws Exception {
         TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
         TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
 
@@ -254,18 +247,17 @@ public class PendingCheckpointStatsTest {
 
         PendingCheckpointStats copy = 
CommonTestUtils.createCopySerializable(pending);
 
-        assertEquals(pending.getCheckpointId(), copy.getCheckpointId());
-        assertEquals(pending.getTriggerTimestamp(), 
copy.getTriggerTimestamp());
-        assertEquals(pending.getProperties(), copy.getProperties());
-        assertEquals(pending.getNumberOfSubtasks(), 
copy.getNumberOfSubtasks());
-        assertEquals(
-                pending.getNumberOfAcknowledgedSubtasks(), 
copy.getNumberOfAcknowledgedSubtasks());
-        assertEquals(pending.getEndToEndDuration(), 
copy.getEndToEndDuration());
-        assertEquals(pending.getStateSize(), copy.getStateSize());
-        assertEquals(
-                pending.getLatestAcknowledgedSubtaskStats(),
-                copy.getLatestAcknowledgedSubtaskStats());
-        assertEquals(pending.getStatus(), copy.getStatus());
+        
assertThat(copy.getCheckpointId()).isEqualTo(pending.getCheckpointId());
+        
assertThat(copy.getTriggerTimestamp()).isEqualTo(pending.getTriggerTimestamp());
+        assertThat(copy.getProperties()).isEqualTo(pending.getProperties());
+        
assertThat(copy.getNumberOfSubtasks()).isEqualTo(pending.getNumberOfSubtasks());
+        assertThat(copy.getNumberOfAcknowledgedSubtasks())
+                .isEqualTo(pending.getNumberOfAcknowledgedSubtasks());
+        
assertThat(copy.getEndToEndDuration()).isEqualTo(pending.getEndToEndDuration());
+        assertThat(copy.getStateSize()).isEqualTo(pending.getStateSize());
+        assertThat(copy.getLatestAcknowledgedSubtaskStats())
+                .isEqualTo(pending.getLatestAcknowledgedSubtaskStats());
+        assertThat(copy.getStatus()).isEqualTo(pending.getStatus());
     }
 
     // ------------------------------------------------------------------------

Reply via email to