reswqa commented on code in PR #21943:
URL: https://github.com/apache/flink/pull/21943#discussion_r1107241118


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -141,27 +140,15 @@
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link DefaultScheduler}. */
 public class DefaultSchedulerTest extends TestLogger {
 
     private static final int TIMEOUT_MS = 1000;
 
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+    @TempDir private Path TEMPORARY_FOLDER;

Review Comment:
   ```suggestion
       @TempDir private static Path temporaryFolder;
   ```
   Checkstyle also complained about the variable name her.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -211,8 +198,8 @@ public void setUp() throws Exception {
         timeout = Time.seconds(60);
     }
 
-    @After
-    public void tearDown() throws Exception {
+    @AfterEach
+    void tearDown() throws Exception {

Review Comment:
   ```suggestion
       void tearDown() {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -188,8 +175,8 @@ public class DefaultSchedulerTest extends TestLogger {
 
     private Time timeout;
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    void setUp() throws Exception {

Review Comment:
   ```suggestion
       void setUp() {
   ```
   No exception thrown in this method.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -248,20 +235,20 @@ public void testCorrectSettingOfInitializationTimestamp() 
{
                 executionGraphInfo.getArchivedExecutionGraph();
 
         // ensure all statuses are set in the ExecutionGraph
-        assertThat(
-                
archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING), 
greaterThan(0L));
-        
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED), 
greaterThan(0L));
-        
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.RUNNING), 
greaterThan(0L));
+        
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING))
+                .isGreaterThan(0L);
+        
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED)).isGreaterThan(0L);
+        
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.RUNNING)).isGreaterThan(0L);
 
         // ensure correct order
         assertThat(
-                
archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING)
-                        <= 
archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED),
-                Is.is(true));
+                        
archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING)
+                                <= 
archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED))
+                .isTrue();

Review Comment:
   ```suggestion
       
assertThat(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING))
           
.isLessThanOrEqualTo(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED));
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -809,15 +796,15 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
 
         checkpointCoordinator.triggerCheckpoint(false);
         checkpointTriggeredLatch.await();
-        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(1)));
+        
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(1);
 
         
scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         taskRestartExecutor.triggerScheduledTasks();
-        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
+        
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(0);

Review Comment:
   ```suggestion
           
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -285,17 +272,17 @@ public void 
deployTasksOnlyWhenAllSlotRequestsAreFulfilled() throws Exception {
                         new ExecutionVertexID(onlyJobVertexId, 3));
         schedulingStrategy.schedule(verticesToSchedule);
 
-        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
+        assertThat(testExecutionOperations.getDeployedVertices()).hasSize(0);
 
         
testExecutionSlotAllocator.completePendingRequest(verticesToSchedule.get(0));
-        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
+        assertThat(testExecutionOperations.getDeployedVertices()).hasSize(0);

Review Comment:
   ```suggestion
           assertThat(testExecutionOperations.getDeployedVertices()).isEmpty();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1132,21 +1192,23 @@ public void 
pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSl
                         ExecutionState.FAILED,
                         new RuntimeException(exceptionMessage)));
 
-        assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(0));
+        
assertThat(testExecutionSlotAllocator.getPendingRequests().keySet()).hasSize(0);

Review Comment:
   ```suggestion
           
assertThat(testExecutionSlotAllocator.getPendingRequests()).isEmpty();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1273,18 +1346,21 @@ public void testExceptionHistoryWithPreDeployFailure() {
 
         final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
                 scheduler.getExceptionHistory();
-        assertThat(
-                actualExceptionHistory,
-                IsIterableContainingInOrder.contains(
-                        ExceptionHistoryEntryMatcher.matchesFailure(
-                                failureInfo.getException(),
-                                failureInfo.getTimestamp(),
-                                
taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(),
-                                
taskFailureExecutionVertex.getCurrentAssignedResourceLocation())));
+        assertThat(actualExceptionHistory)
+                .anySatisfy(
+                        e ->
+                                ExceptionHistoryEntryMatcher.matchesFailure(

Review Comment:
   ditto.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1601,12 +1681,12 @@ public void 
testLateRegisteredPartitionsWillBeReleased() {
 
         // late registered partitions will not be tracked and will be released
         shuffleMaster.completeAllPendingRegistrations();
-        assertThat(trackedPartitions, hasSize(0));
-        assertThat(shuffleMaster.getExternallyReleasedPartitions(), 
hasSize(1));
+        assertThat(trackedPartitions).hasSize(0);

Review Comment:
   ```suggestion
           assertThat(trackedPartitions).isEmpty();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -285,17 +272,17 @@ public void 
deployTasksOnlyWhenAllSlotRequestsAreFulfilled() throws Exception {
                         new ExecutionVertexID(onlyJobVertexId, 3));
         schedulingStrategy.schedule(verticesToSchedule);
 
-        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
+        assertThat(testExecutionOperations.getDeployedVertices()).hasSize(0);

Review Comment:
   ```suggestion
           assertThat(testExecutionOperations.getDeployedVertices()).isEmpty();;
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1511,44 +1591,44 @@ public void 
testDeploymentWaitForProducedPartitionRegistration() {
 
         createSchedulerAndStartScheduling(jobGraph);
 
-        assertThat(trackedPartitions, hasSize(0));
-        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
+        assertThat(trackedPartitions).hasSize(0);

Review Comment:
   ```suggestion
           assertThat(trackedPartitions).isEmpty();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -141,27 +140,15 @@
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link DefaultScheduler}. */
 public class DefaultSchedulerTest extends TestLogger {

Review Comment:
   ```suggestion
   class DefaultSchedulerTest {
   ```
   Junit5 does not recommend using public test class.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java:
##########
@@ -127,6 +127,10 @@ public enum TaskAcknowledgeResult {
 
     private CheckpointException failureCause;
 
+    // Because onCompletionPromise is not required to synchronize with the 
completion status of
+    // pendingCheckpoint, this flag is used to identify whether 
pendingCheckpoint is completed.
+    private boolean isCompleted = false;

Review Comment:
   I'm not sure if I missed something.  I wonder why we need this field?  It 
seems that only the test code uses it.
   If it is only for testing purpose, I am not inclined to introduce it and 
`isCompleted()`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java:
##########
@@ -160,14 +148,13 @@ public void testSyncSavepointCannotBeSubsumed() throws 
Exception {
         CheckpointProperties forced =
                 CheckpointProperties.forSyncSavepoint(true, false, 
SavepointFormatType.CANONICAL);
         PendingCheckpoint pending = createPendingCheckpoint(forced);
-        assertFalse(pending.canBeSubsumed());
+        assertThat(pending.canBeSubsumed()).isFalse();
+        ;

Review Comment:
   This semicolon should not be needed.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -481,7 +470,7 @@ private void testRestartVerticesOnFailuresInScheduling(
         final ExecutionVertexID vid22 = new ExecutionVertexID(v2.getID(), 1);
         schedulingStrategy.schedule(Arrays.asList(vid11, vid12, vid21, vid22));
 
-        assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(4));
+        
assertThat(testExecutionSlotAllocator.getPendingRequests().keySet()).hasSize(4);

Review Comment:
   ```suggestion
           
assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(4);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1090,13 +1151,13 @@ public void 
allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
                         .iterator();
         v1 = vertexIterator.next();
         ArchivedExecutionVertex v2 = vertexIterator.next();
-        assertThat(v1.getExecutionState(), is(ExecutionState.FAILED));
-        assertThat(v2.getExecutionState(), is(ExecutionState.CANCELED));
-        assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(0));
+        assertThat(v1.getExecutionState()).isEqualTo(ExecutionState.FAILED);
+        assertThat(v2.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
+        
assertThat(testExecutionSlotAllocator.getPendingRequests().keySet()).hasSize(0);

Review Comment:
   ```suggestion
           
assertThat(testExecutionSlotAllocator.getPendingRequests()).isEmpty();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -426,24 +413,26 @@ public void failJobIfNotEnoughResources() throws 
Exception {
                         .getFailureInfo()
                         .getException()
                         
.deserializeError(DefaultSchedulerTest.class.getClassLoader());
-        assertTrue(findThrowable(failureCause, 
NoResourceAvailableException.class).isPresent());
-        assertTrue(
-                findThrowableWithMessage(
-                                failureCause,
-                                "Could not allocate the required slot within 
slot request timeout.")
-                        .isPresent());
-        assertThat(jobStatus, is(equalTo(JobStatus.FAILED)));
+        assertThat(findThrowable(failureCause, 
NoResourceAvailableException.class).isPresent())
+                .isTrue();
+        assertThat(
+                        findThrowableWithMessage(
+                                        failureCause,
+                                        "Could not allocate the required slot 
within slot request timeout.")
+                                .isPresent())
+                .isTrue();

Review Comment:
   ```suggestion
          assertThat(findThrowable(failureCause, 
NoResourceAvailableException.class)).isPresent();
           assertThat(
                           findThrowableWithMessage(
                                   failureCause,
                                   "Could not allocate the required slot within 
slot request timeout."))
                   .isPresent();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -854,11 +841,85 @@ public void restoreStateWhenRestartingTasks() throws 
Exception {
 
         
scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         taskRestartExecutor.triggerScheduledTasks();
-        assertThat(masterHook.getRestoreCount(), is(equalTo(1)));
+        assertThat(masterHook.getRestoreCount()).isEqualTo(1);
+    }
+
+    @Test
+    void testTriggerCheckpointAndCompletedAfterStore() throws Exception {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        enableCheckpointing(jobGraph);
+
+        final CountDownLatch checkpointTriggeredLatch = 
getCheckpointTriggeredLatch();
+
+        final CompletableFuture<JobStatus> counterShutdownFuture = new 
CompletableFuture<>();
+        CheckpointIDCounter counter =

Review Comment:
   Can be removed.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -854,11 +841,85 @@ public void restoreStateWhenRestartingTasks() throws 
Exception {
 
         
scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         taskRestartExecutor.triggerScheduledTasks();
-        assertThat(masterHook.getRestoreCount(), is(equalTo(1)));
+        assertThat(masterHook.getRestoreCount()).isEqualTo(1);
+    }
+
+    @Test
+    void testTriggerCheckpointAndCompletedAfterStore() throws Exception {

Review Comment:
   I don't quite understand why this test case can guard the correctness of 
your proposed fix approach. It seems that if the fix is removed, this test case 
can also be passed.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1048,12 +1109,12 @@ public void failureInfoIsSetAfterTaskFailure() {
 
         final ErrorInfo failureInfo =
                 
scheduler.requestJob().getArchivedExecutionGraph().getFailureInfo();
-        assertThat(failureInfo, is(notNullValue()));
-        assertThat(failureInfo.getExceptionAsString(), 
containsString(exceptionMessage));
+        assertThat(failureInfo).isNotNull();
+        
assertThat(failureInfo.getExceptionAsString()).containsSubsequence(exceptionMessage);

Review Comment:
   ```suggestion
           
assertThat(failureInfo.getExceptionAsString()).contains(exceptionMessage);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -141,27 +140,15 @@
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link DefaultScheduler}. */
 public class DefaultSchedulerTest extends TestLogger {
 
     private static final int TIMEOUT_MS = 1000;
 
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+    @TempDir private Path TEMPORARY_FOLDER;

Review Comment:
   I have a close look at the code. This variable should have been safely 
removed now.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -854,11 +841,85 @@ public void restoreStateWhenRestartingTasks() throws 
Exception {
 
         
scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         taskRestartExecutor.triggerScheduledTasks();
-        assertThat(masterHook.getRestoreCount(), is(equalTo(1)));
+        assertThat(masterHook.getRestoreCount()).isEqualTo(1);
+    }
+
+    @Test
+    void testTriggerCheckpointAndCompletedAfterStore() throws Exception {

Review Comment:
   The behavior we actually need to test should be:
   1. Before `cleanupAfterCompletedCheckpoint` is completed, `checkpointFuture` 
should be in an incomplete state. After the `cleanupAfterCompletedCheckpoint` 
is completed, the `checkpointFuture` should be in the completed normally state.
   2. If an exception occurs when the checkpoint is written to the 
`CheckpointStateHandleStore`, the `checkpointFuture` should not be completed or 
completed exceptionlly.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -1373,6 +1373,7 @@ private void cleanupAfterCompletedCheckpoint(
                     completedCheckpoint.getTimestamp(),
                     extractIdIfDiscardedOnSubsumed(lastSubsumed));
         }
+        pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);

Review Comment:
   If we encounter an exception in 
`addCompletedCheckpointToStoreAndSubsumeOldest `, what should we do with 
`pendingCheckpoint. getCompleteFuture()`?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1171,19 +1233,23 @@ public void testExceptionHistoryWithGlobalFailOver() {
         final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
                 scheduler.getExceptionHistory();
 
-        assertThat(actualExceptionHistory, 
IsIterableWithSize.iterableWithSize(1));
+        assertThat(actualExceptionHistory).hasSize(1);
 
         final RootExceptionHistoryEntry failure = 
actualExceptionHistory.iterator().next();
         assertThat(
-                failure,
-                ExceptionHistoryEntryMatcher.matchesGlobalFailure(
-                        expectedException,
-                        
scheduler.getExecutionGraph().getFailureInfo().getTimestamp()));
-        assertThat(failure.getConcurrentExceptions(), 
IsEmptyIterable.emptyIterable());
+                        ExceptionHistoryEntryMatcher.matchesGlobalFailure(

Review Comment:
   Emm, `ExceptionHistoryEntryMatcher` is a subclass of `hamcrest` matcher. 
Maybe we should not use it now.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1229,20 +1295,28 @@ public void 
testExceptionHistoryWithRestartableFailure() {
                 scheduler.getExceptionHistory();
 
         // assert restarted attempt
+        assertThat(actualExceptionHistory).hasSize(2);
+        Iterator<RootExceptionHistoryEntry> iterator = 
actualExceptionHistory.iterator();
+        RootExceptionHistoryEntry entry0 = iterator.next();
         assertThat(
-                actualExceptionHistory,
-                IsIterableContainingInOrder.contains(
                         ExceptionHistoryEntryMatcher.matchesFailure(

Review Comment:
   ditto.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java:
##########
@@ -302,6 +306,10 @@ public CompletableFuture<CompletedCheckpoint> 
getCompletionFuture() {
         return onCompletionPromise;
     }
 
+    public boolean isCompleted() {

Review Comment:
   ```suggestion
       @VisibleForTesting
       public boolean isCompleted() {
   ```
   Marked as `VisibleForTesting` as only test method using it.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1701,14 +1781,12 @@ private void commonJobStatusHookTest(
 
         waitForTermination(scheduler);
         final JobStatus jobStatus = scheduler.requestJobStatus();
-        
org.assertj.core.api.Assertions.assertThat(jobStatus).isEqualTo(expectedJobStatus);
-        
org.assertj.core.api.Assertions.assertThat(onCreatedJobList).hasSize(1);
-        org.assertj.core.api.Assertions.assertThat(onCreatedJobList.get(0))
-                .isEqualTo(jobGraph.getJobID());
+        assertThat(jobStatus).isEqualTo(expectedJobStatus);
+        assertThat(onCreatedJobList).hasSize(1);
+        assertThat(onCreatedJobList.get(0)).isEqualTo(jobGraph.getJobID());
 
-        org.assertj.core.api.Assertions.assertThat(onJobStatusList).hasSize(1);
-        org.assertj.core.api.Assertions.assertThat(onJobStatusList.get(0))
-                .isEqualTo(jobGraph.getJobID());
+        assertThat(onJobStatusList).hasSize(1);
+        assertThat(onJobStatusList.get(0)).isEqualTo(jobGraph.getJobID());

Review Comment:
   ditto.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1511,44 +1591,44 @@ public void 
testDeploymentWaitForProducedPartitionRegistration() {
 
         createSchedulerAndStartScheduling(jobGraph);
 
-        assertThat(trackedPartitions, hasSize(0));
-        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
+        assertThat(trackedPartitions).hasSize(0);
+        assertThat(testExecutionOperations.getDeployedVertices()).hasSize(0);
 
         shuffleMaster.completeAllPendingRegistrations();
-        assertThat(trackedPartitions, hasSize(1));
-        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(2));
+        assertThat(trackedPartitions).hasSize(1);
+        assertThat(testExecutionOperations.getDeployedVertices()).hasSize(2);
     }
 
     @Test
-    public void testFailedProducedPartitionRegistration() {
+    void testFailedProducedPartitionRegistration() {
         shuffleMaster.setAutoCompleteRegistration(false);
 
         final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
 
         createSchedulerAndStartScheduling(jobGraph);
 
-        assertThat(testExecutionOperations.getCanceledVertices(), hasSize(0));
-        assertThat(testExecutionOperations.getFailedVertices(), hasSize(0));
+        assertThat(testExecutionOperations.getCanceledVertices()).hasSize(0);

Review Comment:
   ```suggestion
           assertThat(testExecutionOperations.getCanceledVertices()).isEmpty();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1701,14 +1781,12 @@ private void commonJobStatusHookTest(
 
         waitForTermination(scheduler);
         final JobStatus jobStatus = scheduler.requestJobStatus();
-        
org.assertj.core.api.Assertions.assertThat(jobStatus).isEqualTo(expectedJobStatus);
-        
org.assertj.core.api.Assertions.assertThat(onCreatedJobList).hasSize(1);
-        org.assertj.core.api.Assertions.assertThat(onCreatedJobList.get(0))
-                .isEqualTo(jobGraph.getJobID());
+        assertThat(jobStatus).isEqualTo(expectedJobStatus);
+        assertThat(onCreatedJobList).hasSize(1);
+        assertThat(onCreatedJobList.get(0)).isEqualTo(jobGraph.getJobID());

Review Comment:
   ```suggestion
           
assertThat(onCreatedJobList).singleElement().isEqualTo(jobGraph.getJobID());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to