tillrohrmann commented on a change in pull request #14879:
URL: https://github.com/apache/flink/pull/14879#discussion_r572220376



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/Executing.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/** State which represents a running job with an {@link ExecutionGraph} and 
assigned slots. */
+class Executing extends StateWithExecutionGraph implements ResourceConsumer {
+
+    private final Context context;
+
+    private final ClassLoader userCodeClassLoader;
+
+    Executing(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            Context context,
+            ClassLoader userCodeClassLoader) {
+        super(context, executionGraph, executionGraphHandler, 
operatorCoordinatorHandler, logger);
+        this.context = context;
+        this.userCodeClassLoader = userCodeClassLoader;
+    }
+
+    @Override
+    public void onEnter() {
+        deploy();
+    }
+
+    @Override
+    public void cancel() {
+        context.goToCanceling(
+                getExecutionGraph(), getExecutionGraphHandler(), 
getOperatorCoordinatorHandler());
+    }
+
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        handleAnyFailure(cause);
+    }
+
+    private void handleAnyFailure(Throwable cause) {
+        final FailureResult failureResult = context.howToHandleFailure(cause);
+
+        if (failureResult.canRestart()) {
+            context.goToRestarting(
+                    getExecutionGraph(),
+                    getExecutionGraphHandler(),
+                    getOperatorCoordinatorHandler(),
+                    failureResult.getBackoffTime());
+        } else {
+            context.goToFailing(
+                    getExecutionGraph(),
+                    getExecutionGraphHandler(),
+                    getOperatorCoordinatorHandler(),
+                    failureResult.getFailureCause());
+        }
+    }
+
+    @Override
+    boolean updateTaskExecutionState(TaskExecutionStateTransition 
taskExecutionState) {
+        final boolean successfulUpdate = 
getExecutionGraph().updateState(taskExecutionState);
+
+        if (successfulUpdate) {
+            if (taskExecutionState.getExecutionState() == 
ExecutionState.FAILED) {
+                Throwable cause = 
taskExecutionState.getError(userCodeClassLoader);
+                handleAnyFailure(cause);
+            }
+        }
+
+        return successfulUpdate;
+    }
+
+    @Override
+    void onTerminalState(JobStatus terminalState) {
+        
context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
+    }
+
+    private void deploy() {
+        final ExecutionGraph executionGraph = getExecutionGraph();
+        executionGraph.transitionToRunning();

Review comment:
       Can't we assume that an `ExecutionGraph` is already in state `RUNNING` 
when it is passed to a `StateWithExecutionGraph`? That would simplify things a 
bit, because `deploy()` would not need to call `transitionToRunning()` itself.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java
##########
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.KvStateHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract state class which contains an {@link ExecutionGraph} and the 
required handlers to
+ * execute common operations.
+ */
+abstract class StateWithExecutionGraph implements State {
+    private final Context context;
+
+    private final ExecutionGraph executionGraph;
+
+    private final ExecutionGraphHandler executionGraphHandler;
+
+    private final OperatorCoordinatorHandler operatorCoordinatorHandler;
+
+    private final KvStateHandler kvStateHandler;
+
+    private final Logger logger;
+
+    StateWithExecutionGraph(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger) {
+        this.context = context;
+        this.executionGraph = executionGraph;
+        this.executionGraphHandler = executionGraphHandler;
+        this.operatorCoordinatorHandler = operatorCoordinatorHandler;
+        this.kvStateHandler = new KvStateHandler(executionGraph);
+        this.logger = logger;
+
+        FutureUtils.assertNoException(
+                executionGraph
+                        .getTerminationFuture()
+                        .thenAcceptAsync(
+                                jobStatus -> {
+                                    context.runIfState(this, () -> 
onTerminalState(jobStatus));

Review comment:
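       Where are you filtering out the non-globally terminal states? As 
written, `onTerminalState` is called with whatever `JobStatus` completes the 
termination future.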

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/MockStateWithExecutionGraphContext.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+class MockStateWithExecutionGraphContext implements 
StateWithExecutionGraph.Context, AutoCloseable {
+
+    private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
+            new StateValidator<>("finished");
+
+    private final ManuallyTriggeredComponentMainThreadExecutor executor =
+            new ManuallyTriggeredComponentMainThreadExecutor();
+
+    protected boolean hadStateTransition = false;
+
+    public void setExpectFinished(Consumer<ArchivedExecutionGraph> asserter) {
+        finishedStateValidator.expectInput(asserter);
+    }
+
+    @Override
+    public void runIfState(State expectedState, Runnable action) {
+        if (!hadStateTransition) {
+            action.run();
+        }
+    }
+
+    @Override
+    public boolean isState(State expectedState) {
+        throw new UnsupportedOperationException("Not covered by this test at 
the moment");
+    }
+
+    @Override
+    public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
+        finishedStateValidator.validateInput(archivedExecutionGraph);
+        hadStateTransition = true;
+    }
+
+    @Override
+    public ComponentMainThreadExecutor getMainThreadExecutor() {
+        return executor;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // trigger executor to make sure there are no outstanding state 
transitions
+        executor.triggerAll();
+        executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.MINUTES);
+        assertNotStateTransition();
+    }
+
+    protected void assertNotStateTransition() {

Review comment:
       I think you have forgotten to check the other states in the 
`MockExecutingContext`.

##########
File path: flink-runtime/src/test/resources/log4j2-test.properties
##########
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO

Review comment:
       Please revert.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/ExecutingTest.java
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/** Tests for declarative scheduler's {@link Executing} state. */
+public class ExecutingTest extends TestLogger {
+
+    @Test
+    public void testExecutionGraphDeploymentOnEnter() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            Executing exec = new ExecutingStateBuilder().build(ctx);
+
+            assertThat(exec.getExecutionGraph().getState(), 
is(JobStatus.CREATED));
+
+            exec.onEnter();
+
+            assertThat(exec.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));

Review comment:
       Technically speaking, this does not test that we are deploying the 
`ExecutionGraph`; it only checks that the job status changed from `CREATED` to 
`RUNNING`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredComponentMainThreadExecutor.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+/** Testing ComponentMainThreadExecutor which can be manually triggered. */
+public class ManuallyTriggeredComponentMainThreadExecutor
+        extends ManuallyTriggeredScheduledExecutorService implements 
ComponentMainThreadExecutor {
+
+    private Thread executorThread = null;

Review comment:
       Can we set this thread when creating the executor instead of setting it 
dynamically? We could simply call `assertRunningInMainThread` in the `trigger` 
method before calling `super.trigger()`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/MockStateWithExecutionGraphContext.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+class MockStateWithExecutionGraphContext implements 
StateWithExecutionGraph.Context, AutoCloseable {
+
+    private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
+            new StateValidator<>("finished");
+
+    private final ManuallyTriggeredComponentMainThreadExecutor executor =
+            new ManuallyTriggeredComponentMainThreadExecutor();
+
+    protected boolean hadStateTransition = false;
+
+    public void setExpectFinished(Consumer<ArchivedExecutionGraph> asserter) {
+        finishedStateValidator.expectInput(asserter);
+    }
+
+    @Override
+    public void runIfState(State expectedState, Runnable action) {
+        if (!hadStateTransition) {
+            action.run();
+        }
+    }
+
+    @Override
+    public boolean isState(State expectedState) {
+        throw new UnsupportedOperationException("Not covered by this test at 
the moment");
+    }
+
+    @Override
+    public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
+        finishedStateValidator.validateInput(archivedExecutionGraph);
+        hadStateTransition = true;
+    }
+
+    @Override
+    public ComponentMainThreadExecutor getMainThreadExecutor() {
+        return executor;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // trigger executor to make sure there are no outstanding state 
transitions
+        executor.triggerAll();
+        executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.MINUTES);
+        assertNotStateTransition();
+    }
+
+    protected void assertNotStateTransition() {

Review comment:
       nit: This should be named `assertNoStateTransition`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java
##########
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.KvStateHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract state class which contains an {@link ExecutionGraph} and the 
required handlers to
+ * execute common operations.
+ */
+abstract class StateWithExecutionGraph implements State {
+    private final Context context;
+
+    private final ExecutionGraph executionGraph;
+
+    private final ExecutionGraphHandler executionGraphHandler;
+
+    private final OperatorCoordinatorHandler operatorCoordinatorHandler;
+
+    private final KvStateHandler kvStateHandler;
+
+    private final Logger logger;
+
+    StateWithExecutionGraph(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger) {
+        this.context = context;
+        this.executionGraph = executionGraph;
+        this.executionGraphHandler = executionGraphHandler;
+        this.operatorCoordinatorHandler = operatorCoordinatorHandler;
+        this.kvStateHandler = new KvStateHandler(executionGraph);
+        this.logger = logger;
+
+        FutureUtils.assertNoException(
+                executionGraph
+                        .getTerminationFuture()
+                        .thenAcceptAsync(
+                                jobStatus -> {
+                                    context.runIfState(this, () -> 
onTerminalState(jobStatus));
+                                },

Review comment:
       nit: No curly brackets needed here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/MockStateWithExecutionGraphContext.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+class MockStateWithExecutionGraphContext implements 
StateWithExecutionGraph.Context, AutoCloseable {
+
+    private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
+            new StateValidator<>("finished");
+
+    private final ManuallyTriggeredComponentMainThreadExecutor executor =
+            new ManuallyTriggeredComponentMainThreadExecutor();
+
+    protected boolean hadStateTransition = false;
+
+    public void setExpectFinished(Consumer<ArchivedExecutionGraph> asserter) {
+        finishedStateValidator.expectInput(asserter);
+    }
+
+    @Override
+    public void runIfState(State expectedState, Runnable action) {
+        if (!hadStateTransition) {
+            action.run();
+        }
+    }
+
+    @Override
+    public boolean isState(State expectedState) {
+        throw new UnsupportedOperationException("Not covered by this test at 
the moment");
+    }
+
+    @Override
+    public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
+        finishedStateValidator.validateInput(archivedExecutionGraph);
+        hadStateTransition = true;
+    }
+
+    @Override
+    public ComponentMainThreadExecutor getMainThreadExecutor() {
+        return executor;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // trigger executor to make sure there are no outstanding state 
transitions
+        executor.triggerAll();
+        executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.MINUTES);
+        assertNotStateTransition();
+    }
+
+    protected void assertNotStateTransition() {

Review comment:
       Could you explain how this method works? It seems that tests which go 
into the Restarting state still pass when this assertion is added at the end. 
I would expect the assertion to fail in that case.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/RestartingTest.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link Restarting} state of the declarative scheduler. */
+public class RestartingTest extends TestLogger {
+
+    @Test
+    public void testExecutionGraphCancellationOnEnter() throws Exception {
+        try (MockRestartingContext ctx = new MockRestartingContext()) {
+            CancellableExecutionGraph cancellableExecutionGraph = new 
CancellableExecutionGraph();
+            Restarting restarting = createRestartingState(ctx, 
cancellableExecutionGraph);
+
+            restarting.onEnter();
+            assertThat(cancellableExecutionGraph.isCancelled(), is(true));
+        }
+    }
+
+    @Test
+    public void testTransitionToWaitingForResourcesWhenCancellationComplete() 
throws Exception {
+        try (MockRestartingContext ctx = new MockRestartingContext()) {
+            Restarting restarting = createRestartingState(ctx);
+            ctx.setExpectWaitingForResources();
+            restarting.onTerminalState(JobStatus.CANCELED);
+        }
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        try (MockRestartingContext ctx = new MockRestartingContext()) {
+            Restarting restarting = createRestartingState(ctx);
+            ctx.setExpectCancelling(assertNonNull());
+            restarting.cancel();
+        }
+    }
+
+    @Test
+    public void testSuspend() throws Exception {
+        try (MockRestartingContext ctx = new MockRestartingContext()) {
+            Restarting restarting = createRestartingState(ctx);
+            ctx.setExpectFinished(
+                    archivedExecutionGraph ->
+                            assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.SUSPENDED)));
+            final Throwable cause = new RuntimeException("suspend");
+            restarting.suspend(cause);
+        }
+    }
+
+    @Test
+    public void testGlobalFailure() throws Exception {

Review comment:
       What should happen in case of a global failure?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java
##########
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.KvStateHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract state class which contains an {@link ExecutionGraph} and the 
required handlers to
+ * execute common operations.
+ */
+abstract class StateWithExecutionGraph implements State {
+    private final Context context;
+
+    private final ExecutionGraph executionGraph;
+
+    private final ExecutionGraphHandler executionGraphHandler;
+
+    private final OperatorCoordinatorHandler operatorCoordinatorHandler;
+
+    private final KvStateHandler kvStateHandler;
+
+    private final Logger logger;
+
+    StateWithExecutionGraph(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger) {
+        this.context = context;
+        this.executionGraph = executionGraph;
+        this.executionGraphHandler = executionGraphHandler;
+        this.operatorCoordinatorHandler = operatorCoordinatorHandler;
+        this.kvStateHandler = new KvStateHandler(executionGraph);
+        this.logger = logger;
+
+        FutureUtils.assertNoException(
+                executionGraph
+                        .getTerminationFuture()
+                        .thenAcceptAsync(
+                                jobStatus -> {
+                                    context.runIfState(this, () -> 
onTerminalState(jobStatus));
+                                },
+                                context.getMainThreadExecutor()));
+    }
+
+    ExecutionGraph getExecutionGraph() {
+        return executionGraph;
+    }
+
+    OperatorCoordinatorHandler getOperatorCoordinatorHandler() {
+        return operatorCoordinatorHandler;
+    }
+
+    ExecutionGraphHandler getExecutionGraphHandler() {
+        return executionGraphHandler;
+    }
+
+    @Override
+    public void onLeave(State newState) {
+        if 
(!StateWithExecutionGraph.class.isAssignableFrom(newState.getClass())) {
+            // we are leaving the StateWithExecutionGraph --> we need to 
dispose temporary services
+            operatorCoordinatorHandler.disposeAllOperatorCoordinators();
+        }
+    }
+
+    @Override
+    public ArchivedExecutionGraph getJob() {
+        return ArchivedExecutionGraph.createFrom(executionGraph);
+    }
+
+    @Override
+    public JobStatus getJobStatus() {
+        return executionGraph.getState();
+    }
+
+    @Override
+    public void suspend(Throwable cause) {
+        executionGraph.suspend(cause);
+        Preconditions.checkState(executionGraph.getState() == 
JobStatus.SUSPENDED);
+        
context.goToFinished(ArchivedExecutionGraph.createFrom(executionGraph));
+    }
+
+    @Override
+    public Logger getLogger() {
+        return logger;
+    }
+
+    void notifyPartitionDataAvailable(ResultPartitionID partitionID) {
+        executionGraph.notifyPartitionDataAvailable(partitionID);
+    }
+
+    SerializedInputSplit requestNextInputSplit(
+            JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws 
IOException {
+        return executionGraphHandler.requestNextInputSplit(vertexID, 
executionAttempt);
+    }
+
+    ExecutionState requestPartitionState(
+            IntermediateDataSetID intermediateResultId, ResultPartitionID 
resultPartitionId)
+            throws PartitionProducerDisposedException {
+        return 
executionGraphHandler.requestPartitionState(intermediateResultId, 
resultPartitionId);
+    }
+
+    void acknowledgeCheckpoint(
+            JobID jobID,
+            ExecutionAttemptID executionAttemptID,
+            long checkpointId,
+            CheckpointMetrics checkpointMetrics,
+            TaskStateSnapshot checkpointState) {
+
+        executionGraphHandler.acknowledgeCheckpoint(
+                jobID, executionAttemptID, checkpointId, checkpointMetrics, 
checkpointState);
+    }
+
+    void declineCheckpoint(DeclineCheckpoint decline) {
+        executionGraphHandler.declineCheckpoint(decline);
+    }
+
+    void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
+        executionGraph.updateAccumulators(accumulatorSnapshot);
+    }
+
+    KvStateLocation requestKvStateLocation(JobID jobId, String 
registrationName)
+            throws FlinkJobNotFoundException, UnknownKvStateLocation {
+        return kvStateHandler.requestKvStateLocation(jobId, registrationName);
+    }
+
+    void notifyKvStateRegistered(
+            JobID jobId,
+            JobVertexID jobVertexId,
+            KeyGroupRange keyGroupRange,
+            String registrationName,
+            KvStateID kvStateId,
+            InetSocketAddress kvStateServerAddress)
+            throws FlinkJobNotFoundException {
+        kvStateHandler.notifyKvStateRegistered(
+                jobId,
+                jobVertexId,
+                keyGroupRange,
+                registrationName,
+                kvStateId,
+                kvStateServerAddress);
+    }
+
+    void notifyKvStateUnregistered(
+            JobID jobId,
+            JobVertexID jobVertexId,
+            KeyGroupRange keyGroupRange,
+            String registrationName)
+            throws FlinkJobNotFoundException {
+        kvStateHandler.notifyKvStateUnregistered(
+                jobId, jobVertexId, keyGroupRange, registrationName);
+    }
+
+    CompletableFuture<String> triggerSavepoint(String targetDirectory, boolean 
cancelJob) {
+        final CheckpointCoordinator checkpointCoordinator =
+                executionGraph.getCheckpointCoordinator();
+        if (checkpointCoordinator == null) {
+            throw new IllegalStateException(
+                    String.format("Job %s is not a streaming job.", 
executionGraph.getJobID()));
+        } else if (targetDirectory == null
+                && 
!checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
+            logger.info(
+                    "Trying to cancel job {} with savepoint, but no savepoint 
directory configured.",
+                    executionGraph.getJobID());
+
+            throw new IllegalStateException(
+                    "No savepoint directory configured. You can either specify 
a directory "
+                            + "while cancelling via -s :targetDirectory or 
configure a cluster-wide "
+                            + "default via key '"
+                            + CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
+                            + "'.");
+        }
+
+        logger.info(
+                "Triggering {}savepoint for job {}.",
+                cancelJob ? "cancel-with-" : "",
+                executionGraph.getJobID());
+
+        if (cancelJob) {
+            checkpointCoordinator.stopCheckpointScheduler();
+        }
+
+        return checkpointCoordinator
+                .triggerSavepoint(targetDirectory)
+                .thenApply(CompletedCheckpoint::getExternalPointer)
+                .handleAsync(
+                        (path, throwable) -> {
+                            if (throwable != null) {
+                                if (cancelJob && context.isState(this)) {
+                                    
startCheckpointScheduler(checkpointCoordinator);
+                                }
+                                throw new CompletionException(throwable);
+                            } else if (cancelJob && context.isState(this)) {
+                                logger.info(
+                                        "Savepoint stored in {}. Now 
cancelling {}.",
+                                        path,
+                                        executionGraph.getJobID());
+                                cancel();
+                            }
+                            return path;
+                        },
+                        context.getMainThreadExecutor());
+    }
+
+    CompletableFuture<String> stopWithSavepoint(
+            String targetDirectory, boolean advanceToEndOfEventTime) {
+        final CheckpointCoordinator checkpointCoordinator =
+                executionGraph.getCheckpointCoordinator();
+
+        if (checkpointCoordinator == null) {
+            return FutureUtils.completedExceptionally(
+                    new IllegalStateException(
+                            String.format(
+                                    "Job %s is not a streaming job.", 
executionGraph.getJobID())));
+        }
+
+        if (targetDirectory == null
+                && 
!checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
+            logger.info(
+                    "Trying to cancel job {} with savepoint, but no savepoint 
directory configured.",
+                    executionGraph.getJobID());
+
+            return FutureUtils.completedExceptionally(
+                    new IllegalStateException(
+                            "No savepoint directory configured. You can either 
specify a directory "
+                                    + "while cancelling via -s 
:targetDirectory or configure a cluster-wide "
+                                    + "default via key '"
+                                    + 
CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
+                                    + "'."));
+        }
+
+        logger.info("Triggering stop-with-savepoint for job {}.", 
executionGraph.getJobID());
+
+        // we stop the checkpoint coordinator so that we are guaranteed
+        // to have only the data of the synchronous savepoint committed.
+        // in case of failure, and if the job restarts, the coordinator
+        // will be restarted by the CheckpointCoordinatorDeActivator.
+        checkpointCoordinator.stopCheckpointScheduler();
+
+        final CompletableFuture<String> savepointFuture =
+                checkpointCoordinator
+                        .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
+                        .thenApply(CompletedCheckpoint::getExternalPointer);
+
+        final CompletableFuture<JobStatus> terminationFuture =
+                executionGraph
+                        .getTerminationFuture()
+                        .handle(
+                                (jobstatus, throwable) -> {
+                                    if (throwable != null) {
+                                        logger.info(
+                                                "Failed during stopping job {} 
with a savepoint. Reason: {}",
+                                                executionGraph.getJobID(),
+                                                throwable.getMessage());
+                                        throw new 
CompletionException(throwable);
+                                    } else if (jobstatus != 
JobStatus.FINISHED) {
+                                        logger.info(
+                                                "Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
+                                                executionGraph.getJobID(),
+                                                jobstatus);
+                                        throw new CompletionException(
+                                                new FlinkException(
+                                                        "Reached state "
+                                                                + jobstatus
+                                                                + " instead of 
FINISHED."));
+                                    }
+                                    return jobstatus;
+                                });
+
+        return savepointFuture
+                .thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
+                .handleAsync(
+                        (path, throwable) -> {
+                            if (throwable != null) {
+                                if (context.isState(this)) {
+                                    // restart the checkpoint coordinator if 
stopWithSavepoint
+                                    // failed.
+                                    
startCheckpointScheduler(checkpointCoordinator);
+                                }
+                                throw new CompletionException(throwable);
+                            }
+                            if (ensureAllExecutionsFinished()) {
+                                Throwable error =
+                                        new FlinkException(
+                                                "Not all executions were 
stopped with a savepoint.");
+                                handleGlobalFailure(error);
+                                throw new CompletionException(error);
+                            }
+
+                            return path;
+                        },
+                        context.getMainThreadExecutor());
+    }

Review comment:
       I would suggest solving this problem properly with an explicit `State` 
which marks that we are stopping with a savepoint. Hence, for now we could 
implement this method by throwing an `UnsupportedOperationException` and then 
add the proper solution as a follow-up.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/RestartingTest.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link Restarting} state of the declarative scheduler. */
+public class RestartingTest extends TestLogger {
+
+    @Test
+    public void testExecutionGraphCancellationOnEnter() throws Exception {
+        try (MockRestartingContext ctx = new MockRestartingContext()) {
+            CancellableExecutionGraph cancellableExecutionGraph = new 
CancellableExecutionGraph();
+            Restarting restarting = createRestartingState(ctx, 
cancellableExecutionGraph);
+
+            restarting.onEnter();
+            assertThat(cancellableExecutionGraph.isCancelled(), is(true));
+        }
+    }
+
+    @Test
+    public void testTransitionToWaitingForResourcesWhenCancellationComplete() 
throws Exception {
+        try (MockRestartingContext ctx = new MockRestartingContext()) {
+            Restarting restarting = createRestartingState(ctx);
+            ctx.setExpectWaitingForResources();
+            restarting.onTerminalState(JobStatus.CANCELED);
+        }
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        try (MockRestartingContext ctx = new MockRestartingContext()) {
+            Restarting restarting = createRestartingState(ctx);
+            ctx.setExpectCancelling(assertNonNull());
+            restarting.cancel();
+        }
+    }
+
+    @Test
+    public void testSuspend() throws Exception {
+        try (MockRestartingContext ctx = new MockRestartingContext()) {
+            Restarting restarting = createRestartingState(ctx);
+            ctx.setExpectFinished(
+                    archivedExecutionGraph ->
+                            assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.SUSPENDED)));
+            final Throwable cause = new RuntimeException("suspend");
+            restarting.suspend(cause);
+        }
+    }
+
+    @Test
+    public void testGlobalFailure() throws Exception {
+        try (MockRestartingContext ctx = new MockRestartingContext()) {
+            Restarting restarting = createRestartingState(ctx);
+            restarting.handleGlobalFailure(new RuntimeException());
+            ctx.assertNotStateTransition();
+        }
+    }
+
+    public Restarting createRestartingState(
+            MockRestartingContext ctx, ExecutionGraph executionGraph) {
+        final ExecutionGraphHandler executionGraphHandler =
+                new ExecutionGraphHandler(
+                        executionGraph,
+                        log,
+                        ctx.getMainThreadExecutor(),
+                        ctx.getMainThreadExecutor());
+        final OperatorCoordinatorHandler operatorCoordinatorHandler =
+                new OperatorCoordinatorHandler(
+                        executionGraph,
+                        (throwable) -> {
+                            throw new RuntimeException("Error in test", 
throwable);
+                        });
+        return new Restarting(
+                ctx,
+                executionGraph,
+                executionGraphHandler,
+                operatorCoordinatorHandler,
+                log,
+                Duration.ZERO);
+    }
+
+    public Restarting createRestartingState(MockRestartingContext ctx)
+            throws JobException, JobExecutionException {
+        ExecutionGraph executionGraph = 
TestingExecutionGraphBuilder.newBuilder().build();
+        return createRestartingState(ctx, executionGraph);
+    }
+
+    private static class MockRestartingContext extends 
MockStateWithExecutionGraphContext
+            implements Restarting.Context {
+
+        private final StateValidator<ExecutingTest.CancellingArguments> 
cancellingStateValidator =
+                new StateValidator<>("Cancelling");
+
+        private final StateValidator<Void> waitingForResourcesStateValidator =
+                new StateValidator<>("WaitingForResources");
+
+        public void 
setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
+            cancellingStateValidator.expectInput(asserter);
+        }
+
+        public void setExpectWaitingForResources() {
+            waitingForResourcesStateValidator.expectInput((none) -> {});
+        }
+
+        @Override
+        public void goToCanceling(
+                ExecutionGraph executionGraph,
+                ExecutionGraphHandler executionGraphHandler,
+                OperatorCoordinatorHandler operatorCoordinatorHandler) {
+            cancellingStateValidator.validateInput(
+                    new ExecutingTest.CancellingArguments(
+                            executionGraph, executionGraphHandler, 
operatorCoordinatorHandler));
+            hadStateTransition = true;
+        }
+
+        @Override
+        public void goToWaitingForResources() {
+            waitingForResourcesStateValidator.validateInput(null);
+            hadStateTransition = true;
+        }
+
+        @Override
+        public void runIfState(State expectedState, Runnable action, Duration 
delay) {
+            if (!hadStateTransition) {
+                action.run();
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+            super.close();
+            assertNotStateTransition();
+        }
+
+        public void assertNotStateTransition() {

Review comment:
       ```suggestion
           public void assertNoStateTransition() {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to