reswqa commented on code in PR #21943: URL: https://github.com/apache/flink/pull/21943#discussion_r1107241118
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -141,27 +140,15 @@ import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator; import static org.apache.flink.util.ExceptionUtils.findThrowable; import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link DefaultScheduler}. */ public class DefaultSchedulerTest extends TestLogger { private static final int TIMEOUT_MS = 1000; - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @TempDir private Path TEMPORARY_FOLDER; Review Comment: ```suggestion @TempDir private static Path temporaryFolder; ``` Checkstyle also complained about the variable name here.
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -211,8 +198,8 @@ public void setUp() throws Exception { timeout = Time.seconds(60); } - @After - public void tearDown() throws Exception { + @AfterEach + void tearDown() throws Exception { Review Comment: ```suggestion void tearDown() { ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -188,8 +175,8 @@ public class DefaultSchedulerTest extends TestLogger { private Time timeout; - @Before - public void setUp() throws Exception { + @BeforeEach + void setUp() throws Exception { Review Comment: ```suggestion void setUp() { ``` No exception thrown in this method. ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -248,20 +235,20 @@ public void testCorrectSettingOfInitializationTimestamp() { executionGraphInfo.getArchivedExecutionGraph(); // ensure all statuses are set in the ExecutionGraph - assertThat( - archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING), greaterThan(0L)); - assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED), greaterThan(0L)); - assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.RUNNING), greaterThan(0L)); + assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING)) + .isGreaterThan(0L); + assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED)).isGreaterThan(0L); + assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.RUNNING)).isGreaterThan(0L); // ensure correct order assertThat( - archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING) - <= archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED), - Is.is(true)); + archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING) + <= archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED)) + .isTrue(); Review Comment: ```suggestion 
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING)) .isLessThanOrEqualTo(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED)); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -809,15 +796,15 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { checkpointCoordinator.triggerCheckpoint(false); checkpointTriggeredLatch.await(); - assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(1))); + assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(1); scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId)); taskRestartExecutor.triggerScheduledTasks(); - assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); + assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(0); Review Comment: ```suggestion assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -285,17 +272,17 @@ public void deployTasksOnlyWhenAllSlotRequestsAreFulfilled() throws Exception { new ExecutionVertexID(onlyJobVertexId, 3)); schedulingStrategy.schedule(verticesToSchedule); - assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0)); + assertThat(testExecutionOperations.getDeployedVertices()).hasSize(0); testExecutionSlotAllocator.completePendingRequest(verticesToSchedule.get(0)); - assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0)); + assertThat(testExecutionOperations.getDeployedVertices()).hasSize(0); Review Comment: ```suggestion assertThat(testExecutionOperations.getDeployedVertices()).isEmpty(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -1132,21 +1192,23 @@ public void 
pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSl ExecutionState.FAILED, new RuntimeException(exceptionMessage))); - assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0)); + assertThat(testExecutionSlotAllocator.getPendingRequests().keySet()).hasSize(0); Review Comment: ```suggestion assertThat(testExecutionSlotAllocator.getPendingRequests()).isEmpty(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -1273,18 +1346,21 @@ public void testExceptionHistoryWithPreDeployFailure() { final Iterable<RootExceptionHistoryEntry> actualExceptionHistory = scheduler.getExceptionHistory(); - assertThat( - actualExceptionHistory, - IsIterableContainingInOrder.contains( - ExceptionHistoryEntryMatcher.matchesFailure( - failureInfo.getException(), - failureInfo.getTimestamp(), - taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(), - taskFailureExecutionVertex.getCurrentAssignedResourceLocation()))); + assertThat(actualExceptionHistory) + .anySatisfy( + e -> + ExceptionHistoryEntryMatcher.matchesFailure( Review Comment: ditto. 
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -1601,12 +1681,12 @@ public void testLateRegisteredPartitionsWillBeReleased() { // late registered partitions will not be tracked and will be released shuffleMaster.completeAllPendingRegistrations(); - assertThat(trackedPartitions, hasSize(0)); - assertThat(shuffleMaster.getExternallyReleasedPartitions(), hasSize(1)); + assertThat(trackedPartitions).hasSize(0); Review Comment: ```suggestion assertThat(trackedPartitions).isEmpty(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -285,17 +272,17 @@ public void deployTasksOnlyWhenAllSlotRequestsAreFulfilled() throws Exception { new ExecutionVertexID(onlyJobVertexId, 3)); schedulingStrategy.schedule(verticesToSchedule); - assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0)); + assertThat(testExecutionOperations.getDeployedVertices()).hasSize(0); Review Comment: ```suggestion assertThat(testExecutionOperations.getDeployedVertices()).isEmpty(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -1511,44 +1591,44 @@ public void testDeploymentWaitForProducedPartitionRegistration() { createSchedulerAndStartScheduling(jobGraph); - assertThat(trackedPartitions, hasSize(0)); - assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0)); + assertThat(trackedPartitions).hasSize(0); Review Comment: ```suggestion assertThat(trackedPartitions).isEmpty(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -141,27 +140,15 @@ import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator; import static org.apache.flink.util.ExceptionUtils.findThrowable; import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage; -import static
org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link DefaultScheduler}. */ public class DefaultSchedulerTest extends TestLogger { Review Comment: ```suggestion class DefaultSchedulerTest { ``` JUnit 5 does not recommend using public test classes. ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java: ########## @@ -127,6 +127,10 @@ public enum TaskAcknowledgeResult { private CheckpointException failureCause; + // Because onCompletionPromise is not required to synchronize with the completion status of + // pendingCheckpoint, this flag is used to identify whether pendingCheckpoint is completed. + private boolean isCompleted = false; Review Comment: I'm not sure if I missed something. I wonder why we need this field? It seems that only the test code uses it. If it is only for testing purposes, I am not inclined to introduce it and `isCompleted()`.
########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java: ########## @@ -160,14 +148,13 @@ public void testSyncSavepointCannotBeSubsumed() throws Exception { CheckpointProperties forced = CheckpointProperties.forSyncSavepoint(true, false, SavepointFormatType.CANONICAL); PendingCheckpoint pending = createPendingCheckpoint(forced); - assertFalse(pending.canBeSubsumed()); + assertThat(pending.canBeSubsumed()).isFalse(); + ; Review Comment: This semicolon should not be needed. ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -481,7 +470,7 @@ private void testRestartVerticesOnFailuresInScheduling( final ExecutionVertexID vid22 = new ExecutionVertexID(v2.getID(), 1); schedulingStrategy.schedule(Arrays.asList(vid11, vid12, vid21, vid22)); - assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(4)); + assertThat(testExecutionSlotAllocator.getPendingRequests().keySet()).hasSize(4); Review Comment: ```suggestion assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(4); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -1090,13 +1151,13 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception .iterator(); v1 = vertexIterator.next(); ArchivedExecutionVertex v2 = vertexIterator.next(); - assertThat(v1.getExecutionState(), is(ExecutionState.FAILED)); - assertThat(v2.getExecutionState(), is(ExecutionState.CANCELED)); - assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0)); + assertThat(v1.getExecutionState()).isEqualTo(ExecutionState.FAILED); + assertThat(v2.getExecutionState()).isEqualTo(ExecutionState.CANCELED); + assertThat(testExecutionSlotAllocator.getPendingRequests().keySet()).hasSize(0); Review Comment: ```suggestion assertThat(testExecutionSlotAllocator.getPendingRequests()).isEmpty(); ``` ########## 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -426,24 +413,26 @@ public void failJobIfNotEnoughResources() throws Exception { .getFailureInfo() .getException() .deserializeError(DefaultSchedulerTest.class.getClassLoader()); - assertTrue(findThrowable(failureCause, NoResourceAvailableException.class).isPresent()); - assertTrue( - findThrowableWithMessage( - failureCause, - "Could not allocate the required slot within slot request timeout.") - .isPresent()); - assertThat(jobStatus, is(equalTo(JobStatus.FAILED))); + assertThat(findThrowable(failureCause, NoResourceAvailableException.class).isPresent()) + .isTrue(); + assertThat( + findThrowableWithMessage( + failureCause, + "Could not allocate the required slot within slot request timeout.") + .isPresent()) + .isTrue(); Review Comment: ```suggestion assertThat(findThrowable(failureCause, NoResourceAvailableException.class)).isPresent(); assertThat( findThrowableWithMessage( failureCause, "Could not allocate the required slot within slot request timeout.")) .isPresent(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -854,11 +841,85 @@ public void restoreStateWhenRestartingTasks() throws Exception { scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId)); taskRestartExecutor.triggerScheduledTasks(); - assertThat(masterHook.getRestoreCount(), is(equalTo(1))); + assertThat(masterHook.getRestoreCount()).isEqualTo(1); + } + + @Test + void testTriggerCheckpointAndCompletedAfterStore() throws Exception { + final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); + enableCheckpointing(jobGraph); + + final CountDownLatch checkpointTriggeredLatch = getCheckpointTriggeredLatch(); + + final CompletableFuture<JobStatus> counterShutdownFuture = new CompletableFuture<>(); + CheckpointIDCounter counter = Review Comment: Can be removed. 
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -854,11 +841,85 @@ public void restoreStateWhenRestartingTasks() throws Exception { scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId)); taskRestartExecutor.triggerScheduledTasks(); - assertThat(masterHook.getRestoreCount(), is(equalTo(1))); + assertThat(masterHook.getRestoreCount()).isEqualTo(1); + } + + @Test + void testTriggerCheckpointAndCompletedAfterStore() throws Exception { Review Comment: I don't quite understand how this test case guards the correctness of your proposed fix. It seems that if the fix is removed, this test case would still pass. ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -1048,12 +1109,12 @@ public void failureInfoIsSetAfterTaskFailure() { final ErrorInfo failureInfo = scheduler.requestJob().getArchivedExecutionGraph().getFailureInfo(); - assertThat(failureInfo, is(notNullValue())); - assertThat(failureInfo.getExceptionAsString(), containsString(exceptionMessage)); + assertThat(failureInfo).isNotNull(); + assertThat(failureInfo.getExceptionAsString()).containsSubsequence(exceptionMessage); Review Comment: ```suggestion assertThat(failureInfo.getExceptionAsString()).contains(exceptionMessage); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -141,27 +140,15 @@ import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator; import static org.apache.flink.util.ExceptionUtils.findThrowable; import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link DefaultScheduler}. */ public class DefaultSchedulerTest extends TestLogger { private static final int TIMEOUT_MS = 1000; - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @TempDir private Path TEMPORARY_FOLDER; Review Comment: I had a close look at the code. This variable can be safely removed now. ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -854,11 +841,85 @@ public void restoreStateWhenRestartingTasks() throws Exception { scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId)); taskRestartExecutor.triggerScheduledTasks(); - assertThat(masterHook.getRestoreCount(), is(equalTo(1))); + assertThat(masterHook.getRestoreCount()).isEqualTo(1); + } + + @Test + void testTriggerCheckpointAndCompletedAfterStore() throws Exception { Review Comment: The behavior we actually need to test should be: 1. Before `cleanupAfterCompletedCheckpoint` is completed, `checkpointFuture` should be in an incomplete state. After `cleanupAfterCompletedCheckpoint` is completed, the `checkpointFuture` should be in the normally completed state. 2. If an exception occurs when the checkpoint is written to the `CheckpointStateHandleStore`, the `checkpointFuture` should either not be completed or be completed exceptionally.
########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java: ########## @@ -1373,6 +1373,7 @@ private void cleanupAfterCompletedCheckpoint( completedCheckpoint.getTimestamp(), extractIdIfDiscardedOnSubsumed(lastSubsumed)); } + pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint); Review Comment: If we encounter an exception in `addCompletedCheckpointToStoreAndSubsumeOldest`, what should we do with `pendingCheckpoint.getCompletionFuture()`? ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -1171,19 +1233,23 @@ public void testExceptionHistoryWithGlobalFailOver() { final Iterable<RootExceptionHistoryEntry> actualExceptionHistory = scheduler.getExceptionHistory(); - assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1)); + assertThat(actualExceptionHistory).hasSize(1); final RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next(); assertThat( - failure, - ExceptionHistoryEntryMatcher.matchesGlobalFailure( - expectedException, - scheduler.getExecutionGraph().getFailureInfo().getTimestamp())); - assertThat(failure.getConcurrentExceptions(), IsEmptyIterable.emptyIterable()); + ExceptionHistoryEntryMatcher.matchesGlobalFailure( Review Comment: Emm, `ExceptionHistoryEntryMatcher` is a subclass of a `hamcrest` matcher. Maybe we should not use it now.
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -1229,20 +1295,28 @@ public void testExceptionHistoryWithRestartableFailure() { scheduler.getExceptionHistory(); // assert restarted attempt + assertThat(actualExceptionHistory).hasSize(2); + Iterator<RootExceptionHistoryEntry> iterator = actualExceptionHistory.iterator(); + RootExceptionHistoryEntry entry0 = iterator.next(); assertThat( - actualExceptionHistory, - IsIterableContainingInOrder.contains( ExceptionHistoryEntryMatcher.matchesFailure( Review Comment: ditto. ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java: ########## @@ -302,6 +306,10 @@ public CompletableFuture<CompletedCheckpoint> getCompletionFuture() { return onCompletionPromise; } + public boolean isCompleted() { Review Comment: ```suggestion @VisibleForTesting public boolean isCompleted() { ``` Mark it as `@VisibleForTesting`, since only test methods use it. ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -1701,14 +1781,12 @@ private void commonJobStatusHookTest( waitForTermination(scheduler); final JobStatus jobStatus = scheduler.requestJobStatus(); - org.assertj.core.api.Assertions.assertThat(jobStatus).isEqualTo(expectedJobStatus); - org.assertj.core.api.Assertions.assertThat(onCreatedJobList).hasSize(1); - org.assertj.core.api.Assertions.assertThat(onCreatedJobList.get(0)) - .isEqualTo(jobGraph.getJobID()); + assertThat(jobStatus).isEqualTo(expectedJobStatus); + assertThat(onCreatedJobList).hasSize(1); + assertThat(onCreatedJobList.get(0)).isEqualTo(jobGraph.getJobID()); - org.assertj.core.api.Assertions.assertThat(onJobStatusList).hasSize(1); - org.assertj.core.api.Assertions.assertThat(onJobStatusList.get(0)) - .isEqualTo(jobGraph.getJobID()); + assertThat(onJobStatusList).hasSize(1); + assertThat(onJobStatusList.get(0)).isEqualTo(jobGraph.getJobID()); Review Comment:
ditto. ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -1511,44 +1591,44 @@ public void testDeploymentWaitForProducedPartitionRegistration() { createSchedulerAndStartScheduling(jobGraph); - assertThat(trackedPartitions, hasSize(0)); - assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0)); + assertThat(trackedPartitions).hasSize(0); + assertThat(testExecutionOperations.getDeployedVertices()).hasSize(0); shuffleMaster.completeAllPendingRegistrations(); - assertThat(trackedPartitions, hasSize(1)); - assertThat(testExecutionOperations.getDeployedVertices(), hasSize(2)); + assertThat(trackedPartitions).hasSize(1); + assertThat(testExecutionOperations.getDeployedVertices()).hasSize(2); } @Test - public void testFailedProducedPartitionRegistration() { + void testFailedProducedPartitionRegistration() { shuffleMaster.setAutoCompleteRegistration(false); final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); createSchedulerAndStartScheduling(jobGraph); - assertThat(testExecutionOperations.getCanceledVertices(), hasSize(0)); - assertThat(testExecutionOperations.getFailedVertices(), hasSize(0)); + assertThat(testExecutionOperations.getCanceledVertices()).hasSize(0); Review Comment: ```suggestion assertThat(testExecutionOperations.getCanceledVertices()).isEmpty(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java: ########## @@ -1701,14 +1781,12 @@ private void commonJobStatusHookTest( waitForTermination(scheduler); final JobStatus jobStatus = scheduler.requestJobStatus(); - org.assertj.core.api.Assertions.assertThat(jobStatus).isEqualTo(expectedJobStatus); - org.assertj.core.api.Assertions.assertThat(onCreatedJobList).hasSize(1); - org.assertj.core.api.Assertions.assertThat(onCreatedJobList.get(0)) - .isEqualTo(jobGraph.getJobID()); + assertThat(jobStatus).isEqualTo(expectedJobStatus); + assertThat(onCreatedJobList).hasSize(1); + 
assertThat(onCreatedJobList.get(0)).isEqualTo(jobGraph.getJobID()); Review Comment: ```suggestion assertThat(onCreatedJobList).singleElement().isEqualTo(jobGraph.getJobID()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org