This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4aebc43721873480dfbbaf2cfe784c3833f1cdc9
Author: 1996fanrui <[email protected]>
AuthorDate: Mon Aug 14 22:10:50 2023 +0800

    [FLINK-32859][scheduler] Introduce the StateWithoutExecutionGraph
---
 .../flink/runtime/scheduler/adaptive/Created.java  |  53 +---------
 .../scheduler/adaptive/CreatingExecutionGraph.java |  69 +++----------
 ...reated.java => StateWithoutExecutionGraph.java} |  50 +++------
 .../scheduler/adaptive/WaitingForResources.java    |  53 ++--------
 .../runtime/scheduler/adaptive/CreatedTest.java    |  81 +--------------
 .../adaptive/CreatingExecutionGraphTest.java       | 114 +--------------------
 .../MockStateWithoutExecutionGraphContext.java     |  72 +++++++++++++
 ...st.java => StateWithoutExecutionGraphTest.java} |  87 ++++------------
 .../adaptive/WaitingForResourcesTest.java          | 101 +-----------------
 9 files changed, 139 insertions(+), 541 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java
index 3cbb51edb63..04147774074 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java
@@ -19,35 +19,17 @@
 package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 
 import org.slf4j.Logger;
 
-import javax.annotation.Nullable;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
 /** Initial state of the {@link AdaptiveScheduler}. */
-class Created implements State {
+class Created extends StateWithoutExecutionGraph {
 
     private final Context context;
 
-    private final Logger logger;
-
     Created(Context context, Logger logger) {
+        super(context, logger);
         this.context = context;
-        this.logger = logger;
-    }
-
-    @Override
-    public void cancel() {
-        context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, null));
-    }
-
-    @Override
-    public void suspend(Throwable cause) {
-        context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, cause));
     }
 
     @Override
@@ -55,41 +37,14 @@ class Created implements State {
         return JobStatus.INITIALIZING;
     }
 
-    @Override
-    public ArchivedExecutionGraph getJob() {
-        return context.getArchivedExecutionGraph(getJobStatus(), null);
-    }
-
-    @Override
-    public void handleGlobalFailure(
-            Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {
-        context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, cause));
-    }
-
-    @Override
-    public Logger getLogger() {
-        return logger;
-    }
-
     /** Starts the scheduling by going into the {@link WaitingForResources} state. */
     void startScheduling() {
         context.goToWaitingForResources(null);
     }
 
     /** Context of the {@link Created} state. */
-    interface Context extends StateTransitions.ToFinished, StateTransitions.ToWaitingForResources {
-
-        /**
-         * Creates an {@link ArchivedExecutionGraph} for the given jobStatus and failure cause.
-         *
-         * @param jobStatus jobStatus to create the {@link ArchivedExecutionGraph} with
-         * @param cause cause represents the failure cause for the {@link ArchivedExecutionGraph};
-         *     {@code null} if there is no failure cause
-         * @return the created {@link ArchivedExecutionGraph}
-         */
-        ArchivedExecutionGraph getArchivedExecutionGraph(
-                JobStatus jobStatus, @Nullable Throwable cause);
-    }
+    interface Context
+            extends StateWithoutExecutionGraph.Context, StateTransitions.ToWaitingForResources {}
 
     static class Factory implements StateFactory<Created> {
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
index c876fe6ad1d..da90ef1468d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.scheduler.adaptive;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -43,7 +42,6 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.Collections;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
@@ -56,10 +54,9 @@ import java.util.concurrent.ScheduledFuture;
  * If there are enough slots for the {@link ExecutionGraph} to run, the state 
transitions to {@link
  * Executing}.
  */
-public class CreatingExecutionGraph implements State {
+public class CreatingExecutionGraph extends StateWithoutExecutionGraph {
 
     private final Context context;
-    private final Logger logger;
     private final OperatorCoordinatorHandlerFactory 
operatorCoordinatorHandlerFactory;
 
     private final @Nullable ExecutionGraph previousExecutionGraph;
@@ -71,8 +68,8 @@ public class CreatingExecutionGraph implements State {
             Logger logger,
             OperatorCoordinatorHandlerFactory operatorCoordinatorFactory,
             ExecutionGraph previousExecutionGraph1) {
+        super(context, logger);
         this.context = context;
-        this.logger = logger;
         this.operatorCoordinatorHandlerFactory = operatorCoordinatorFactory;
 
         FutureUtils.assertNoException(
@@ -93,11 +90,12 @@ public class CreatingExecutionGraph implements State {
             @Nullable ExecutionGraphWithVertexParallelism 
executionGraphWithVertexParallelism,
             @Nullable Throwable throwable) {
         if (throwable != null) {
-            logger.info(
-                    "Failed to go from {} to {} because the ExecutionGraph 
creation failed.",
-                    CreatingExecutionGraph.class.getSimpleName(),
-                    Executing.class.getSimpleName(),
-                    throwable);
+            getLogger()
+                    .info(
+                            "Failed to go from {} to {} because the 
ExecutionGraph creation failed.",
+                            CreatingExecutionGraph.class.getSimpleName(),
+                            Executing.class.getSimpleName(),
+                            throwable);
             
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, 
throwable));
         } else {
             for (ExecutionVertex vertex :
@@ -109,8 +107,9 @@ public class CreatingExecutionGraph implements State {
                     
context.tryToAssignSlots(executionGraphWithVertexParallelism);
 
             if (result.isSuccess()) {
-                logger.debug(
-                        "Successfully reserved and assigned the required slots 
for the ExecutionGraph.");
+                getLogger()
+                        .debug(
+                                "Successfully reserved and assigned the 
required slots for the ExecutionGraph.");
                 final ExecutionGraph executionGraph = 
result.getExecutionGraph();
                 final ExecutionGraphHandler executionGraphHandler =
                         new ExecutionGraphHandler(
@@ -144,62 +143,26 @@ public class CreatingExecutionGraph implements State {
                         operatorCoordinatorHandler,
                         Collections.emptyList());
             } else {
-                logger.debug(
-                        "Failed to reserve and assign the required slots. 
Waiting for new resources.");
+                getLogger()
+                        .debug(
+                                "Failed to reserve and assign the required 
slots. Waiting for new resources.");
                 context.goToWaitingForResources(previousExecutionGraph);
             }
         }
     }
 
-    @Override
-    public void cancel() {
-        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, 
null));
-    }
-
-    @Override
-    public void suspend(Throwable cause) {
-        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, 
cause));
-    }
-
     @Override
     public JobStatus getJobStatus() {
         return JobStatus.CREATED;
     }
 
-    @Override
-    public ArchivedExecutionGraph getJob() {
-        return context.getArchivedExecutionGraph(getJobStatus(), null);
-    }
-
-    @Override
-    public void handleGlobalFailure(
-            Throwable cause, CompletableFuture<Map<String, String>> 
failureLabels) {
-        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, 
cause));
-    }
-
-    @Override
-    public Logger getLogger() {
-        return logger;
-    }
-
     /** Context for the {@link CreatingExecutionGraph} state. */
     interface Context
-            extends GlobalFailureHandler,
+            extends StateWithoutExecutionGraph.Context,
+                    GlobalFailureHandler,
                     StateTransitions.ToExecuting,
-                    StateTransitions.ToFinished,
                     StateTransitions.ToWaitingForResources {
 
-        /**
-         * Creates the {@link ArchivedExecutionGraph} for the given job status 
and cause. Cause can
-         * be null if there is no failure.
-         *
-         * @param jobStatus jobStatus to initialize the {@link 
ArchivedExecutionGraph} with
-         * @param cause cause describing a failure cause; {@code null} if 
there is none
-         * @return the created {@link ArchivedExecutionGraph}
-         */
-        ArchivedExecutionGraph getArchivedExecutionGraph(
-                JobStatus jobStatus, @Nullable Throwable cause);
-
         /**
          * Runs the given action after a delay if the state at this time 
equals the expected state.
          *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
similarity index 62%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
index 3cbb51edb63..16769e310df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Created.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java
@@ -28,14 +28,17 @@ import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
-/** Initial state of the {@link AdaptiveScheduler}. */
-class Created implements State {
+/**
+ * Abstract state class which contains its {@link Context} and {@link #logger} 
to execute common
+ * operations.
+ */
+abstract class StateWithoutExecutionGraph implements State {
 
     private final Context context;
 
     private final Logger logger;
 
-    Created(Context context, Logger logger) {
+    StateWithoutExecutionGraph(Context context, Logger logger) {
         this.context = context;
         this.logger = logger;
     }
@@ -50,11 +53,6 @@ class Created implements State {
         
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, 
cause));
     }
 
-    @Override
-    public JobStatus getJobStatus() {
-        return JobStatus.INITIALIZING;
-    }
-
     @Override
     public ArchivedExecutionGraph getJob() {
         return context.getArchivedExecutionGraph(getJobStatus(), null);
@@ -71,42 +69,18 @@ class Created implements State {
         return logger;
     }
 
-    /** Starts the scheduling by going into the {@link WaitingForResources} 
state. */
-    void startScheduling() {
-        context.goToWaitingForResources(null);
-    }
-
-    /** Context of the {@link Created} state. */
-    interface Context extends StateTransitions.ToFinished, 
StateTransitions.ToWaitingForResources {
+    /** Context of the {@link StateWithoutExecutionGraph} state. */
+    interface Context extends StateTransitions.ToFinished {
 
         /**
-         * Creates an {@link ArchivedExecutionGraph} for the given jobStatus 
and failure cause.
+         * Creates the {@link ArchivedExecutionGraph} for the given job status 
and cause. Cause can
+         * be null if there is no failure.
          *
-         * @param jobStatus jobStatus to create the {@link 
ArchivedExecutionGraph} with
-         * @param cause cause represents the failure cause for the {@link 
ArchivedExecutionGraph};
-         *     {@code null} if there is no failure cause
+         * @param jobStatus jobStatus to initialize the {@link 
ArchivedExecutionGraph} with
+         * @param cause cause describing a failure cause; {@code null} if 
there is none
          * @return the created {@link ArchivedExecutionGraph}
          */
         ArchivedExecutionGraph getArchivedExecutionGraph(
                 JobStatus jobStatus, @Nullable Throwable cause);
     }
-
-    static class Factory implements StateFactory<Created> {
-
-        private final Context context;
-        private final Logger log;
-
-        public Factory(Context context, Logger log) {
-            this.context = context;
-            this.log = log;
-        }
-
-        public Class<Created> getStateClass() {
-            return Created.class;
-        }
-
-        public Created getState() {
-            return new Created(this.context, this.log);
-        }
-    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
index 434b0741a28..d7933984db3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.scheduler.adaptive;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.clock.Clock;
@@ -32,19 +31,15 @@ import org.slf4j.Logger;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 
 /**
  * State which describes that the scheduler is waiting for resources in order 
to execute the job.
  */
-class WaitingForResources implements State, ResourceListener {
+class WaitingForResources extends StateWithoutExecutionGraph implements 
ResourceListener {
 
     private final Context context;
 
-    private final Logger log;
-
     private final Clock clock;
 
     /** If set, there's an ongoing deadline waiting for a resource 
stabilization. */
@@ -78,8 +73,8 @@ class WaitingForResources implements State, ResourceListener {
             Duration resourceStabilizationTimeout,
             Clock clock,
             @Nullable ExecutionGraph previousExecutionGraph) {
+        super(context, log);
         this.context = Preconditions.checkNotNull(context);
-        this.log = Preconditions.checkNotNull(log);
         this.resourceStabilizationTimeout =
                 Preconditions.checkNotNull(resourceStabilizationTimeout);
         this.clock = clock;
@@ -106,37 +101,11 @@ class WaitingForResources implements State, ResourceListener {
         }
     }
 
-    @Override
-    public void cancel() {
-        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, 
null));
-    }
-
-    @Override
-    public void suspend(Throwable cause) {
-        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, 
cause));
-    }
-
     @Override
     public JobStatus getJobStatus() {
         return JobStatus.CREATED;
     }
 
-    @Override
-    public ArchivedExecutionGraph getJob() {
-        return context.getArchivedExecutionGraph(getJobStatus(), null);
-    }
-
-    @Override
-    public void handleGlobalFailure(
-            Throwable cause, CompletableFuture<Map<String, String>> 
failureLabels) {
-        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, 
cause));
-    }
-
-    @Override
-    public Logger getLogger() {
-        return log;
-    }
-
     @Override
     public void onNewResourcesAvailable() {
         checkDesiredOrSufficientResourcesAvailable();
@@ -174,8 +143,9 @@ class WaitingForResources implements State, ResourceListener {
     }
 
     private void resourceTimeout() {
-        log.debug(
-                "Initial resource allocation timeout triggered: Creating 
ExecutionGraph with available resources.");
+        getLogger()
+                .debug(
+                        "Initial resource allocation timeout triggered: 
Creating ExecutionGraph with available resources.");
         createExecutionGraphWithAvailableResources();
     }
 
@@ -185,18 +155,7 @@ class WaitingForResources implements State, ResourceListener {
 
     /** Context of the {@link WaitingForResources} state. */
     interface Context
-            extends StateTransitions.ToCreatingExecutionGraph, 
StateTransitions.ToFinished {
-
-        /**
-         * Creates the {@link ArchivedExecutionGraph} for the given job status 
and cause. Cause can
-         * be null if there is no failure.
-         *
-         * @param jobStatus jobStatus to initialize the {@link 
ArchivedExecutionGraph} with
-         * @param cause cause describing a failure cause; {@code null} if 
there is none
-         * @return the created {@link ArchivedExecutionGraph}
-         */
-        ArchivedExecutionGraph getArchivedExecutionGraph(
-                JobStatus jobStatus, @Nullable Throwable cause);
+            extends StateWithoutExecutionGraph.Context, 
StateTransitions.ToCreatingExecutionGraph {
 
         /**
          * Checks whether we have the desired resources.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
index 302fb471c3e..9a13f5deb6e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
@@ -18,15 +18,11 @@
 
 package org.apache.flink.runtime.scheduler.adaptive;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.failure.FailureEnricherUtils;
-import org.apache.flink.util.FlinkException;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
@@ -34,8 +30,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.util.function.Consumer;
-
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link Created} state. */
@@ -45,18 +39,6 @@ class CreatedTest {
 
     @RegisterExtension MockCreatedContext ctx = new MockCreatedContext();
 
-    @Test
-    void testCancel() {
-        Created created = new Created(ctx, LOG);
-
-        ctx.setExpectFinished(
-                archivedExecutionGraph -> {
-                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED);
-                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNull();
-                });
-        created.cancel();
-    }
-
     @Test
     void testStartScheduling() {
         Created created = new Created(ctx, LOG);
@@ -66,46 +48,6 @@ class CreatedTest {
         created.startScheduling();
     }
 
-    @Test
-    void testSuspend() {
-        FlinkException expectedException = new FlinkException("This is a test 
exception");
-        Created created = new Created(ctx, LOG);
-
-        ctx.setExpectFinished(
-                archivedExecutionGraph -> {
-                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED);
-                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
-                    assertThat(
-                                    archivedExecutionGraph
-                                            .getFailureInfo()
-                                            .getException()
-                                            
.deserializeError(this.getClass().getClassLoader()))
-                            .isEqualTo(expectedException);
-                });
-
-        created.suspend(expectedException);
-    }
-
-    @Test
-    void testFailure() {
-        Created created = new Created(ctx, LOG);
-        RuntimeException expectedException = new RuntimeException("This is a 
test exception");
-
-        ctx.setExpectFinished(
-                archivedExecutionGraph -> {
-                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
-                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
-                    assertThat(
-                                    archivedExecutionGraph
-                                            .getFailureInfo()
-                                            .getException()
-                                            
.deserializeError(this.getClass().getClassLoader()))
-                            .isEqualTo(expectedException);
-                });
-
-        created.handleGlobalFailure(expectedException, 
FailureEnricherUtils.EMPTY_FAILURE_LABELS);
-    }
-
     @Test
     void testJobInformation() {
         Created created = new Created(ctx, LOG);
@@ -113,32 +55,15 @@ class CreatedTest {
         assertThat(job.getState()).isEqualTo(JobStatus.INITIALIZING);
     }
 
-    static class MockCreatedContext implements Created.Context, 
AfterEachCallback {
-        private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
-                new StateValidator<>("finished");
+    static class MockCreatedContext extends 
MockStateWithoutExecutionGraphContext
+            implements Created.Context {
         private final StateValidator<Void> waitingForResourcesStateValidator =
                 new StateValidator<>("WaitingForResources");
 
-        public void setExpectFinished(Consumer<ArchivedExecutionGraph> 
asserter) {
-            finishedStateValidator.expectInput(asserter);
-        }
-
         public void setExpectWaitingForResources() {
             waitingForResourcesStateValidator.expectInput((none) -> {});
         }
 
-        @Override
-        public void goToFinished(ArchivedExecutionGraph 
archivedExecutionGraph) {
-            finishedStateValidator.validateInput(archivedExecutionGraph);
-        }
-
-        @Override
-        public ArchivedExecutionGraph getArchivedExecutionGraph(
-                JobStatus jobStatus, @Nullable Throwable cause) {
-            return ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
-                    new JobID(), "testJob", jobStatus, cause, null, 0L);
-        }
-
         @Override
         public void goToWaitingForResources(@Nullable ExecutionGraph 
previousExecutionGraph) {
             waitingForResourcesStateValidator.validateInput(null);
@@ -146,7 +71,7 @@ class CreatedTest {
 
         @Override
         public void afterEach(ExtensionContext extensionContext) throws 
Exception {
-            finishedStateValidator.close();
+            super.afterEach(extensionContext);
             waitingForResourcesStateValidator.close();
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
index 833835b7d22..b831b3bb62f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
@@ -18,14 +18,11 @@
 
 package org.apache.flink.runtime.scheduler.adaptive;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.testutils.CompletedScheduledFuture;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
@@ -36,7 +33,6 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
@@ -63,79 +59,6 @@ class CreatingExecutionGraphTest {
     @RegisterExtension
     MockCreatingExecutionGraphContext context = new 
MockCreatingExecutionGraphContext();
 
-    @Test
-    void testCancelTransitionsToFinished() {
-        final CreatingExecutionGraph creatingExecutionGraph =
-                new CreatingExecutionGraph(
-                        context,
-                        new CompletableFuture<>(),
-                        LOG,
-                        
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
-                        null);
-
-        context.setExpectFinished(
-                archivedExecutionGraph -> {
-                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED);
-                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNull();
-                });
-        creatingExecutionGraph.cancel();
-    }
-
-    @Test
-    void testSuspendTransitionsToFinished() {
-        final CreatingExecutionGraph creatingExecutionGraph =
-                new CreatingExecutionGraph(
-                        context,
-                        new CompletableFuture<>(),
-                        LOG,
-                        
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
-                        null);
-
-        FlinkException expectedException = new FlinkException("This is a test 
exception");
-
-        context.setExpectFinished(
-                archivedExecutionGraph -> {
-                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED);
-                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
-                    assertThat(
-                                    archivedExecutionGraph
-                                            .getFailureInfo()
-                                            .getException()
-                                            
.deserializeError(this.getClass().getClassLoader()))
-                            .isEqualTo(expectedException);
-                });
-
-        creatingExecutionGraph.suspend(expectedException);
-    }
-
-    @Test
-    void testGlobalFailureTransitionsToFinished() {
-        final CreatingExecutionGraph creatingExecutionGraph =
-                new CreatingExecutionGraph(
-                        context,
-                        new CompletableFuture<>(),
-                        LOG,
-                        
CreatingExecutionGraphTest::createTestingOperatorCoordinatorHandler,
-                        null);
-
-        RuntimeException expectedException = new RuntimeException("This is a 
test exception");
-
-        context.setExpectFinished(
-                archivedExecutionGraph -> {
-                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
-                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
-                    assertThat(
-                                    archivedExecutionGraph
-                                            .getFailureInfo()
-                                            .getException()
-                                            
.deserializeError(this.getClass().getClassLoader()))
-                            .isEqualTo(expectedException);
-                });
-
-        creatingExecutionGraph.handleGlobalFailure(
-                expectedException, FailureEnricherUtils.EMPTY_FAILURE_LABELS);
-    }
-
     @Test
     void testFailedExecutionGraphCreationTransitionsToFinished() {
         final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
@@ -235,10 +158,8 @@ class CreatingExecutionGraphTest {
         return new TestingOperatorCoordinatorHandler();
     }
 
-    static class MockCreatingExecutionGraphContext
-            implements CreatingExecutionGraph.Context, AfterEachCallback {
-        private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
-                new StateValidator<>("Finished");
+    static class MockCreatingExecutionGraphContext extends 
MockStateWithoutExecutionGraphContext
+            implements CreatingExecutionGraph.Context {
         private final StateValidator<Void> waitingForResourcesStateValidator =
                 new StateValidator<>("WaitingForResources");
         private final StateValidator<ExecutionGraph> executingStateValidator =
@@ -255,12 +176,6 @@ class CreatingExecutionGraphTest {
                     // No-op.
                 };
 
-        private boolean hasStateTransition = false;
-
-        public void setExpectFinished(Consumer<ArchivedExecutionGraph> 
asserter) {
-            finishedStateValidator.expectInput(asserter);
-        }
-
         public void setExpectWaitingForResources() {
             waitingForResourcesStateValidator.expectInput((none) -> {});
         }
@@ -281,12 +196,6 @@ class CreatingExecutionGraphTest {
             this.globalFailureHandler = globalFailureHandler;
         }
 
-        @Override
-        public void goToFinished(ArchivedExecutionGraph 
archivedExecutionGraph) {
-            finishedStateValidator.validateInput(archivedExecutionGraph);
-            registerStateTransition();
-        }
-
         @Override
         public void goToExecuting(
                 ExecutionGraph executionGraph,
@@ -297,13 +206,6 @@ class CreatingExecutionGraphTest {
             registerStateTransition();
         }
 
-        @Override
-        public ArchivedExecutionGraph getArchivedExecutionGraph(
-                JobStatus jobStatus, @Nullable Throwable cause) {
-            return ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
-                    new JobID(), "testJob", jobStatus, cause, null, 0L);
-        }
-
         @Override
         public ScheduledFuture<?> runIfState(State expectedState, Runnable 
action, Duration delay) {
             if (!hasStateTransition()) {
@@ -328,7 +230,7 @@ class CreatingExecutionGraphTest {
         @Override
         public void goToWaitingForResources(@Nullable ExecutionGraph 
previousExecutionGraph) {
             waitingForResourcesStateValidator.validateInput(null);
-            hasStateTransition = true;
+            registerStateTransition();
         }
 
         @Override
@@ -348,18 +250,10 @@ class CreatingExecutionGraphTest {
 
         @Override
         public void afterEach(ExtensionContext extensionContext) throws 
Exception {
-            finishedStateValidator.close();
+            super.afterEach(extensionContext);
             waitingForResourcesStateValidator.close();
             executingStateValidator.close();
         }
-
-        public boolean hasStateTransition() {
-            return hasStateTransition;
-        }
-
-        public void registerStateTransition() {
-            hasStateTransition = true;
-        }
     }
 
     private static CreatingExecutionGraph.ExecutionGraphWithVertexParallelism 
getGraph(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockStateWithoutExecutionGraphContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockStateWithoutExecutionGraphContext.java
new file mode 100644
index 00000000000..75dbab9a804
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockStateWithoutExecutionGraphContext.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.util.function.Consumer;
+
+/**
+ * Test mock of {@link StateWithoutExecutionGraph.Context} that doubles as a JUnit {@link
+ * AfterEachCallback}.
+ *
+ * <p>A test arms the mock with {@link #setExpectFinished(Consumer)}; {@link
+ * #goToFinished(ArchivedExecutionGraph)} then hands the received graph to the "Finished" {@link
+ * StateValidator} and records that a state transition took place. {@link
+ * #getArchivedExecutionGraph(JobStatus, Throwable)} builds its result with {@link
+ * ArchivedExecutionGraphBuilder}, using the requested status and, for a non-null cause, an {@link
+ * ErrorInfo} wrapping that cause. {@link #afterEach(ExtensionContext)} closes the validator
+ * (presumably where an unmet expectation is reported; confirm in {@code StateValidator}).
+ */
+class MockStateWithoutExecutionGraphContext
+        implements StateWithoutExecutionGraph.Context, AfterEachCallback {
+
+    private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
+            new StateValidator<>("Finished");
+
+    private boolean hasStateTransition = false; // set by registerStateTransition(), never reset
+
+    public void setExpectFinished(Consumer<ArchivedExecutionGraph> asserter) {
+        finishedStateValidator.expectInput(asserter);
+    }
+
+    @Override
+    public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
+        finishedStateValidator.validateInput(archivedExecutionGraph);
+        registerStateTransition();
+    }
+
+    @Override
+    public ArchivedExecutionGraph getArchivedExecutionGraph(
+            JobStatus jobStatus, @Nullable Throwable cause) {
+        return new ArchivedExecutionGraphBuilder()
+                .setState(jobStatus)
+                .setFailureCause(cause == null ? null : new ErrorInfo(cause, 
1337))
+                .build();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext extensionContext) throws Exception {
+        finishedStateValidator.close();
+    }
+
+    public boolean hasStateTransition() {
+        return hasStateTransition;
+    }
+
+    public void registerStateTransition() {
+        hasStateTransition = true;
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraphTest.java
similarity index 53%
copy from 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
copy to 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraphTest.java
index 302fb471c3e..349371180a0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatedTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraphTest.java
@@ -18,58 +18,41 @@
 
 package org.apache.flink.runtime.scheduler.adaptive;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.util.FlinkException;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
-import java.util.function.Consumer;
-
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for the {@link Created} state. */
-class CreatedTest {
+/** Tests for the {@link StateWithoutExecutionGraph} state. */
+public class StateWithoutExecutionGraphTest {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CreatedTest.class);
 
-    @RegisterExtension MockCreatedContext ctx = new MockCreatedContext();
+    @RegisterExtension
+    MockStateWithoutExecutionGraphContext ctx = new 
MockStateWithoutExecutionGraphContext();
 
     @Test
-    void testCancel() {
-        Created created = new Created(ctx, LOG);
+    void testCancelTransitionsToFinished() {
+        TestingStateWithoutExecutionGraph state = new 
TestingStateWithoutExecutionGraph(ctx, LOG);
 
         ctx.setExpectFinished(
                 archivedExecutionGraph -> {
                     
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED);
                     
assertThat(archivedExecutionGraph.getFailureInfo()).isNull();
                 });
-        created.cancel();
+        state.cancel();
     }
 
     @Test
-    void testStartScheduling() {
-        Created created = new Created(ctx, LOG);
-
-        ctx.setExpectWaitingForResources();
-
-        created.startScheduling();
-    }
-
-    @Test
-    void testSuspend() {
+    void testSuspendTransitionsToFinished() {
         FlinkException expectedException = new FlinkException("This is a test 
exception");
-        Created created = new Created(ctx, LOG);
+        TestingStateWithoutExecutionGraph state = new 
TestingStateWithoutExecutionGraph(ctx, LOG);
 
         ctx.setExpectFinished(
                 archivedExecutionGraph -> {
@@ -83,12 +66,12 @@ class CreatedTest {
                             .isEqualTo(expectedException);
                 });
 
-        created.suspend(expectedException);
+        state.suspend(expectedException);
     }
 
     @Test
-    void testFailure() {
-        Created created = new Created(ctx, LOG);
+    void testTransitionToFinishedOnGlobalFailure() {
+        TestingStateWithoutExecutionGraph state = new 
TestingStateWithoutExecutionGraph(ctx, LOG);
         RuntimeException expectedException = new RuntimeException("This is a 
test exception");
 
         ctx.setExpectFinished(
@@ -103,51 +86,19 @@ class CreatedTest {
                             .isEqualTo(expectedException);
                 });
 
-        created.handleGlobalFailure(expectedException, 
FailureEnricherUtils.EMPTY_FAILURE_LABELS);
+        state.handleGlobalFailure(expectedException, 
FailureEnricherUtils.EMPTY_FAILURE_LABELS);
     }
 
-    @Test
-    void testJobInformation() {
-        Created created = new Created(ctx, LOG);
-        ArchivedExecutionGraph job = created.getJob();
-        assertThat(job.getState()).isEqualTo(JobStatus.INITIALIZING);
-    }
+    private static final class TestingStateWithoutExecutionGraph
+            extends StateWithoutExecutionGraph {
 
-    static class MockCreatedContext implements Created.Context, 
AfterEachCallback {
-        private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
-                new StateValidator<>("finished");
-        private final StateValidator<Void> waitingForResourcesStateValidator =
-                new StateValidator<>("WaitingForResources");
-
-        public void setExpectFinished(Consumer<ArchivedExecutionGraph> 
asserter) {
-            finishedStateValidator.expectInput(asserter);
-        }
-
-        public void setExpectWaitingForResources() {
-            waitingForResourcesStateValidator.expectInput((none) -> {});
-        }
-
-        @Override
-        public void goToFinished(ArchivedExecutionGraph 
archivedExecutionGraph) {
-            finishedStateValidator.validateInput(archivedExecutionGraph);
-        }
-
-        @Override
-        public ArchivedExecutionGraph getArchivedExecutionGraph(
-                JobStatus jobStatus, @Nullable Throwable cause) {
-            return ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
-                    new JobID(), "testJob", jobStatus, cause, null, 0L);
-        }
-
-        @Override
-        public void goToWaitingForResources(@Nullable ExecutionGraph 
previousExecutionGraph) {
-            waitingForResourcesStateValidator.validateInput(null);
+        TestingStateWithoutExecutionGraph(Context context, Logger logger) {
+            super(context, logger);
         }
 
         @Override
-        public void afterEach(ExtensionContext extensionContext) throws 
Exception {
-            finishedStateValidator.close();
-            waitingForResourcesStateValidator.close();
+        public JobStatus getJobStatus() {
+            return null;
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
index bdcc73f5c9d..9c7aab6901b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
@@ -18,19 +18,12 @@
 
 package org.apache.flink.runtime.scheduler.adaptive;
 
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.testutils.ScheduledTask;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.failure.FailureEnricherUtils;
-import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.ManualClock;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
@@ -215,65 +208,6 @@ class WaitingForResourcesTest {
         ctx.runScheduledTasks();
     }
 
-    @Test
-    void testTransitionToFinishedOnGlobalFailure() {
-        ctx.setHasDesiredResources(() -> false);
-        WaitingForResources wfr =
-                new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
-        RuntimeException expectedException = new RuntimeException("This is a 
test exception");
-
-        ctx.setExpectFinished(
-                archivedExecutionGraph -> {
-                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
-                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
-                    assertThat(
-                                    archivedExecutionGraph
-                                            .getFailureInfo()
-                                            .getException()
-                                            
.deserializeError(this.getClass().getClassLoader()))
-                            .isEqualTo(expectedException);
-                });
-
-        wfr.handleGlobalFailure(expectedException, 
FailureEnricherUtils.EMPTY_FAILURE_LABELS);
-    }
-
-    @Test
-    void testCancel() {
-        ctx.setHasDesiredResources(() -> false);
-        WaitingForResources wfr =
-                new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
-
-        ctx.setExpectFinished(
-                archivedExecutionGraph -> {
-                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.CANCELED);
-                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNull();
-                });
-        wfr.cancel();
-    }
-
-    @Test
-    void testSuspend() {
-        ctx.setHasDesiredResources(() -> false);
-        WaitingForResources wfr =
-                new WaitingForResources(ctx, LOG, Duration.ZERO, 
STABILIZATION_TIMEOUT);
-
-        FlinkException expectedException = new FlinkException("This is a test 
exception");
-
-        ctx.setExpectFinished(
-                archivedExecutionGraph -> {
-                    
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED);
-                    
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
-                    assertThat(
-                                    archivedExecutionGraph
-                                            .getFailureInfo()
-                                            .getException()
-                                            
.deserializeError(this.getClass().getClassLoader()))
-                            .isEqualTo(expectedException);
-                });
-
-        wfr.suspend(expectedException);
-    }
-
     @Test
     void testInternalRunScheduledTasks_correctExecutionOrder() {
         AtomicBoolean firstRun = new AtomicBoolean(false);
@@ -352,14 +286,13 @@ class WaitingForResourcesTest {
         assertThat(executed).isTrue();
     }
 
-    private static class MockContext implements WaitingForResources.Context, 
AfterEachCallback {
+    private static class MockContext extends 
MockStateWithoutExecutionGraphContext
+            implements WaitingForResources.Context {
 
         private static final Logger LOG = 
LoggerFactory.getLogger(MockContext.class);
 
         private final StateValidator<Void> 
creatingExecutionGraphStateValidator =
                 new StateValidator<>("executing");
-        private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
-                new StateValidator<>("finished");
 
         private Supplier<Boolean> hasDesiredResourcesSupplier = () -> false;
         private Supplier<Boolean> hasSufficientResourcesSupplier = () -> false;
@@ -367,7 +300,6 @@ class WaitingForResourcesTest {
         private final Queue<ScheduledTask<Void>> scheduledTasks =
                 new PriorityQueue<>(
                         Comparator.comparingLong(o -> 
o.getDelay(TimeUnit.MILLISECONDS)));
-        private boolean hasStateTransition = false;
 
         private final ManualTestTime testTime =
                 new ManualTestTime(
@@ -382,10 +314,6 @@ class WaitingForResourcesTest {
             hasSufficientResourcesSupplier = sup;
         }
 
-        void setExpectFinished(Consumer<ArchivedExecutionGraph> asserter) {
-            finishedStateValidator.expectInput(asserter);
-        }
-
         void setExpectCreatingExecutionGraph() {
             creatingExecutionGraphStateValidator.expectInput(none -> {});
         }
@@ -414,17 +342,8 @@ class WaitingForResourcesTest {
 
         @Override
         public void afterEach(ExtensionContext extensionContext) throws 
Exception {
+            super.afterEach(extensionContext);
             creatingExecutionGraphStateValidator.close();
-            finishedStateValidator.close();
-        }
-
-        @Override
-        public ArchivedExecutionGraph getArchivedExecutionGraph(
-                JobStatus jobStatus, @Nullable Throwable cause) {
-            return new ArchivedExecutionGraphBuilder()
-                    .setState(jobStatus)
-                    .setFailureCause(cause == null ? null : new 
ErrorInfo(cause, 1337))
-                    .build();
         }
 
         @Override
@@ -459,26 +378,12 @@ class WaitingForResourcesTest {
             return scheduledTask;
         }
 
-        @Override
-        public void goToFinished(ArchivedExecutionGraph 
archivedExecutionGraph) {
-            finishedStateValidator.validateInput(archivedExecutionGraph);
-            registerStateTransition();
-        }
-
         @Override
         public void goToCreatingExecutionGraph(@Nullable ExecutionGraph 
previousExecutionGraph) {
             creatingExecutionGraphStateValidator.validateInput(null);
             registerStateTransition();
         }
 
-        public boolean hasStateTransition() {
-            return hasStateTransition;
-        }
-
-        public void registerStateTransition() {
-            hasStateTransition = true;
-        }
-
         public Clock getClock() {
             return testTime.getClock();
         }

Reply via email to