This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5f229e9e28ed5e2a756ea6c3c3aacb4900fbad19 Author: 1996fanrui <[email protected]> AuthorDate: Fri Apr 28 11:25:06 2023 +0800 [hotfix] Migrate checkpoint statistics related tests to JUnit5 and AssertJ --- .../CompletedCheckpointStatsSummaryTest.java | 55 +++---- .../checkpoint/CompletedCheckpointTest.java | 78 +++++----- .../checkpoint/FailedCheckpointStatsTest.java | 39 +++-- .../checkpoint/PendingCheckpointStatsTest.java | 160 ++++++++++----------- 4 files changed, 159 insertions(+), 173 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java index da8a37e30ed..4ae0e14163b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java @@ -21,13 +21,14 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; import static java.util.Collections.singletonMap; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.data.Offset.offset; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -35,7 +36,7 @@ public class CompletedCheckpointStatsSummaryTest { /** Tests simple updates of the completed checkpoint stats. */ @Test - public void testSimpleUpdates() throws Exception { + void testSimpleUpdates() { long triggerTimestamp = 123123L; long ackTimestamp = 123123 + 1212312399L; long stateSize = Integer.MAX_VALUE + 17787L; @@ -43,10 +44,10 @@ public class CompletedCheckpointStatsSummaryTest { long persistedData = Integer.MAX_VALUE + 42L; CompletedCheckpointStatsSummary summary = new CompletedCheckpointStatsSummary(); - assertEquals(0, summary.getStateSizeStats().getCount()); - assertEquals(0, summary.getEndToEndDurationStats().getCount()); - assertEquals(0, summary.getProcessedDataStats().getCount()); - assertEquals(0, summary.getPersistedDataStats().getCount()); + assertThat(summary.getStateSizeStats().getCount()).isZero(); + assertThat(summary.getEndToEndDurationStats().getCount()).isZero(); + assertThat(summary.getProcessedDataStats().getCount()).isZero(); + assertThat(summary.getPersistedDataStats().getCount()).isZero(); int numCheckpoints = 10; @@ -62,28 +63,28 @@ public class CompletedCheckpointStatsSummaryTest { summary.updateSummary(completed); - assertEquals(i + 1, summary.getStateSizeStats().getCount()); - assertEquals(i + 1, summary.getEndToEndDurationStats().getCount()); - assertEquals(i + 1, summary.getProcessedDataStats().getCount()); - assertEquals(i + 1, summary.getPersistedDataStats().getCount()); + assertThat(summary.getStateSizeStats().getCount()).isEqualTo(i + 1); + assertThat(summary.getEndToEndDurationStats().getCount()).isEqualTo(i + 1); + assertThat(summary.getProcessedDataStats().getCount()).isEqualTo(i + 1); + assertThat(summary.getPersistedDataStats().getCount()).isEqualTo(i + 1); } StatsSummary stateSizeStats = summary.getStateSizeStats(); - assertEquals(stateSize, stateSizeStats.getMinimum()); - assertEquals(stateSize + numCheckpoints - 1, stateSizeStats.getMaximum()); + assertThat(stateSizeStats.getMinimum()).isEqualTo(stateSize); + assertThat(stateSizeStats.getMaximum()).isEqualTo(stateSize + numCheckpoints - 1); StatsSummary durationStats = summary.getEndToEndDurationStats(); - assertEquals(ackTimestamp - triggerTimestamp, durationStats.getMinimum()); - assertEquals( - ackTimestamp - triggerTimestamp + numCheckpoints - 1, durationStats.getMaximum()); + assertThat(durationStats.getMinimum()).isEqualTo(ackTimestamp - triggerTimestamp); + assertThat(durationStats.getMaximum()) + .isEqualTo(ackTimestamp - triggerTimestamp + numCheckpoints - 1); StatsSummary processedDataStats = summary.getProcessedDataStats(); - assertEquals(processedData, processedDataStats.getMinimum()); - assertEquals(processedData + numCheckpoints - 1, processedDataStats.getMaximum()); + assertThat(processedDataStats.getMinimum()).isEqualTo(processedData); + assertThat(processedDataStats.getMaximum()).isEqualTo(processedData + numCheckpoints - 1); StatsSummary persistedDataStats = summary.getPersistedDataStats(); - assertEquals(persistedData, persistedDataStats.getMinimum()); - assertEquals(persistedData + numCheckpoints - 1, persistedDataStats.getMaximum()); + assertThat(persistedDataStats.getMinimum()).isEqualTo(persistedData); + assertThat(persistedDataStats.getMaximum()).isEqualTo(persistedData + numCheckpoints - 1); } private CompletedCheckpointStats createCompletedCheckpoint( @@ -118,7 +119,7 @@ public class CompletedCheckpointStatsSummaryTest { /** Simply test that quantiles can be computed and fields are not permuted. */ @Test - public void testQuantiles() { + void testQuantiles() { int stateSize = 100; int processedData = 200; int persistedData = 300; @@ -140,10 +141,12 @@ public class CompletedCheckpointStatsSummaryTest { new SubtaskStateStats(0, lastAck), "")); CompletedCheckpointStatsSummarySnapshot snapshot = summary.createSnapshot(); - assertEquals(stateSize, snapshot.getStateSizeStats().getQuantile(1), 0); - assertEquals(processedData, snapshot.getProcessedDataStats().getQuantile(1), 0); - assertEquals(persistedData, snapshot.getPersistedDataStats().getQuantile(1), 0); - assertEquals( - lastAck - triggerTimestamp, snapshot.getEndToEndDurationStats().getQuantile(1), 0); + assertThat(snapshot.getStateSizeStats().getQuantile(1)).isCloseTo(stateSize, offset(0d)); + assertThat(snapshot.getProcessedDataStats().getQuantile(1)) + .isCloseTo(processedData, offset(0d)); + assertThat(snapshot.getPersistedDataStats().getQuantile(1)) + .isCloseTo(persistedData, offset(0d)); + assertThat(snapshot.getEndToEndDurationStats().getQuantile(1)) + .isCloseTo(lastAck - triggerTimestamp, offset(0d)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index a0b67cd0dfa..2f6b7259cf2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -29,9 +29,7 @@ import org.apache.flink.runtime.state.SharedStateRegistryImpl; import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle; import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Collections; @@ -39,9 +37,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -49,10 +45,8 @@ import static org.mockito.Mockito.verify; /** Unit tests for the {@link CompletedCheckpoint}. */ public class CompletedCheckpointTest { - @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); - @Test - public void testCompareCheckpointsWithDifferentOrder() { + void testCompareCheckpointsWithDifferentOrder() { CompletedCheckpoint checkpoint1 = new CompletedCheckpoint( @@ -90,11 +84,11 @@ public class CompletedCheckpointTest { checkpoints2.add(checkpoint1); checkpoints2.add(checkpoint2); - assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2)); + assertThat(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2)).isFalse(); } @Test - public void testCompareCheckpointsWithSameOrder() { + void testCompareCheckpointsWithSameOrder() { CompletedCheckpoint checkpoint1 = new CompletedCheckpoint( @@ -132,12 +126,12 @@ public class CompletedCheckpointTest { checkpoints2.add(checkpoint2); checkpoints2.add(checkpoint1); - assertTrue(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2)); + assertThat(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2)).isTrue(); } /** Verify that both JobID and checkpoint id are taken into account when comparing. */ @Test - public void testCompareCheckpointsWithSameJobID() { + void testCompareCheckpointsWithSameJobID() { JobID jobID = new JobID(); CompletedCheckpoint checkpoint1 = @@ -172,12 +166,12 @@ public class CompletedCheckpointTest { List<CompletedCheckpoint> checkpoints2 = new ArrayList<>(); checkpoints2.add(checkpoint2); - assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2)); + assertThat(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2)).isFalse(); } /** Verify that both JobID and checkpoint id are taken into account when comparing. */ @Test - public void testCompareCheckpointsWithSameCheckpointId() { + void testCompareCheckpointsWithSameCheckpointId() { JobID jobID1 = new JobID(); JobID jobID2 = new JobID(); @@ -213,11 +207,11 @@ public class CompletedCheckpointTest { List<CompletedCheckpoint> checkpoints2 = new ArrayList<>(); checkpoints2.add(checkpoint2); - assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2)); + assertThat(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2)).isFalse(); } @Test - public void testRegisterStatesAtRegistry() { + void testRegisterStatesAtRegistry() { OperatorState state = mock(OperatorState.class); Map<OperatorID, OperatorState> operatorStates = new HashMap<>(); operatorStates.put(new OperatorID(), state); @@ -242,7 +236,7 @@ public class CompletedCheckpointTest { /** Tests that the garbage collection properties are respected when subsuming checkpoints. */ @Test - public void testCleanUpOnSubsume() throws Exception { + void testCleanUpOnSubsume() throws Exception { OperatorState state = mock(OperatorState.class); Map<OperatorID, OperatorState> operatorStates = new HashMap<>(); operatorStates.put(new OperatorID(), state); @@ -275,13 +269,13 @@ public class CompletedCheckpointTest { checkpoint.markAsDiscardedOnSubsume().discard(); verify(state, times(1)).discardState(); - assertTrue(location.isDisposed()); - assertTrue(metadata.isDisposed()); + assertThat(location.isDisposed()).isTrue(); + assertThat(metadata.isDisposed()).isTrue(); } /** Tests that the garbage collection properties are respected when shutting down. */ @Test - public void testCleanUpOnShutdown() throws Exception { + void testCleanUpOnShutdown() throws Exception { JobStatus[] terminalStates = new JobStatus[] { JobStatus.FINISHED, JobStatus.CANCELED, JobStatus.FAILED, JobStatus.SUSPENDED @@ -323,8 +317,8 @@ public class CompletedCheckpointTest { checkpoint.markAsDiscardedOnShutdown(status).discard(); verify(state, times(0)).discardState(); - assertFalse(retainedLocation.isDisposed()); - assertFalse(retainedHandle.isDisposed()); + assertThat(retainedLocation.isDisposed()).isFalse(); + assertThat(retainedHandle.isDisposed()).isFalse(); // Discard EmptyStreamStateHandle discardHandle = new EmptyStreamStateHandle(); @@ -351,14 +345,14 @@ public class CompletedCheckpointTest { checkpoint.markAsDiscardedOnShutdown(status).discard(); verify(state, times(1)).discardState(); - assertTrue(discardLocation.isDisposed()); - assertTrue(discardHandle.isDisposed()); + assertThat(discardLocation.isDisposed()).isTrue(); + assertThat(discardHandle.isDisposed()).isTrue(); } } /** Tests that the stats callbacks happen if the callback is registered. */ @Test - public void testCompletedCheckpointStatsCallbacks() throws Exception { + void testCompletedCheckpointStatsCallbacks() throws Exception { Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>(); JobVertexID jobVertexId = new JobVertexID(); taskStats.put(jobVertexId, new TaskStateStats(jobVertexId, 1)); @@ -390,11 +384,11 @@ public class CompletedCheckpointTest { checkpointStats); completed.markAsDiscardedOnShutdown(JobStatus.FINISHED).discard(); - assertTrue(checkpointStats.isDiscarded()); + assertThat(checkpointStats.isDiscarded()).isTrue(); } @Test - public void testIsJavaSerializable() throws Exception { + void testIsJavaSerializable() throws Exception { TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3); TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4); @@ -420,20 +414,18 @@ public class CompletedCheckpointTest { CompletedCheckpointStats copy = CommonTestUtils.createCopySerializable(completed); - assertEquals(completed.getCheckpointId(), copy.getCheckpointId()); - assertEquals(completed.getTriggerTimestamp(), copy.getTriggerTimestamp()); - assertEquals(completed.getProperties(), copy.getProperties()); - assertEquals(completed.getNumberOfSubtasks(), copy.getNumberOfSubtasks()); - assertEquals( - completed.getNumberOfAcknowledgedSubtasks(), - copy.getNumberOfAcknowledgedSubtasks()); - assertEquals(completed.getEndToEndDuration(), copy.getEndToEndDuration()); - assertEquals(completed.getStateSize(), copy.getStateSize()); - assertEquals(completed.getProcessedData(), copy.getProcessedData()); - assertEquals(completed.getPersistedData(), copy.getPersistedData()); - assertEquals( - completed.getLatestAcknowledgedSubtaskStats().getSubtaskIndex(), - copy.getLatestAcknowledgedSubtaskStats().getSubtaskIndex()); - assertEquals(completed.getStatus(), copy.getStatus()); + assertThat(copy.getCheckpointId()).isEqualTo(completed.getCheckpointId()); + assertThat(copy.getTriggerTimestamp()).isEqualTo(completed.getTriggerTimestamp()); + assertThat(copy.getProperties()).isEqualTo(completed.getProperties()); + assertThat(copy.getNumberOfSubtasks()).isEqualTo(completed.getNumberOfSubtasks()); + assertThat(copy.getNumberOfAcknowledgedSubtasks()) + .isEqualTo(completed.getNumberOfAcknowledgedSubtasks()); + assertThat(copy.getEndToEndDuration()).isEqualTo(completed.getEndToEndDuration()); + assertThat(copy.getStateSize()).isEqualTo(completed.getStateSize()); + assertThat(copy.getProcessedData()).isEqualTo(completed.getProcessedData()); + assertThat(copy.getPersistedData()).isEqualTo(completed.getPersistedData()); + assertThat(copy.getLatestAcknowledgedSubtaskStats().getSubtaskIndex()) + .isEqualTo(completed.getLatestAcknowledgedSubtaskStats().getSubtaskIndex()); + assertThat(copy.getStatus()).isEqualTo(completed.getStatus()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java index 069ab2dd58d..28beabc3e05 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java @@ -21,13 +21,13 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.io.NotSerializableException; import java.util.HashMap; import java.util.Map; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; public class FailedCheckpointStatsTest { @@ -35,7 +35,7 @@ public class FailedCheckpointStatsTest { * Tests that the end to end duration of a failed checkpoint is the duration until the failure. */ @Test - public void testEndToEndDuration() throws Exception { + void testEndToEndDuration() { long duration = 123912931293L; long triggerTimestamp = 10123; long failureTimestamp = triggerTimestamp + duration; @@ -61,11 +61,11 @@ public class FailedCheckpointStatsTest { null, null); - assertEquals(duration, failed.getEndToEndDuration()); + assertThat(failed.getEndToEndDuration()).isEqualTo(duration); } @Test - public void testIsJavaSerializable() throws Exception { + void testIsJavaSerializable() throws Exception { long duration = 123912931293L; long triggerTimestamp = 10123; long failureTimestamp = triggerTimestamp + duration; @@ -93,20 +93,19 @@ public class FailedCheckpointStatsTest { FailedCheckpointStats copy = CommonTestUtils.createCopySerializable(failed); - assertEquals(failed.getCheckpointId(), copy.getCheckpointId()); - assertEquals(failed.getTriggerTimestamp(), copy.getTriggerTimestamp()); - assertEquals(failed.getProperties(), copy.getProperties()); - assertEquals(failed.getNumberOfSubtasks(), copy.getNumberOfSubtasks()); - assertEquals( - failed.getNumberOfAcknowledgedSubtasks(), copy.getNumberOfAcknowledgedSubtasks()); - assertEquals(failed.getEndToEndDuration(), copy.getEndToEndDuration()); - assertEquals(failed.getStateSize(), copy.getStateSize()); - assertEquals(failed.getProcessedData(), copy.getProcessedData()); - assertEquals(failed.getPersistedData(), copy.getPersistedData()); - assertEquals( - failed.getLatestAcknowledgedSubtaskStats(), - copy.getLatestAcknowledgedSubtaskStats()); - assertEquals(failed.getStatus(), copy.getStatus()); - assertEquals(failed.getFailureMessage(), copy.getFailureMessage()); + assertThat(copy.getCheckpointId()).isEqualTo(failed.getCheckpointId()); + assertThat(copy.getTriggerTimestamp()).isEqualTo(failed.getTriggerTimestamp()); + assertThat(copy.getProperties()).isEqualTo(failed.getProperties()); + assertThat(copy.getNumberOfSubtasks()).isEqualTo(failed.getNumberOfSubtasks()); + assertThat(copy.getNumberOfAcknowledgedSubtasks()) + .isEqualTo(failed.getNumberOfAcknowledgedSubtasks()); + assertThat(copy.getEndToEndDuration()).isEqualTo(failed.getEndToEndDuration()); + assertThat(copy.getStateSize()).isEqualTo(failed.getStateSize()); + assertThat(copy.getProcessedData()).isEqualTo(failed.getProcessedData()); + assertThat(copy.getPersistedData()).isEqualTo(failed.getPersistedData()); + assertThat(copy.getLatestAcknowledgedSubtaskStats()) + .isEqualTo(failed.getLatestAcknowledgedSubtaskStats()); + assertThat(copy.getStatus()).isEqualTo(failed.getStatus()); + assertThat(copy.getFailureMessage()).isEqualTo(failed.getFailureMessage()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java index e46f6083b27..33d3313c767 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java @@ -21,16 +21,12 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import java.util.HashMap; -import static junit.framework.TestCase.assertFalse; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -38,7 +34,7 @@ public class PendingCheckpointStatsTest { /** Tests reporting of subtask stats. */ @Test - public void testReportSubtaskStats() throws Exception { + void testReportSubtaskStats() { long checkpointId = Integer.MAX_VALUE + 1222L; long triggerTimestamp = Integer.MAX_VALUE - 1239L; CheckpointProperties props = @@ -57,22 +53,22 @@ public class PendingCheckpointStatsTest { checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats); // Check initial state - assertEquals(checkpointId, pending.getCheckpointId()); - assertEquals(triggerTimestamp, pending.getTriggerTimestamp()); - assertEquals(props, pending.getProperties()); - assertEquals(CheckpointStatsStatus.IN_PROGRESS, pending.getStatus()); - assertEquals(0, pending.getNumberOfAcknowledgedSubtasks()); - assertEquals(0, pending.getStateSize()); - assertEquals(totalSubtaskCount, pending.getNumberOfSubtasks()); - assertNull(pending.getLatestAcknowledgedSubtaskStats()); - assertEquals(-1, pending.getLatestAckTimestamp()); - assertEquals(-1, pending.getEndToEndDuration()); - assertEquals(task1, pending.getTaskStateStats(task1.getJobVertexId())); - assertEquals(task2, pending.getTaskStateStats(task2.getJobVertexId())); - assertNull(pending.getTaskStateStats(new JobVertexID())); + assertThat(pending.getCheckpointId()).isEqualTo(checkpointId); + assertThat(pending.getTriggerTimestamp()).isEqualTo(triggerTimestamp); + assertThat(pending.getProperties()).isEqualTo(props); + assertThat(pending.getStatus()).isEqualTo(CheckpointStatsStatus.IN_PROGRESS); + assertThat(pending.getNumberOfAcknowledgedSubtasks()).isZero(); + assertThat(pending.getStateSize()).isZero(); + assertThat(pending.getNumberOfSubtasks()).isEqualTo(totalSubtaskCount); + assertThat(pending.getLatestAcknowledgedSubtaskStats()).isNull(); + assertThat(pending.getLatestAckTimestamp()).isEqualTo(-1); + assertThat(pending.getEndToEndDuration()).isEqualTo(-1); + assertThat(pending.getTaskStateStats(task1.getJobVertexId())).isEqualTo(task1); + assertThat(pending.getTaskStateStats(task2.getJobVertexId())).isEqualTo(task2); + assertThat(pending.getTaskStateStats(new JobVertexID())).isNull(); // Report subtasks and check getters - assertFalse(pending.reportSubtaskStats(new JobVertexID(), createSubtaskStats(0))); + assertThat(pending.reportSubtaskStats(new JobVertexID(), createSubtaskStats(0))).isFalse(); long stateSize = 0; @@ -83,15 +79,16 @@ public class PendingCheckpointStatsTest { pending.reportSubtaskStats(task1.getJobVertexId(), subtask); - assertEquals(subtask, pending.getLatestAcknowledgedSubtaskStats()); - assertEquals(subtask.getAckTimestamp(), pending.getLatestAckTimestamp()); - assertEquals( - subtask.getAckTimestamp() - triggerTimestamp, pending.getEndToEndDuration()); - assertEquals(stateSize, pending.getStateSize()); + assertThat(pending.getLatestAcknowledgedSubtaskStats()).isEqualTo(subtask); + assertThat(pending.getLatestAckTimestamp()).isEqualTo(subtask.getAckTimestamp()); + assertThat(pending.getEndToEndDuration()) + .isEqualTo(subtask.getAckTimestamp() - triggerTimestamp); + assertThat(pending.getStateSize()).isEqualTo(stateSize); } // Don't allow overwrite - assertFalse(pending.reportSubtaskStats(task1.getJobVertexId(), task1.getSubtaskStats()[0])); + assertThat(pending.reportSubtaskStats(task1.getJobVertexId(), task1.getSubtaskStats()[0])) + .isFalse(); // Report 2nd task for (int i = 0; i < task2.getNumberOfSubtasks(); i++) { @@ -100,20 +97,20 @@ public class PendingCheckpointStatsTest { pending.reportSubtaskStats(task2.getJobVertexId(), subtask); - assertEquals(subtask, pending.getLatestAcknowledgedSubtaskStats()); - assertEquals(subtask.getAckTimestamp(), pending.getLatestAckTimestamp()); - assertEquals( - subtask.getAckTimestamp() - triggerTimestamp, pending.getEndToEndDuration()); - assertEquals(stateSize, pending.getStateSize()); + assertThat(pending.getLatestAcknowledgedSubtaskStats()).isEqualTo(subtask); + assertThat(pending.getLatestAckTimestamp()).isEqualTo(subtask.getAckTimestamp()); + assertThat(pending.getEndToEndDuration()) + .isEqualTo(subtask.getAckTimestamp() - triggerTimestamp); + assertThat(pending.getStateSize()).isEqualTo(stateSize); } - assertEquals(task1.getNumberOfSubtasks(), task1.getNumberOfAcknowledgedSubtasks()); - assertEquals(task2.getNumberOfSubtasks(), task2.getNumberOfAcknowledgedSubtasks()); + assertThat(task1.getNumberOfAcknowledgedSubtasks()).isEqualTo(task1.getNumberOfSubtasks()); + assertThat(task2.getNumberOfAcknowledgedSubtasks()).isEqualTo(task2.getNumberOfSubtasks()); } /** Test reporting of a completed checkpoint. */ @Test - public void testReportCompletedCheckpoint() throws Exception { + void testReportCompletedCheckpoint() { TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3); TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4); @@ -152,30 +149,28 @@ public class PendingCheckpointStatsTest { CompletedCheckpointStats completed = args.getValue(); - assertNotNull(completed); - assertEquals(CheckpointStatsStatus.COMPLETED, completed.getStatus()); - assertFalse(completed.isDiscarded()); + assertThat(completed).isNotNull(); + assertThat(completed.getStatus()).isEqualTo(CheckpointStatsStatus.COMPLETED); + assertThat(completed.isDiscarded()).isFalse(); completed.discard(); - assertTrue(completed.isDiscarded()); - assertEquals(externalPath, completed.getExternalPath()); - - assertEquals(pending.getCheckpointId(), completed.getCheckpointId()); - assertEquals( - pending.getNumberOfAcknowledgedSubtasks(), - completed.getNumberOfAcknowledgedSubtasks()); - assertEquals( - pending.getLatestAcknowledgedSubtaskStats(), - completed.getLatestAcknowledgedSubtaskStats()); - assertEquals(pending.getLatestAckTimestamp(), completed.getLatestAckTimestamp()); - assertEquals(pending.getEndToEndDuration(), completed.getEndToEndDuration()); - assertEquals(pending.getStateSize(), completed.getStateSize()); - assertEquals(task1, completed.getTaskStateStats(task1.getJobVertexId())); - assertEquals(task2, completed.getTaskStateStats(task2.getJobVertexId())); + assertThat(completed.isDiscarded()).isTrue(); + assertThat(completed.getExternalPath()).isEqualTo(externalPath); + + assertThat(completed.getCheckpointId()).isEqualTo(pending.getCheckpointId()); + assertThat(completed.getNumberOfAcknowledgedSubtasks()) + .isEqualTo(pending.getNumberOfAcknowledgedSubtasks()); + assertThat(completed.getLatestAcknowledgedSubtaskStats()) + .isEqualTo(pending.getLatestAcknowledgedSubtaskStats()); + assertThat(completed.getLatestAckTimestamp()).isEqualTo(pending.getLatestAckTimestamp()); + assertThat(completed.getEndToEndDuration()).isEqualTo(pending.getEndToEndDuration()); + assertThat(completed.getStateSize()).isEqualTo(pending.getStateSize()); + assertThat(completed.getTaskStateStats(task1.getJobVertexId())).isEqualTo(task1); + assertThat(completed.getTaskStateStats(task2.getJobVertexId())).isEqualTo(task2); } /** Test reporting of a failed checkpoint. */ @Test - public void testReportFailedCheckpoint() throws Exception { + void testReportFailedCheckpoint() { TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3); TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4); @@ -215,27 +210,25 @@ public class PendingCheckpointStatsTest { FailedCheckpointStats failed = args.getValue(); - assertNotNull(failed); - assertEquals(CheckpointStatsStatus.FAILED, failed.getStatus()); - assertEquals(failureTimestamp, failed.getFailureTimestamp()); - assertEquals(cause.getMessage(), failed.getFailureMessage()); - - assertEquals(pending.getCheckpointId(), failed.getCheckpointId()); - assertEquals( - pending.getNumberOfAcknowledgedSubtasks(), - failed.getNumberOfAcknowledgedSubtasks()); - assertEquals( - pending.getLatestAcknowledgedSubtaskStats(), - failed.getLatestAcknowledgedSubtaskStats()); - assertEquals(pending.getLatestAckTimestamp(), failed.getLatestAckTimestamp()); - assertEquals(failureTimestamp - triggerTimestamp, failed.getEndToEndDuration()); - assertEquals(pending.getStateSize(), failed.getStateSize()); - assertEquals(task1, failed.getTaskStateStats(task1.getJobVertexId())); - assertEquals(task2, failed.getTaskStateStats(task2.getJobVertexId())); + assertThat(failed).isNotNull(); + assertThat(failed.getStatus()).isEqualTo(CheckpointStatsStatus.FAILED); + assertThat(failed.getFailureTimestamp()).isEqualTo(failureTimestamp); + assertThat(failed.getFailureMessage()).isEqualTo(cause.getMessage()); + + assertThat(failed.getCheckpointId()).isEqualTo(pending.getCheckpointId()); + assertThat(failed.getNumberOfAcknowledgedSubtasks()) + .isEqualTo(pending.getNumberOfAcknowledgedSubtasks()); + assertThat(failed.getLatestAcknowledgedSubtaskStats()) + .isEqualTo(pending.getLatestAcknowledgedSubtaskStats()); + assertThat(failed.getLatestAckTimestamp()).isEqualTo(pending.getLatestAckTimestamp()); + assertThat(failed.getEndToEndDuration()).isEqualTo(failureTimestamp - triggerTimestamp); + assertThat(failed.getStateSize()).isEqualTo(pending.getStateSize()); + assertThat(failed.getTaskStateStats(task1.getJobVertexId())).isEqualTo(task1); + assertThat(failed.getTaskStateStats(task2.getJobVertexId())).isEqualTo(task2); } @Test - public void testIsJavaSerializable() throws Exception { + void testIsJavaSerializable() throws Exception { TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3); TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4); @@ -254,18 +247,17 @@ public class PendingCheckpointStatsTest { PendingCheckpointStats copy = CommonTestUtils.createCopySerializable(pending); - assertEquals(pending.getCheckpointId(), copy.getCheckpointId()); - assertEquals(pending.getTriggerTimestamp(), copy.getTriggerTimestamp()); - assertEquals(pending.getProperties(), copy.getProperties()); - assertEquals(pending.getNumberOfSubtasks(), copy.getNumberOfSubtasks()); - assertEquals( - pending.getNumberOfAcknowledgedSubtasks(), copy.getNumberOfAcknowledgedSubtasks()); - assertEquals(pending.getEndToEndDuration(), copy.getEndToEndDuration()); - assertEquals(pending.getStateSize(), copy.getStateSize()); - assertEquals( - pending.getLatestAcknowledgedSubtaskStats(), - copy.getLatestAcknowledgedSubtaskStats()); - assertEquals(pending.getStatus(), copy.getStatus()); + assertThat(copy.getCheckpointId()).isEqualTo(pending.getCheckpointId()); + assertThat(copy.getTriggerTimestamp()).isEqualTo(pending.getTriggerTimestamp()); + assertThat(copy.getProperties()).isEqualTo(pending.getProperties()); + assertThat(copy.getNumberOfSubtasks()).isEqualTo(pending.getNumberOfSubtasks()); + assertThat(copy.getNumberOfAcknowledgedSubtasks()) + .isEqualTo(pending.getNumberOfAcknowledgedSubtasks()); + assertThat(copy.getEndToEndDuration()).isEqualTo(pending.getEndToEndDuration()); + assertThat(copy.getStateSize()).isEqualTo(pending.getStateSize()); + assertThat(copy.getLatestAcknowledgedSubtaskStats()) + .isEqualTo(pending.getLatestAcknowledgedSubtaskStats()); + assertThat(copy.getStatus()).isEqualTo(pending.getStatus()); } // ------------------------------------------------------------------------
