This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4dd0912bb8a809b533fe15665899dc1638253e4d
Author: 1996fanrui <[email protected]>
AuthorDate: Mon Aug 14 22:10:50 2023 +0800

    [hotfix][JUnit5 Migration] Migrate some state tests of adaptive scheduler 
to junit5
---
 .../runtime/scheduler/adaptive/CreatedTest.java    | 115 +++---
 .../adaptive/CreatingExecutionGraphTest.java       | 316 +++++++--------
 .../runtime/scheduler/adaptive/StateTest.java      |  96 ++---
 .../adaptive/WaitingForResourcesTest.java          | 430 ++++++++++-----------
 4 files changed, 474 insertions(+), 483 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
index ee1ee2b7c1d..302fb471c3e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
@@ -23,82 +23,97 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.failure.FailureEnricherUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.FlinkException;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.util.function.Consumer;
 
-import static 
org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link Created} state. */
-public class CreatedTest extends TestLogger {
+class CreatedTest {
 
-    @Test
-    public void testCancel() throws Exception {
-        try (MockCreatedContext ctx = new MockCreatedContext()) {
-            Created created = new Created(ctx, log);
+    private static final Logger LOG = 
LoggerFactory.getLogger(CreatedTest.class);
 
-            ctx.setExpectFinished(assertNonNull());
+    @RegisterExtension MockCreatedContext ctx = new MockCreatedContext();
 
-            created.cancel();
-        }
+    @Test
+    void testCancel() {
+        Created created = new Created(ctx, LOG);
+
+        ctx.setExpectFinished(
+                archivedExecutionGraph -> {
+                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED);
+                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNull();
+                });
+        created.cancel();
     }
 
     @Test
-    public void testStartScheduling() throws Exception {
-        try (MockCreatedContext ctx = new MockCreatedContext()) {
-            Created created = new Created(ctx, log);
+    void testStartScheduling() {
+        Created created = new Created(ctx, LOG);
 
-            ctx.setExpectWaitingForResources();
+        ctx.setExpectWaitingForResources();
 
-            created.startScheduling();
-        }
+        created.startScheduling();
     }
 
     @Test
-    public void testSuspend() throws Exception {
-        try (MockCreatedContext ctx = new MockCreatedContext()) {
-            Created created = new Created(ctx, log);
-
-            ctx.setExpectFinished(
-                    archivedExecutionGraph -> {
-                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.SUSPENDED));
-                    });
-
-            created.suspend(new RuntimeException("Suspend"));
-        }
+    void testSuspend() {
+        FlinkException expectedException = new FlinkException("This is a test 
exception");
+        Created created = new Created(ctx, LOG);
+
+        ctx.setExpectFinished(
+                archivedExecutionGraph -> {
+                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED);
+                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
+                    assertThat(
+                                    archivedExecutionGraph
+                                            .getFailureInfo()
+                                            .getException()
+                                            
.deserializeError(this.getClass().getClassLoader()))
+                            .isEqualTo(expectedException);
+                });
+
+        created.suspend(expectedException);
     }
 
     @Test
-    public void testFailure() throws Exception {
-        try (MockCreatedContext ctx = new MockCreatedContext()) {
-            Created created = new Created(ctx, log);
-
-            ctx.setExpectFinished(
-                    archivedExecutionGraph -> {
-                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
-                    });
-
-            created.handleGlobalFailure(
-                    new RuntimeException("Global"), 
FailureEnricherUtils.EMPTY_FAILURE_LABELS);
-        }
+    void testFailure() {
+        Created created = new Created(ctx, LOG);
+        RuntimeException expectedException = new RuntimeException("This is a 
test exception");
+
+        ctx.setExpectFinished(
+                archivedExecutionGraph -> {
+                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
+                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
+                    assertThat(
+                                    archivedExecutionGraph
+                                            .getFailureInfo()
+                                            .getException()
+                                            
.deserializeError(this.getClass().getClassLoader()))
+                            .isEqualTo(expectedException);
+                });
+
+        created.handleGlobalFailure(expectedException, 
FailureEnricherUtils.EMPTY_FAILURE_LABELS);
     }
 
     @Test
-    public void testJobInformation() throws Exception {
-        try (MockCreatedContext ctx = new MockCreatedContext()) {
-            Created created = new Created(ctx, log);
-            ArchivedExecutionGraph job = created.getJob();
-            assertThat(job.getState(), is(JobStatus.INITIALIZING));
-        }
+    void testJobInformation() {
+        Created created = new Created(ctx, LOG);
+        ArchivedExecutionGraph job = created.getJob();
+        assertThat(job.getState()).isEqualTo(JobStatus.INITIALIZING);
     }
 
-    static class MockCreatedContext implements Created.Context, AutoCloseable {
+    static class MockCreatedContext implements Created.Context, 
AfterEachCallback {
         private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
                 new StateValidator<>("finished");
         private final StateValidator<Void> waitingForResourcesStateValidator =
@@ -130,7 +145,7 @@ public class CreatedTest extends TestLogger {
         }
 
         @Override
-        public void close() throws Exception {
+        public void afterEach(ExtensionContext extensionContext) throws 
Exception {
             finishedStateValidator.close();
             waitingForResourcesStateValidator.close();
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
index 2375a194206..833835b7d22 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
@@ -33,12 +33,14 @@ import 
org.apache.flink.runtime.scheduler.GlobalFailureHandler;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -54,167 +56,171 @@ import java.util.function.Function;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link CreatingExecutionGraph} state. */
-@ExtendWith(TestLoggerExtension.class)
-public class CreatingExecutionGraphTest extends TestLogger {
+class CreatingExecutionGraphTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CreatingExecutionGraphTest.class);
+
+    @RegisterExtension
+    MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext();
 
     @Test
-    public void testCancelTransitionsToFinished() throws Exception {
-        try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
-            final CreatingExecutionGraph creatingExecutionGraph =
-                    new CreatingExecutionGraph(
-                            context,
-                            new CompletableFuture<>(),
-                            log,
-                            
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
-                            null);
-
-            context.setExpectFinished(
-                    archivedExecutionGraph ->
-                            assertThat(archivedExecutionGraph.getState())
-                                    .isEqualTo(JobStatus.CANCELED));
-
-            creatingExecutionGraph.cancel();
-        }
+    void testCancelTransitionsToFinished() {
+        final CreatingExecutionGraph creatingExecutionGraph =
+                new CreatingExecutionGraph(
+                        context,
+                        new CompletableFuture<>(),
+                        LOG,
+                        
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
+                        null);
+
+        context.setExpectFinished(
+                archivedExecutionGraph -> {
+                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED);
+                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNull();
+                });
+        creatingExecutionGraph.cancel();
     }
 
     @Test
-    public void testSuspendTransitionsToFinished() throws Exception {
-        try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
-            final CreatingExecutionGraph creatingExecutionGraph =
-                    new CreatingExecutionGraph(
-                            context,
-                            new CompletableFuture<>(),
-                            log,
-                            
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
-                            null);
-
-            context.setExpectFinished(
-                    archivedExecutionGraph ->
-                            assertThat(archivedExecutionGraph.getState())
-                                    .isEqualTo(JobStatus.SUSPENDED));
-
-            creatingExecutionGraph.suspend(new FlinkException("Job has been 
suspended."));
-        }
+    void testSuspendTransitionsToFinished() {
+        final CreatingExecutionGraph creatingExecutionGraph =
+                new CreatingExecutionGraph(
+                        context,
+                        new CompletableFuture<>(),
+                        LOG,
+                        
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
+                        null);
+
+        FlinkException expectedException = new FlinkException("This is a test 
exception");
+
+        context.setExpectFinished(
+                archivedExecutionGraph -> {
+                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED);
+                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
+                    assertThat(
+                                    archivedExecutionGraph
+                                            .getFailureInfo()
+                                            .getException()
+                                            
.deserializeError(this.getClass().getClassLoader()))
+                            .isEqualTo(expectedException);
+                });
+
+        creatingExecutionGraph.suspend(expectedException);
     }
 
     @Test
-    public void testGlobalFailureTransitionsToFinished() throws Exception {
-        try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
-            final CreatingExecutionGraph creatingExecutionGraph =
-                    new CreatingExecutionGraph(
-                            context,
-                            new CompletableFuture<>(),
-                            log,
-                            
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
-                            null);
-
-            context.setExpectFinished(
-                    archivedExecutionGraph ->
-                            assertThat(archivedExecutionGraph.getState())
-                                    .isEqualTo(JobStatus.FAILED));
-
-            creatingExecutionGraph.handleGlobalFailure(
-                    new FlinkException("Test exception"),
-                    FailureEnricherUtils.EMPTY_FAILURE_LABELS);
-        }
+    void testGlobalFailureTransitionsToFinished() {
+        final CreatingExecutionGraph creatingExecutionGraph =
+                new CreatingExecutionGraph(
+                        context,
+                        new CompletableFuture<>(),
+                        LOG,
+                        
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
+                        null);
+
+        RuntimeException expectedException = new RuntimeException("This is a 
test exception");
+
+        context.setExpectFinished(
+                archivedExecutionGraph -> {
+                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
+                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
+                    assertThat(
+                                    archivedExecutionGraph
+                                            .getFailureInfo()
+                                            .getException()
+                                            
.deserializeError(this.getClass().getClassLoader()))
+                            .isEqualTo(expectedException);
+                });
+
+        creatingExecutionGraph.handleGlobalFailure(
+                expectedException, FailureEnricherUtils.EMPTY_FAILURE_LABELS);
     }
 
     @Test
-    public void testFailedExecutionGraphCreationTransitionsToFinished() throws 
Exception {
-        try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
-            final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
-                    executionGraphWithVertexParallelismFuture = new 
CompletableFuture<>();
-            new CreatingExecutionGraph(
-                    context,
-                    executionGraphWithVertexParallelismFuture,
-                    log,
-                    
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
-                    null);
-
-            context.setExpectFinished(
-                    archivedExecutionGraph ->
-                            assertThat(archivedExecutionGraph.getState())
-                                    .isEqualTo(JobStatus.FAILED));
-
-            executionGraphWithVertexParallelismFuture.completeExceptionally(
-                    new FlinkException("Test exception"));
-        }
+    void testFailedExecutionGraphCreationTransitionsToFinished() {
+        final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                executionGraphWithVertexParallelismFuture = new 
CompletableFuture<>();
+        new CreatingExecutionGraph(
+                context,
+                executionGraphWithVertexParallelismFuture,
+                LOG,
+                
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
+                null);
+
+        context.setExpectFinished(
+                archivedExecutionGraph ->
+                        
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED));
+
+        executionGraphWithVertexParallelismFuture.completeExceptionally(
+                new FlinkException("Test exception"));
     }
 
     @Test
-    public void 
testNotPossibleSlotAssignmentTransitionsToWaitingForResources() throws 
Exception {
-        try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
-            final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
-                    executionGraphWithVertexParallelismFuture = new 
CompletableFuture<>();
-            new CreatingExecutionGraph(
-                    context,
-                    executionGraphWithVertexParallelismFuture,
-                    log,
-                    
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
-                    null);
-
-            context.setTryToAssignSlotsFunction(
-                    ignored -> 
CreatingExecutionGraph.AssignmentResult.notPossible());
-            context.setExpectWaitingForResources();
-
-            executionGraphWithVertexParallelismFuture.complete(
-                    getGraph(new StateTrackingMockExecutionGraph()));
-        }
+    void testNotPossibleSlotAssignmentTransitionsToWaitingForResources() {
+        final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                executionGraphWithVertexParallelismFuture = new 
CompletableFuture<>();
+        new CreatingExecutionGraph(
+                context,
+                executionGraphWithVertexParallelismFuture,
+                LOG,
+                
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
+                null);
+
+        context.setTryToAssignSlotsFunction(
+                ignored -> 
CreatingExecutionGraph.AssignmentResult.notPossible());
+        context.setExpectWaitingForResources();
+
+        executionGraphWithVertexParallelismFuture.complete(
+                getGraph(new StateTrackingMockExecutionGraph()));
     }
 
     @Test
-    public void testSuccessfulSlotAssignmentTransitionsToExecuting() throws 
Exception {
-        try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
-            final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
-                    executionGraphWithVertexParallelismFuture = new 
CompletableFuture<>();
-            new CreatingExecutionGraph(
-                    context,
-                    executionGraphWithVertexParallelismFuture,
-                    log,
-                    
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
-                    null);
-
-            final StateTrackingMockExecutionGraph executionGraph =
-                    new StateTrackingMockExecutionGraph();
-
-            
context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment);
-            context.setExpectedExecuting(
-                    actualExecutionGraph ->
-                            
assertThat(actualExecutionGraph).isEqualTo(executionGraph));
-
-            
executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph));
-        }
+    void testSuccessfulSlotAssignmentTransitionsToExecuting() {
+        final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                executionGraphWithVertexParallelismFuture = new 
CompletableFuture<>();
+        new CreatingExecutionGraph(
+                context,
+                executionGraphWithVertexParallelismFuture,
+                LOG,
+                
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
+                null);
+
+        final StateTrackingMockExecutionGraph executionGraph =
+                new StateTrackingMockExecutionGraph();
+
+        
context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment);
+        context.setExpectedExecuting(
+                actualExecutionGraph -> 
assertThat(actualExecutionGraph).isEqualTo(executionGraph));
+
+        
executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph));
     }
 
     @Test
-    public void testOperatorCoordinatorUsesFailureHandlerOfTheCurrentState() 
throws Exception {
-        try (MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext()) {
-            final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
-                    executionGraphWithVertexParallelismFuture = new 
CompletableFuture<>();
-            final AtomicReference<GlobalFailureHandler> 
operatorCoordinatorGlobalFailureHandlerRef =
-                    new AtomicReference<>();
-            new CreatingExecutionGraph(
-                    context,
-                    executionGraphWithVertexParallelismFuture,
-                    log,
-                    (executionGraph, errorHandler) -> {
-                        
operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler);
-                        return new TestingOperatorCoordinatorHandler();
-                    },
-                    null);
-
-            final StateTrackingMockExecutionGraph executionGraph =
-                    new StateTrackingMockExecutionGraph();
-
-            
context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment);
-            context.setExpectedExecuting(
-                    actualExecutionGraph ->
-                            
assertThat(actualExecutionGraph).isEqualTo(executionGraph));
-
-            
executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph));
-
-            
assertThat(operatorCoordinatorGlobalFailureHandlerRef.get()).isSameAs(context);
-        }
+    void testOperatorCoordinatorUsesFailureHandlerOfTheCurrentState() {
+        final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
+                executionGraphWithVertexParallelismFuture = new 
CompletableFuture<>();
+        final AtomicReference<GlobalFailureHandler> 
operatorCoordinatorGlobalFailureHandlerRef =
+                new AtomicReference<>();
+        new CreatingExecutionGraph(
+                context,
+                executionGraphWithVertexParallelismFuture,
+                LOG,
+                (executionGraph, errorHandler) -> {
+                    
operatorCoordinatorGlobalFailureHandlerRef.set(errorHandler);
+                    return new TestingOperatorCoordinatorHandler();
+                },
+                null);
+
+        final StateTrackingMockExecutionGraph executionGraph =
+                new StateTrackingMockExecutionGraph();
+
+        
context.setTryToAssignSlotsFunction(CreatingExecutionGraphTest::successfulAssignment);
+        context.setExpectedExecuting(
+                actualExecutionGraph -> 
assertThat(actualExecutionGraph).isEqualTo(executionGraph));
+
+        
executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph));
+
+        
assertThat(operatorCoordinatorGlobalFailureHandlerRef.get()).isSameAs(context);
     }
 
     private static CreatingExecutionGraph.AssignmentResult 
successfulAssignment(
@@ -230,7 +236,7 @@ public class CreatingExecutionGraphTest extends TestLogger {
     }
 
     static class MockCreatingExecutionGraphContext
-            implements CreatingExecutionGraph.Context, AutoCloseable {
+            implements CreatingExecutionGraph.Context, AfterEachCallback {
         private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
                 new StateValidator<>("Finished");
         private final StateValidator<Void> waitingForResourcesStateValidator =
@@ -249,7 +255,7 @@ public class CreatingExecutionGraphTest extends TestLogger {
                     // No-op.
                 };
 
-        private boolean hadStateTransitionHappened = false;
+        private boolean hasStateTransition = false;
 
         public void setExpectFinished(Consumer<ArchivedExecutionGraph> 
asserter) {
             finishedStateValidator.expectInput(asserter);
@@ -278,7 +284,7 @@ public class CreatingExecutionGraphTest extends TestLogger {
         @Override
         public void goToFinished(ArchivedExecutionGraph 
archivedExecutionGraph) {
             finishedStateValidator.validateInput(archivedExecutionGraph);
-            hadStateTransitionHappened = true;
+            registerStateTransition();
         }
 
         @Override
@@ -288,7 +294,7 @@ public class CreatingExecutionGraphTest extends TestLogger {
                 OperatorCoordinatorHandler operatorCoordinatorHandler,
                 List<ExceptionHistoryEntry> failureCollection) {
             executingStateValidator.validateInput(executionGraph);
-            hadStateTransitionHappened = true;
+            registerStateTransition();
         }
 
         @Override
@@ -300,7 +306,7 @@ public class CreatingExecutionGraphTest extends TestLogger {
 
         @Override
         public ScheduledFuture<?> runIfState(State expectedState, Runnable 
action, Duration delay) {
-            if (!hadStateTransitionHappened) {
+            if (!hasStateTransition()) {
                 action.run();
             }
 
@@ -322,7 +328,7 @@ public class CreatingExecutionGraphTest extends TestLogger {
         @Override
         public void goToWaitingForResources(@Nullable ExecutionGraph 
previousExecutionGraph) {
             waitingForResourcesStateValidator.validateInput(null);
-            hadStateTransitionHappened = true;
+            hasStateTransition = true;
         }
 
         @Override
@@ -341,11 +347,19 @@ public class CreatingExecutionGraphTest extends 
TestLogger {
         }
 
         @Override
-        public void close() throws Exception {
+        public void afterEach(ExtensionContext extensionContext) throws 
Exception {
             finishedStateValidator.close();
             waitingForResourcesStateValidator.close();
             executingStateValidator.close();
         }
+
+        public boolean hasStateTransition() {
+            return hasStateTransition;
+        }
+
+        public void registerStateTransition() {
+            hasStateTransition = true;
+        }
     }
 
     private static CreatingExecutionGraph.ExecutionGraphWithVertexParallelism 
getGraph(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTest.java
index fc3b0f6731c..568ed8a2c6c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTest.java
@@ -18,85 +18,71 @@
 
 package org.apache.flink.runtime.scheduler.adaptive;
 
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * Tests for the default methods on the {@link State} interface, based on the 
{@link Created} state,
  * as it is a simple state.
  */
-public class StateTest extends TestLogger {
+class StateTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StateTest.class);
+
+    @RegisterExtension CreatedTest.MockCreatedContext ctx = new 
CreatedTest.MockCreatedContext();
+
     @Test
-    public void testEmptyAs() throws Exception {
-        try (CreatedTest.MockCreatedContext ctx = new 
CreatedTest.MockCreatedContext()) {
-            State state = new Created(ctx, log);
-            assertThat(state.as(WaitingForResources.class), 
is(Optional.empty()));
-        }
+    void testEmptyAs() {
+        State state = new Created(ctx, LOG);
+        assertThat(state.as(WaitingForResources.class)).isEmpty();
     }
 
     @Test
-    public void testCast() throws Exception {
-        try (CreatedTest.MockCreatedContext ctx = new 
CreatedTest.MockCreatedContext()) {
-            State state = new Created(ctx, log);
-            assertThat(state.as(Created.class), is(Optional.of(state)));
-        }
+    void testCast() {
+        Created state = new Created(ctx, LOG);
+        assertThat(state.as(Created.class)).hasValue(state);
     }
 
     @Test
-    public void testTryRunStateMismatch() throws Exception {
-        try (CreatedTest.MockCreatedContext ctx = new 
CreatedTest.MockCreatedContext()) {
-            State state = new Created(ctx, log);
-            state.tryRun(
-                    WaitingForResources.class, (waiting -> fail("Unexpected 
execution")), "test");
-        }
+    void testTryRunStateMismatch() {
+        State state = new Created(ctx, LOG);
+        state.tryRun(WaitingForResources.class, waiting -> fail("Unexpected 
execution"), "test");
     }
 
     @Test
-    public void testTryRun() throws Exception {
-        try (CreatedTest.MockCreatedContext ctx = new 
CreatedTest.MockCreatedContext()) {
-            State state = new Created(ctx, log);
-            AtomicBoolean called = new AtomicBoolean(false);
-            state.tryRun(Created.class, created -> called.set(true), "test");
-            assertThat(called.get(), is(true));
-        }
+    void testTryRun() {
+        State state = new Created(ctx, LOG);
+        AtomicBoolean called = new AtomicBoolean(false);
+        state.tryRun(Created.class, created -> called.set(true), "test");
+        assertThat(called).isTrue();
     }
 
     @Test
-    public void testTryCallStateMismatch() throws Exception {
-        try (CreatedTest.MockCreatedContext ctx = new 
CreatedTest.MockCreatedContext()) {
-            State state = new Created(ctx, log);
-            Optional<String> result =
-                    state.tryCall(
-                            WaitingForResources.class,
-                            Waiting -> {
-                                fail("Unexpected execution");
-                                return "nope";
-                            },
-                            "test");
-            assertThat(result, is(Optional.empty()));
-        }
+    void testTryCallStateMismatch() {
+        State state = new Created(ctx, LOG);
+        Optional<String> result =
+                state.tryCall(
+                        WaitingForResources.class,
+                        Waiting -> {
+                            fail("Unexpected execution");
+                            return "nope";
+                        },
+                        "test");
+        assertThat(result).isEmpty();
     }
 
     @Test
-    public void testTryCall() throws Exception {
-        try (CreatedTest.MockCreatedContext ctx = new 
CreatedTest.MockCreatedContext()) {
-            State state = new Created(ctx, log);
-            Optional<String> result =
-                    state.tryCall(
-                            Created.class,
-                            created -> {
-                                return "yes";
-                            },
-                            "test");
-            assertThat(result, is(Optional.of("yes")));
-        }
+    void testTryCall() {
+        State state = new Created(ctx, LOG);
+        Optional<String> result = state.tryCall(Created.class, created -> 
"yes", "test");
+        assertThat(result).hasValue("yes");
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
index 2f60b4aeac5..bdcc73f5c9d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
@@ -25,11 +25,14 @@ import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.ManualClock;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,256 +48,234 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for the WaitingForResources state. */
-public class WaitingForResourcesTest extends TestLogger {
+class WaitingForResourcesTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(WaitingForResourcesTest.class);
+
     private static final Duration STABILIZATION_TIMEOUT = 
Duration.ofSeconds(1);
 
+    @RegisterExtension MockContext ctx = new MockContext();
+
     /** WaitingForResources is transitioning to Executing if there are enough 
resources. */
     @Test
-    public void testTransitionToCreatingExecutionGraph() throws Exception {
-        try (MockContext ctx = new MockContext()) {
-            ctx.setHasDesiredResources(() -> true);
+    void testTransitionToCreatingExecutionGraph() {
+        ctx.setHasDesiredResources(() -> true);
 
-            ctx.setExpectCreatingExecutionGraph();
+        ctx.setExpectCreatingExecutionGraph();
 
-            new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+        new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
 
-            ctx.runScheduledTasks();
-        }
+        ctx.runScheduledTasks();
     }
 
     @Test
-    public void testNotEnoughResources() throws Exception {
-        try (MockContext ctx = new MockContext()) {
-            ctx.setHasDesiredResources(() -> false);
-            WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
-
-            // we expect no state transition.
-            wfr.onNewResourcesAvailable();
-        }
+    void testNotEnoughResources() {
+        ctx.setHasDesiredResources(() -> false);
+        WaitingForResources wfr =
+                new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+
+        // we expect no state transition.
+        wfr.onNewResourcesAvailable();
     }
 
     @Test
-    public void testNotifyNewResourcesAvailable() throws Exception {
-        try (MockContext ctx = new MockContext()) {
-            ctx.setHasDesiredResources(() -> false); // initially, not enough 
resources
-            WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
-            ctx.setHasDesiredResources(() -> true); // make resources available
-            ctx.setExpectCreatingExecutionGraph();
-            wfr.onNewResourcesAvailable(); // .. and notify
-        }
+    void testNotifyNewResourcesAvailable() {
+        ctx.setHasDesiredResources(() -> false); // initially, not enough 
resources
+        WaitingForResources wfr =
+                new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+        ctx.setHasDesiredResources(() -> true); // make resources available
+        ctx.setExpectCreatingExecutionGraph();
+        wfr.onNewResourcesAvailable(); // .. and notify
     }
 
     @Test
-    public void 
testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() throws 
Exception {
-        try (MockContext ctx = new MockContext()) {
-
-            Duration noStabilizationTimeout = Duration.ofMillis(0);
-            WaitingForResources wfr =
-                    new WaitingForResources(
-                            ctx, log, Duration.ofSeconds(1000), 
noStabilizationTimeout);
-
-            ctx.setHasDesiredResources(() -> false);
-            ctx.setHasSufficientResources(() -> true);
-            ctx.setExpectCreatingExecutionGraph();
-            wfr.onNewResourcesAvailable();
-        }
+    void testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() {
+        Duration noStabilizationTimeout = Duration.ofMillis(0);
+        WaitingForResources wfr =
+                new WaitingForResources(ctx, LOG, Duration.ofSeconds(1000), 
noStabilizationTimeout);
+
+        ctx.setHasDesiredResources(() -> false);
+        ctx.setHasSufficientResources(() -> true);
+        ctx.setExpectCreatingExecutionGraph();
+        wfr.onNewResourcesAvailable();
     }
 
     @Test
-    public void testNoSchedulingIfStabilizationTimeoutIsConfigured() throws 
Exception {
-        try (MockContext ctx = new MockContext()) {
+    void testNoSchedulingIfStabilizationTimeoutIsConfigured() {
+        Duration stabilizationTimeout = Duration.ofMillis(50000);
 
-            Duration stabilizationTimeout = Duration.ofMillis(50000);
+        WaitingForResources wfr =
+                new WaitingForResources(ctx, LOG, Duration.ofSeconds(1000), 
stabilizationTimeout);
 
-            WaitingForResources wfr =
-                    new WaitingForResources(
-                            ctx, log, Duration.ofSeconds(1000), 
stabilizationTimeout);
+        ctx.setHasDesiredResources(() -> false);
+        ctx.setHasSufficientResources(() -> true);
+        wfr.onNewResourcesAvailable();
+        // we are not triggering the scheduled tasks, to simulate a long 
stabilization timeout
 
-            ctx.setHasDesiredResources(() -> false);
-            ctx.setHasSufficientResources(() -> true);
-            wfr.onNewResourcesAvailable();
-            // we are not triggering the scheduled tasks, to simulate a long 
stabilization timeout
-
-            assertThat(ctx.hasStateTransition(), is(false));
-        }
+        assertThat(ctx.hasStateTransition()).isFalse();
     }
 
     @Test
-    public void 
testSchedulingWithSufficientResourcesAfterStabilizationTimeout() throws 
Exception {
-        try (MockContext ctx = new MockContext()) {
-
-            Duration initialResourceTimeout = Duration.ofMillis(-1);
-            Duration stabilizationTimeout = Duration.ofMillis(50_000L);
-
-            WaitingForResources wfr =
-                    new WaitingForResources(
-                            ctx,
-                            log,
-                            initialResourceTimeout,
-                            stabilizationTimeout,
-                            ctx.getClock(),
-                            null);
-            // sufficient resources available
-            ctx.setHasDesiredResources(() -> false);
-            ctx.setHasSufficientResources(() -> true);
-
-            // notify about sufficient resources
-            wfr.onNewResourcesAvailable();
-
-            ctx.setExpectCreatingExecutionGraph();
-
-            // execute all runnables and trigger expected state transition
-            final Duration afterStabilizationTimeout = 
stabilizationTimeout.plusMillis(1);
-            ctx.advanceTimeByMillis(afterStabilizationTimeout.toMillis());
-
-            ctx.runScheduledTasks(afterStabilizationTimeout.toMillis());
-
-            assertThat(ctx.hasStateTransition(), is(true));
-        }
+    void testSchedulingWithSufficientResourcesAfterStabilizationTimeout() {
+        Duration initialResourceTimeout = Duration.ofMillis(-1);
+        Duration stabilizationTimeout = Duration.ofMillis(50_000L);
+
+        WaitingForResources wfr =
+                new WaitingForResources(
+                        ctx,
+                        LOG,
+                        initialResourceTimeout,
+                        stabilizationTimeout,
+                        ctx.getClock(),
+                        null);
+        // sufficient resources available
+        ctx.setHasDesiredResources(() -> false);
+        ctx.setHasSufficientResources(() -> true);
+
+        // notify about sufficient resources
+        wfr.onNewResourcesAvailable();
+
+        ctx.setExpectCreatingExecutionGraph();
+
+        // execute all runnables and trigger expected state transition
+        final Duration afterStabilizationTimeout = 
stabilizationTimeout.plusMillis(1);
+        ctx.advanceTimeByMillis(afterStabilizationTimeout.toMillis());
+
+        ctx.runScheduledTasks(afterStabilizationTimeout.toMillis());
+
+        assertThat(ctx.hasStateTransition()).isTrue();
     }
 
     @Test
-    public void testStabilizationTimeoutReset() throws Exception {
-        try (MockContext ctx = new MockContext()) {
-
-            Duration initialResourceTimeout = Duration.ofMillis(-1);
-            Duration stabilizationTimeout = Duration.ofMillis(50L);
-
-            WaitingForResources wfr =
-                    new WaitingForResources(
-                            ctx,
-                            log,
-                            initialResourceTimeout,
-                            stabilizationTimeout,
-                            ctx.getClock(),
-                            null);
-
-            ctx.setHasDesiredResources(() -> false);
-
-            // notify about resources, trigger stabilization timeout
-            ctx.setHasSufficientResources(() -> true);
-            ctx.advanceTimeByMillis(40); // advance time, but don't trigger 
stabilizationTimeout
-            wfr.onNewResourcesAvailable();
-
-            // notify again, but insufficient (reset stabilization timeout)
-            ctx.setHasSufficientResources(() -> false);
-            ctx.advanceTimeByMillis(40);
-            wfr.onNewResourcesAvailable();
-
-            // notify again, but sufficient, trigger timeout
-            ctx.setHasSufficientResources(() -> true);
-            ctx.advanceTimeByMillis(40);
-            wfr.onNewResourcesAvailable();
-
-            // sanity check: no state transition has been triggered so far
-            assertThat(ctx.hasStateTransition(), is(false));
-            assertThat(ctx.getTestDuration(), 
greaterThan(stabilizationTimeout));
-
-            ctx.setExpectCreatingExecutionGraph();
-
-            ctx.advanceTimeByMillis(1);
-            assertThat(ctx.hasStateTransition(), is(false));
-
-            ctx.advanceTimeByMillis(stabilizationTimeout.toMillis());
-            assertThat(ctx.hasStateTransition(), is(true));
-        }
+    void testStabilizationTimeoutReset() {
+        Duration initialResourceTimeout = Duration.ofMillis(-1);
+        Duration stabilizationTimeout = Duration.ofMillis(50L);
+
+        WaitingForResources wfr =
+                new WaitingForResources(
+                        ctx,
+                        LOG,
+                        initialResourceTimeout,
+                        stabilizationTimeout,
+                        ctx.getClock(),
+                        null);
+
+        ctx.setHasDesiredResources(() -> false);
+
+        // notify about resources, trigger stabilization timeout
+        ctx.setHasSufficientResources(() -> true);
+        ctx.advanceTimeByMillis(40); // advance time, but don't trigger 
stabilizationTimeout
+        wfr.onNewResourcesAvailable();
+
+        // notify again, but insufficient (reset stabilization timeout)
+        ctx.setHasSufficientResources(() -> false);
+        ctx.advanceTimeByMillis(40);
+        wfr.onNewResourcesAvailable();
+
+        // notify again, but sufficient, trigger timeout
+        ctx.setHasSufficientResources(() -> true);
+        ctx.advanceTimeByMillis(40);
+        wfr.onNewResourcesAvailable();
+
+        // sanity check: no state transition has been triggered so far
+        assertThat(ctx.hasStateTransition()).isFalse();
+        assertThat(ctx.getTestDuration()).isGreaterThan(stabilizationTimeout);
+
+        ctx.setExpectCreatingExecutionGraph();
+
+        ctx.advanceTimeByMillis(1);
+        assertThat(ctx.hasStateTransition()).isFalse();
+
+        ctx.advanceTimeByMillis(stabilizationTimeout.toMillis());
+        assertThat(ctx.hasStateTransition()).isTrue();
     }
 
     @Test
-    public void testNoStateTransitionOnNoResourceTimeout() throws Exception {
-        try (MockContext ctx = new MockContext()) {
-            ctx.setHasDesiredResources(() -> false);
-            WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, Duration.ofMillis(-1), 
STABILIZATION_TIMEOUT);
-
-            ctx.runScheduledTasks();
-            assertThat(ctx.hasStateTransition(), is(false));
-        }
+    void testNoStateTransitionOnNoResourceTimeout() {
+        ctx.setHasDesiredResources(() -> false);
+        WaitingForResources wfr =
+                new WaitingForResources(ctx, LOG, Duration.ofMillis(-1), 
STABILIZATION_TIMEOUT);
+
+        ctx.runScheduledTasks();
+        assertThat(ctx.hasStateTransition()).isFalse();
     }
 
     @Test
-    public void testStateTransitionOnResourceTimeout() throws Exception {
-        try (MockContext ctx = new MockContext()) {
-            ctx.setHasDesiredResources(() -> false);
-            WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+    void testStateTransitionOnResourceTimeout() {
+        ctx.setHasDesiredResources(() -> false);
+        WaitingForResources wfr =
+                new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
 
-            ctx.setExpectCreatingExecutionGraph();
+        ctx.setExpectCreatingExecutionGraph();
 
-            ctx.runScheduledTasks();
-        }
+        ctx.runScheduledTasks();
     }
 
     @Test
-    public void testTransitionToFinishedOnGlobalFailure() throws Exception {
-        final String testExceptionString = "This is a test exception";
-        try (MockContext ctx = new MockContext()) {
-            ctx.setHasDesiredResources(() -> false);
-            WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
-
-            ctx.setExpectFinished(
-                    archivedExecutionGraph -> {
-                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
-                        assertThat(archivedExecutionGraph.getFailureInfo(), 
notNullValue());
-                        assertTrue(
-                                archivedExecutionGraph
-                                        .getFailureInfo()
-                                        .getExceptionAsString()
-                                        .contains(testExceptionString));
-                    });
-
-            wfr.handleGlobalFailure(
-                    new RuntimeException(testExceptionString),
-                    FailureEnricherUtils.EMPTY_FAILURE_LABELS);
-        }
+    void testTransitionToFinishedOnGlobalFailure() {
+        ctx.setHasDesiredResources(() -> false);
+        WaitingForResources wfr =
+                new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+        RuntimeException expectedException = new RuntimeException("This is a 
test exception");
+
+        ctx.setExpectFinished(
+                archivedExecutionGraph -> {
+                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
+                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
+                    assertThat(
+                                    archivedExecutionGraph
+                                            .getFailureInfo()
+                                            .getException()
+                                            
.deserializeError(this.getClass().getClassLoader()))
+                            .isEqualTo(expectedException);
+                });
+
+        wfr.handleGlobalFailure(expectedException, 
FailureEnricherUtils.EMPTY_FAILURE_LABELS);
     }
 
     @Test
-    public void testCancel() throws Exception {
-        try (MockContext ctx = new MockContext()) {
-            ctx.setHasDesiredResources(() -> false);
-            WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
-
-            ctx.setExpectFinished(
-                    (archivedExecutionGraph -> {
-                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.CANCELED));
-                    }));
-            wfr.cancel();
-        }
+    void testCancel() {
+        ctx.setHasDesiredResources(() -> false);
+        WaitingForResources wfr =
+                new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+
+        ctx.setExpectFinished(
+                archivedExecutionGraph -> {
+                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED);
+                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNull();
+                });
+        wfr.cancel();
     }
 
     @Test
-    public void testSuspend() throws Exception {
-        try (MockContext ctx = new MockContext()) {
-            ctx.setHasDesiredResources(() -> false);
-            WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, Duration.ZERO, 
STABILIZATION_TIMEOUT);
-
-            ctx.setExpectFinished(
-                    (archivedExecutionGraph -> {
-                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.SUSPENDED));
-                        assertThat(archivedExecutionGraph.getFailureInfo(), 
notNullValue());
-                    }));
-
-            wfr.suspend(new RuntimeException("suspend"));
-        }
+    void testSuspend() {
+        ctx.setHasDesiredResources(() -> false);
+        WaitingForResources wfr =
+                new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
+
+        FlinkException expectedException = new FlinkException("This is a test 
exception");
+
+        ctx.setExpectFinished(
+                archivedExecutionGraph -> {
+                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED);
+                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
+                    assertThat(
+                                    archivedExecutionGraph
+                                            .getFailureInfo()
+                                            .getException()
+                                            
.deserializeError(this.getClass().getClassLoader()))
+                            .isEqualTo(expectedException);
+                });
+
+        wfr.suspend(expectedException);
     }
 
     @Test
-    public void testInternalRunScheduledTasks_correctExecutionOrder() {
-        MockContext ctx = new MockContext();
+    void testInternalRunScheduledTasks_correctExecutionOrder() {
         AtomicBoolean firstRun = new AtomicBoolean(false);
         AtomicBoolean secondRun = new AtomicBoolean(false);
         AtomicBoolean thirdRun = new AtomicBoolean(false);
@@ -302,16 +283,12 @@ public class WaitingForResourcesTest extends TestLogger {
         Runnable runFirstBecauseOfLowDelay = () -> firstRun.set(true);
         Runnable runSecondBecauseOfScheduleOrder =
                 () -> {
-                    if (!firstRun.get()) {
-                        fail("order violated");
-                    }
+                    assertThat(firstRun).as("order violated").isTrue();
                     secondRun.set(true);
                 };
         Runnable runLastBecauseOfHighDelay =
                 () -> {
-                    if (!secondRun.get()) {
-                        fail("order violated");
-                    }
+                    assertThat(secondRun).as("order violated").isTrue();
                     thirdRun.set(true);
                 };
 
@@ -328,19 +305,15 @@ public class WaitingForResourcesTest extends TestLogger {
 
         ctx.runScheduledTasks();
 
-        assertThat(thirdRun.get(), is(true));
+        assertThat(thirdRun).isTrue();
     }
 
     @Test
-    public void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() {
-        MockContext ctx = new MockContext();
-
+    void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() {
         AtomicBoolean executed = new AtomicBoolean(false);
         Runnable executeOnce =
                 () -> {
-                    if (executed.get()) {
-                        fail("Multiple executions");
-                    }
+                    assertThat(executed).as("Multiple executions").isFalse();
                     executed.set(true);
                 };
 
@@ -349,12 +322,11 @@ public class WaitingForResourcesTest extends TestLogger {
         // execute at least twice
         ctx.runScheduledTasks();
         ctx.runScheduledTasks();
+        assertThat(executed).isTrue();
     }
 
     @Test
-    public void testInternalRunScheduledTasks_upperBoundRespected() {
-        MockContext ctx = new MockContext();
-
+    void testInternalRunScheduledTasks_upperBoundRespected() {
         Runnable executeNever = () -> fail("Not expected");
 
         ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), executeNever, 
Duration.ofMillis(10));
@@ -363,8 +335,7 @@ public class WaitingForResourcesTest extends TestLogger {
     }
 
     @Test
-    public void testInternalRunScheduledTasks_scheduleTaskFromRunnable() {
-        MockContext ctx = new MockContext();
+    void testInternalRunScheduledTasks_scheduleTaskFromRunnable() {
         final State state = new AdaptiveSchedulerTest.DummyState();
 
         AtomicBoolean executed = new AtomicBoolean(false);
@@ -378,10 +349,11 @@ public class WaitingForResourcesTest extends TestLogger {
 
         // choose time that includes inner execution as well
         ctx.runScheduledTasks(10);
-        assertThat(executed.get(), is(true));
+        assertThat(executed).isTrue();
     }
 
-    private static class MockContext implements WaitingForResources.Context, 
AutoCloseable {
+    private static class MockContext implements WaitingForResources.Context, 
AfterEachCallback {
+
         private static final Logger LOG = 
LoggerFactory.getLogger(MockContext.class);
 
         private final StateValidator<Void> 
creatingExecutionGraphStateValidator =
@@ -441,7 +413,7 @@ public class WaitingForResourcesTest extends TestLogger {
         }
 
         @Override
-        public void close() throws Exception {
+        public void afterEach(ExtensionContext extensionContext) throws 
Exception {
             creatingExecutionGraphStateValidator.close();
             finishedStateValidator.close();
         }
@@ -474,7 +446,7 @@ public class WaitingForResourcesTest extends TestLogger {
             final ScheduledTask<Void> scheduledTask =
                     new ScheduledTask<>(
                             () -> {
-                                if (!hasStateTransition) {
+                                if (!hasStateTransition()) {
                                     action.run();
                                 }
 
@@ -490,19 +462,23 @@ public class WaitingForResourcesTest extends TestLogger {
         @Override
         public void goToFinished(ArchivedExecutionGraph 
archivedExecutionGraph) {
             finishedStateValidator.validateInput(archivedExecutionGraph);
-            hasStateTransition = true;
+            registerStateTransition();
         }
 
         @Override
         public void goToCreatingExecutionGraph(@Nullable ExecutionGraph 
previousExecutionGraph) {
             creatingExecutionGraphStateValidator.validateInput(null);
-            hasStateTransition = true;
+            registerStateTransition();
         }
 
         public boolean hasStateTransition() {
             return hasStateTransition;
         }
 
+        public void registerStateTransition() {
+            hasStateTransition = true;
+        }
+
         public Clock getClock() {
             return testTime.getClock();
         }
@@ -550,6 +526,6 @@ public class WaitingForResourcesTest extends TestLogger {
     }
 
     static <T> Consumer<T> assertNonNull() {
-        return (item) -> assertThat(item, notNullValue());
+        return (item) -> assertThat(item).isNotNull();
     }
 }

Reply via email to