XComp commented on a change in pull request #14948:
URL: https://github.com/apache/flink/pull/14948#discussion_r585407542



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1725,6 +1725,9 @@ public void startCheckpointScheduler() {
             if (shutdown) {
                 throw new IllegalArgumentException("Checkpoint coordinator is shut down");
             }
+            Preconditions.checkState(
+                    isPeriodicCheckpointingConfigured(),

Review comment:
       I verified that the condition holds in production code. I'm just 
wondering whether we should also use the `isPeriodicCheckpointingConfigured` 
method in the [ExecutionGraph's 
enableCheckpointing](https://github.com/apache/flink/blob/e084dabe6c1722dc1ced7d8fddb3c82e7af7b103/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L477)
 method, so that the same method is used everywhere to check whether periodic 
checkpointing is enabled.
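
   A rough sketch of what I mean by a single check. The class below is a 
hypothetical stand-in, not the actual `CheckpointCoordinator`, and it assumes 
that an interval of `Long.MAX_VALUE` is what marks periodic checkpointing as 
disabled:

```java
// Hypothetical stand-in for the coordinator; not the actual Flink class.
final class PeriodicCheckpointingSketch {

    // Assumption: Long.MAX_VALUE as interval means "periodic checkpointing disabled".
    static final long DISABLED_INTERVAL = Long.MAX_VALUE;

    private final long baseInterval;

    PeriodicCheckpointingSketch(long baseInterval) {
        this.baseInterval = baseInterval;
    }

    // Single place that answers "is periodic checkpointing enabled?".
    boolean isPeriodicCheckpointingConfigured() {
        return baseInterval != DISABLED_INTERVAL;
    }

    // Mirrors the precondition added in this PR: starting the scheduler
    // without a configured interval is treated as a programming error.
    void startCheckpointScheduler() {
        if (!isPeriodicCheckpointingConfigured()) {
            throw new IllegalStateException("Periodic checkpointing is not configured.");
        }
        // Scheduling of the periodic trigger would start here.
    }
}
```

   Callers such as `enableCheckpointing` could then ask the coordinator instead 
of comparing the interval themselves.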

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StopWithSavepointOperations.java
##########
@@ -18,15 +18,22 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
 /**
- * {@code CheckpointScheduling} provides methods for starting and stopping the 
periodic scheduling
- * of checkpoints.
+ * {@code StopWithSavepointOperations} provides methods for the 
stop-with-savepoint operation, such
+ * as starting and stopping the periodic scheduling of checkpoints, or 
triggering a savepoint.
  */
-public interface CheckpointScheduling {
+public interface StopWithSavepointOperations {

Review comment:
       I was thinking about doing the same for 
[FLINK-21030](https://issues.apache.org/jira/browse/FLINK-21030). I hesitated 
because I felt that the savepoint creation method is semantically closer to the 
other Checkpoint-related methods provided by `SchedulerNG`. But if we want to 
collect the stop-with-savepoint methods in a dedicated interface, we should 
also consider adding the wait for execution termination to that interface 
instead of moving it into `SchedulerUtils`, as it belongs to the same set of 
operations.
   
   This would enable us to move the entire stop-with-savepoint logic (i.e. 
stopping Checkpoint scheduling and triggering the subtasks) into 
`StopWithSavepointTerminationManager`, which would remove some redundant code.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -430,14 +430,28 @@ protected void onStart() throws JobMasterException {
             final TaskExecutionState taskExecutionState) {
         checkNotNull(taskExecutionState, "taskExecutionState");
 
-        if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
-            return CompletableFuture.completedFuture(Acknowledge.get());
-        } else {
-            return FutureUtils.completedExceptionally(
-                    new ExecutionGraphException(
-                            "The execution attempt "
-                                    + taskExecutionState.getID()
-                                    + " was not found."));
+        try {
+            if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
+                return CompletableFuture.completedFuture(Acknowledge.get());
+            } else {
+                return FutureUtils.completedExceptionally(
+                        new ExecutionGraphException(
+                                "The execution attempt "
+                                        + taskExecutionState.getID()
+                                        + " was not found."));
+            }
+        } catch (Throwable throwable) {
+            // if the taskExecutionState contains an error, it does not make 
sense to let the RPC

Review comment:
       Wouldn't that also apply to a cancelled state? One more thing I'd like 
to mention here: it can (rarely) happen that a task changes to the `FAILED` 
state without an error being provided. This is covered by 
[FLINK-21376](https://issues.apache.org/jira/browse/FLINK-21376).
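
   To illustrate the missing-error case, here is a minimal null-safe sketch. 
The helper name and the fallback message are made up for illustration and are 
not what FLINK-21376 proposes:

```java
// Hypothetical helper; name and fallback message are assumptions.
final class TaskFailureCauseSketch {

    private TaskFailureCauseSketch() {}

    // Returns the reported error, or a placeholder if the task switched to
    // FAILED without providing one.
    static Throwable failureCause(Throwable reportedError) {
        return reportedError != null
                ? reportedError
                : new IllegalStateException(
                        "Task switched to FAILED without reporting an error.");
    }
}
```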

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -501,12 +502,11 @@ public void declineCheckpoint(DeclineCheckpoint decline) {
     }
 
     @Override
-    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
+    public CompletableFuture<String> stopWithSavepoint(
+            @Nullable String targetDirectory, boolean terminate) {
         return state.tryCall(
-                        StateWithExecutionGraph.class,
-                        stateWithExecutionGraph ->
-                                stateWithExecutionGraph.stopWithSavepoint(
-                                        targetDirectory, terminate),
+                        Executing.class,
+                        executing -> executing.stopWithSavepoint(targetDirectory, terminate),
                         "stopWithSavepoint")
                 .orElse(
                         FutureUtils.completedExceptionally(

Review comment:
       nit: I feel like we're misusing the `CheckpointException` here. We're 
failing because we are in an invalid state; we haven't reached the Checkpoint 
components yet. Hence, I'd propose going for `FlinkException`.
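
   Roughly what I have in mind, as a self-contained sketch. All names below 
are hypothetical stand-ins (`InvalidSchedulerStateException` plays the role of 
a general-purpose exception such as `FlinkException`), and `tryCall` is only 
modelled after the shape visible in the diff:

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for a general-purpose exception such as FlinkException.
class InvalidSchedulerStateException extends Exception {
    InvalidSchedulerStateException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the Executing state.
final class ExecutingStateSketch {
    CompletableFuture<String> stopWithSavepoint(String targetDirectory) {
        return CompletableFuture.completedFuture(targetDirectory + "/savepoint");
    }
}

final class StateDispatchSketch {
    private final Object currentState;

    StateDispatchSketch(Object currentState) {
        this.currentState = currentState;
    }

    // Runs the call only if the current state has the expected type.
    <T, V> Optional<V> tryCall(Class<T> expectedType, Function<T, V> call) {
        if (expectedType.isInstance(currentState)) {
            return Optional.of(call.apply(expectedType.cast(currentState)));
        }
        return Optional.empty();
    }

    CompletableFuture<String> stopWithSavepoint(String targetDirectory) {
        return tryCall(
                        ExecutingStateSketch.class,
                        executing -> executing.stopWithSavepoint(targetDirectory))
                .orElseGet(
                        () -> {
                            // No checkpoint component was involved, so the failure
                            // is reported with a general-purpose exception.
                            CompletableFuture<String> failed = new CompletableFuture<>();
                            failed.completeExceptionally(
                                    new InvalidSchedulerStateException(
                                            "Cannot stop with savepoint in the current state."));
                            return failed;
                        });
    }
}
```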

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -143,6 +147,45 @@ public void notifyNewResourcesAvailable() {
         }
     }
 
+    CompletableFuture<String> stopWithSavepoint(

Review comment:
       The error handling of `stopWithSavepoint` can be moved into 
`StopWithSavepointTerminationManager` if we decide to extend 
`StopWithSavepointOperations` accordingly.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
+import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * When a "stop with savepoint" operation (wait until savepoint has been 
created, then cancel job)
+ * is triggered on the {@link Executing} state, we transition into this state. 
This state is
+ * delegating the tracking of the stop with savepoint operation to the {@link
+ * StopWithSavepointTerminationManager}, which is shared with {@link 
SchedulerBase}. What remains
+ * for this state is reacting to signals from the termination manager, and 
tracking the
+ * "operationCompletionFuture" (notify the user if we got cancelled, suspended 
etc. in the meantime)
+ */
+public class StopWithSavepoint extends StateWithExecutionGraph

Review comment:
       Looks like we can make this package-private.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -88,6 +119,177 @@ public void testGlobalFailoverCanRecoverState() throws 
Exception {
         env.execute();
     }
 
+    private enum StopWithSavepointTestBehavior {
+        NO_FAILURE,
+        FAIL_ON_CHECKPOINT,
+        FAIL_ON_STOP,
+        FAIL_ON_FIRST_CHECKPOINT_ONLY
+    }
+
+    @Test
+    public void testStopWithSavepointNoError() throws Exception {
+        StreamExecutionEnvironment env = 
getEnvWithSource(StopWithSavepointTestBehavior.NO_FAILURE);
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+
+        final File savepointDirectory = tempFolder.newFolder("savepoint");
+        final String savepoint =
+                client.stopWithSavepoint(false, 
savepointDirectory.getAbsolutePath()).get();
+        assertThat(savepoint, 
containsString(savepointDirectory.getAbsolutePath()));
+        assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED));
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+        StreamExecutionEnvironment env =
+                
getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0L));
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        try {
+            client.stopWithSavepoint(false, 
tempFolder.newFolder("savepoint").getAbsolutePath())
+                    .get();
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("Failure while stopping 
with savepoint"));
+        }
+        // expect job to run again (maybe restart)
+        CommonTestUtils.waitUntilCondition(
+                () -> client.getJobStatus().get() == JobStatus.RUNNING,
+                Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnStop() throws Exception {
+        StreamExecutionEnvironment env =
+                getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_STOP);
+        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0L));
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        try {
+            client.stopWithSavepoint(false, 
tempFolder.newFolder("savepoint").getAbsolutePath())
+                    .get();
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("Failure while stopping 
with savepoint"));
+        }
+        // expect job to run again (maybe restart)
+        CommonTestUtils.waitUntilCondition(
+                () -> client.getJobStatus().get() == JobStatus.RUNNING,
+                Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() 
throws Exception {
+        
assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));

Review comment:
       This seems to be obsolete because `ensureDeclarativeResourceManagement()` 
is called before the test.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -72,10 +94,19 @@ private static Configuration getConfiguration() {
                             
.setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
                             .build());
 
+    @Before
+    public void ensureDeclarativeResourceManagement() {
+        
assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));

Review comment:
       The `configuration` we check here is different from the one used in the 
`MiniCluster`. I guess there is no real point in having the member 
`configuration` here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StopWithSavepointOperations.java
##########
@@ -18,15 +18,22 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
 /**
- * {@code CheckpointScheduling} provides methods for starting and stopping the 
periodic scheduling
- * of checkpoints.
+ * {@code StopWithSavepointOperations} provides methods for the 
stop-with-savepoint operation, such
+ * as starting and stopping the periodic scheduling of checkpoints, or 
triggering a savepoint.
  */
-public interface CheckpointScheduling {
+public interface StopWithSavepointOperations {

Review comment:
       Adding something like `checkpointingIsConfigured()` and 
`savepointDirectoryIsConfigured()` would make it possible to move the entire 
stop-with-savepoint logic (including the error handling) into 
`StopWithSavepointTermination`, which could then be renamed to 
`StopWithSavepointOperation`.
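
   A sketch of how that could look. Everything below is hypothetical: the 
interface is a simplified stand-in for `StopWithSavepointOperations` (the 
savepoint future carries a `String` path here to keep the example 
self-contained), and restarting the scheduler on savepoint failure is my 
assumption about the desired error handling:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified variant of the interface with the two proposed checks.
interface StopWithSavepointOperationsSketch {
    boolean checkpointingIsConfigured();

    boolean savepointDirectoryIsConfigured();

    void startCheckpointScheduler();

    void stopCheckpointScheduler();

    CompletableFuture<String> triggerSynchronousSavepoint(
            boolean terminate, String targetDirectory);
}

// Hypothetical home for the whole operation, including the error handling.
final class StopWithSavepointOperationSketch {

    static CompletableFuture<String> run(
            StopWithSavepointOperationsSketch ops, String targetDirectory, boolean terminate) {
        if (!ops.checkpointingIsConfigured()) {
            return failed(new IllegalStateException("Checkpointing is not configured."));
        }
        if (targetDirectory == null && !ops.savepointDirectoryIsConfigured()) {
            return failed(new IllegalStateException("No savepoint directory configured."));
        }
        // Do not trigger periodic checkpoints while the final savepoint is created.
        ops.stopCheckpointScheduler();
        return ops.triggerSynchronousSavepoint(terminate, targetDirectory)
                .whenComplete(
                        (path, error) -> {
                            if (error != null) {
                                // Assumption: resume scheduling if the savepoint failed.
                                ops.startCheckpointScheduler();
                            }
                        });
    }

    private static CompletableFuture<String> failed(Throwable cause) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }
}

// Minimal fake used to exercise the sketch.
final class FakeOperationsSketch implements StopWithSavepointOperationsSketch {
    private final boolean checkpointingConfigured;
    private final boolean directoryConfigured;
    boolean schedulerRunning = true;

    FakeOperationsSketch(boolean checkpointingConfigured, boolean directoryConfigured) {
        this.checkpointingConfigured = checkpointingConfigured;
        this.directoryConfigured = directoryConfigured;
    }

    @Override
    public boolean checkpointingIsConfigured() {
        return checkpointingConfigured;
    }

    @Override
    public boolean savepointDirectoryIsConfigured() {
        return directoryConfigured;
    }

    @Override
    public void startCheckpointScheduler() {
        schedulerRunning = true;
    }

    @Override
    public void stopCheckpointScheduler() {
        schedulerRunning = false;
    }

    @Override
    public CompletableFuture<String> triggerSynchronousSavepoint(
            boolean terminate, String targetDirectory) {
        String directory = targetDirectory != null ? targetDirectory : "/default";
        return CompletableFuture.completedFuture(directory + "/savepoint");
    }
}
```

   With this shape, `Executing` and `SchedulerBase` would only need to provide 
the operations, and the validation would live in one place.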

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -88,6 +119,177 @@ public void testGlobalFailoverCanRecoverState() throws 
Exception {
         env.execute();
     }
 
+    private enum StopWithSavepointTestBehavior {
+        NO_FAILURE,
+        FAIL_ON_CHECKPOINT,
+        FAIL_ON_STOP,
+        FAIL_ON_FIRST_CHECKPOINT_ONLY
+    }
+
+    @Test
+    public void testStopWithSavepointNoError() throws Exception {
+        StreamExecutionEnvironment env = 
getEnvWithSource(StopWithSavepointTestBehavior.NO_FAILURE);
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+
+        final File savepointDirectory = tempFolder.newFolder("savepoint");
+        final String savepoint =
+                client.stopWithSavepoint(false, 
savepointDirectory.getAbsolutePath()).get();
+        assertThat(savepoint, 
containsString(savepointDirectory.getAbsolutePath()));
+        assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED));
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+        StreamExecutionEnvironment env =
+                
getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0L));
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        try {
+            client.stopWithSavepoint(false, 
tempFolder.newFolder("savepoint").getAbsolutePath())
+                    .get();
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("Failure while stopping 
with savepoint"));
+        }
+        // expect job to run again (maybe restart)
+        CommonTestUtils.waitUntilCondition(
+                () -> client.getJobStatus().get() == JobStatus.RUNNING,
+                Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnStop() throws Exception {
+        StreamExecutionEnvironment env =
+                getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_STOP);
+        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0L));
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        try {
+            client.stopWithSavepoint(false, 
tempFolder.newFolder("savepoint").getAbsolutePath())
+                    .get();
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("Failure while stopping 
with savepoint"));
+        }
+        // expect job to run again (maybe restart)
+        CommonTestUtils.waitUntilCondition(
+                () -> client.getJobStatus().get() == JobStatus.RUNNING,
+                Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() 
throws Exception {
+        
assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+
+        env.setParallelism(PARALLELISM);
+
+        env.addSource(new 
DummySource(StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY))
+                .addSink(new DiscardingSink<>());
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        DummySource.resetForParallelism(PARALLELISM);
+        final File savepointDirectory = tempFolder.newFolder("savepoint");
+        try {
+            client.stopWithSavepoint(false, 
savepointDirectory.getAbsolutePath()).get();
+            fail("Expect failure of operation");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("Failure while stopping 
with savepoint"));
+        }
+
+        // trigger second savepoint
+        DummySource.awaitRunning();
+        final String savepoint =
+                client.stopWithSavepoint(false, 
savepointDirectory.getAbsolutePath()).get();
+        assertThat(savepoint, 
containsString(savepointDirectory.getAbsolutePath()));
+    }
+
+    private static StreamExecutionEnvironment getEnvWithSource(
+            StopWithSavepointTestBehavior behavior) {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.addSource(new DummySource(behavior)).addSink(new 
DiscardingSink<>());
+        return env;
+    }
+
+    private static final class DummySource extends 
RichParallelSourceFunction<Integer>

Review comment:
       Could we make this a generic test class that could be, for instance, 
also utilized in `SavepointITCase`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -136,7 +136,7 @@ public ArchivedExecutionGraph getJob() {
     @Override
     public void suspend(Throwable cause) {
         executionGraph.suspend(cause);
-        Preconditions.checkState(executionGraph.getState() == JobStatus.SUSPENDED);
+        // Preconditions.checkState(executionGraph.getState() == JobStatus.SUSPENDED);

Review comment:
       Is this commented out due to the WIP efforts, or did we forget to 
re-enable it?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationManager.java
##########
@@ -51,10 +51,10 @@ public StopWithSavepointTerminationManager(
      *     operations run on.
      * @return A {@code CompletableFuture} containing the path to the created 
savepoint.
      */
-    public CompletableFuture<String> stopWithSavepoint(
+    public CompletableFuture<String> trackStopWithSavepoint(
             CompletableFuture<CompletedCheckpoint> completedSavepointFuture,
             CompletableFuture<Collection<ExecutionState>> 
terminatedExecutionStatesFuture,
-            ComponentMainThreadExecutor mainThreadExecutor) {
+            Executor mainThreadExecutor) {

Review comment:
       Good point. Thanks for cleaning that up! 👍 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
+import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * When a "stop with savepoint" operation (wait until savepoint has been 
created, then cancel job)
+ * is triggered on the {@link Executing} state, we transition into this state. 
This state is
+ * delegating the tracking of the stop with savepoint operation to the {@link
+ * StopWithSavepointTerminationManager}, which is shared with {@link 
SchedulerBase}. What remains
+ * for this state is reacting to signals from the termination manager, and 
tracking the
+ * "operationCompletionFuture" (notify the user if we got cancelled, suspended 
etc. in the meantime)
+ */
+public class StopWithSavepoint extends StateWithExecutionGraph

Review comment:
       I feel like we could even make `StopWithSavepoint` extend `Executing`, 
as it's a special case of it. That way, we wouldn't need to reimplement methods 
like `getJobStatus` or the failure handling, which are the same as far as I can 
see.
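
   As a rough illustration of the inheritance idea. Both classes are 
hypothetical, heavily simplified stand-ins for `Executing` and 
`StopWithSavepoint`, and the `String` return value only exists to make the 
shared behavior visible:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified job status.
enum JobStatusView {
    RUNNING
}

// Hypothetical stand-in for Executing.
class ExecutingBaseSketch {
    JobStatusView getJobStatus() {
        return JobStatusView.RUNNING;
    }

    // Shared failure handling, e.g. deciding between restarting and failing.
    String handleGlobalFailure(Throwable cause) {
        return "handled: " + cause.getMessage();
    }
}

// Hypothetical stand-in for StopWithSavepoint as a special case of Executing.
final class StopWithSavepointStateSketch extends ExecutingBaseSketch {
    private final CompletableFuture<String> operationCompletionFuture =
            new CompletableFuture<>();

    CompletableFuture<String> operationCompletionFuture() {
        return operationCompletionFuture;
    }

    // Only the extra step is added; getJobStatus() and the actual failure
    // handling are inherited.
    @Override
    String handleGlobalFailure(Throwable cause) {
        operationCompletionFuture.completeExceptionally(cause);
        return super.handleGlobalFailure(cause);
    }
}
```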

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -884,7 +915,7 @@ private void stopCheckpointServicesSafely(JobStatus 
terminalState) {
     }
 
     @Override
-    public Executor getMainThreadExecutor() {
+    public ComponentMainThreadExecutor getMainThreadExecutor() {

Review comment:
       This change does not seem to be necessary anymore after adapting the 
`StopWithSavepointTerminationManager.trackStopWithSavepoint` method signature.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
+import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * When a "stop with savepoint" operation (wait until savepoint has been 
created, then cancel job)
+ * is triggered on the {@link Executing} state, we transition into this state. 
This state is
+ * delegating the tracking of the stop with savepoint operation to the {@link
+ * StopWithSavepointTerminationManager}, which is shared with {@link 
SchedulerBase}. What remains
+ * for this state is reacting to signals from the termination manager, and 
tracking the
+ * "operationCompletionFuture" (notify the user if we got cancelled, suspended 
etc. in the meantime)
+ */
+public class StopWithSavepoint extends StateWithExecutionGraph
+        implements StopWithSavepointOperations {
+
+    private final CompletableFuture<String> operationCompletionFuture;
+    private final Context context;
+    private final ClassLoader userCodeClassLoader;
+
+    StopWithSavepoint(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            ClassLoader userCodeClassLoader,
+            String targetDirectory,
+            boolean terminate) {
+        super(context, executionGraph, executionGraphHandler, 
operatorCoordinatorHandler, logger);
+        this.context = context;
+        this.userCodeClassLoader = userCodeClassLoader;
+
+        // to ensure that all disjoint subgraphs of a job finish successfully 
on savepoint creation,
+        // we track the job termination via all execution termination futures 
(FLINK-21030).
+        final CompletableFuture<Collection<ExecutionState>> 
executionTerminationsFuture =
+                
SchedulerUtils.getCombinedExecutionTerminationFuture(executionGraph);
+
+        // do not trigger checkpoints while creating the final savepoint
+        stopCheckpointScheduler();
+
+        // trigger savepoint. This operation will also terminate/suspend the 
job once the savepoint
+        // has been created.
+        final CompletableFuture<CompletedCheckpoint> savepointFuture =
+                triggerSynchronousSavepoint(terminate, targetDirectory);
+
+        final StopWithSavepointTerminationManager 
stopWithSavepointTerminationManager =
+                new StopWithSavepointTerminationManager(
+                        new StopWithSavepointTerminationHandlerImpl(
+                                executionGraph.getJobID(), context, this, 
logger));
+
+        this.operationCompletionFuture =
+                stopWithSavepointTerminationManager.trackStopWithSavepoint(
+                        savepointFuture,
+                        executionTerminationsFuture,
+                        context.getMainThreadExecutor());
+    }
+
+    @Override
+    public void cancel() {
+        // the canceling state will cancel the execution graph, which will 
eventually lead to a call
+        // of handleAnyFailure(), completing the operationCompletionFuture.
+        context.goToCanceling(
+                getExecutionGraph(), getExecutionGraphHandler(), 
getOperatorCoordinatorHandler());
+    }
+
+    @Override
+    public JobStatus getJobStatus() {
+        return JobStatus.RUNNING;
+    }
+
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        handleAnyFailure(cause);
+    }
+
+    @Override
+    boolean updateTaskExecutionState(TaskExecutionStateTransition 
taskExecutionStateTransition) {
+        final boolean successfulUpdate =
+                getExecutionGraph().updateState(taskExecutionStateTransition);
+
+        if (successfulUpdate) {
+            if (taskExecutionStateTransition.getExecutionState() == 
ExecutionState.FAILED) {
+                Throwable cause = 
taskExecutionStateTransition.getError(userCodeClassLoader);
+                handleAnyFailure(cause);
+            }
+        }
+
+        return successfulUpdate;
+    }
+
+    @Override
+    void onGloballyTerminalState(JobStatus globallyTerminalState) {
+        
context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
+    }
+
+    private void handleAnyFailure(Throwable cause) {
+        operationCompletionFuture.completeExceptionally(
+                new FlinkException("Failure while stopping with savepoint", 
cause));
+        final Executing.FailureResult failureResult = 
context.howToHandleFailure(cause);
+
+        if (failureResult.canRestart()) {
+            startCheckpointScheduler();

Review comment:
       We don't have to explicitly enable Checkpoint scheduling here. This is 
done by the `CheckpointCoordinatorDeActivator` after the `JobStatus` changes 
from `RESTARTING` to `RUNNING` again.
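
   For reference, the behavior I'm relying on looks roughly like this. The 
classes are simplified, hypothetical stand-ins and not the actual 
`CheckpointCoordinatorDeActivator`; the assumption is that scheduling is 
started on `RUNNING` and stopped on any other status:

```java
// Hypothetical, simplified job status.
enum JobStatusSketch {
    CREATED,
    RUNNING,
    RESTARTING,
    FAILED
}

// Hypothetical stand-in for the coordinator's scheduling switch.
final class CheckpointSchedulerSketch {
    private boolean active;

    void start() {
        active = true;
    }

    void stop() {
        active = false;
    }

    boolean isActive() {
        return active;
    }
}

// Hypothetical stand-in for the job status listener.
final class DeActivatorSketch {
    private final CheckpointSchedulerSketch scheduler;

    DeActivatorSketch(CheckpointSchedulerSketch scheduler) {
        this.scheduler = scheduler;
    }

    // Periodic scheduling follows the job status: on for RUNNING, off otherwise.
    void jobStatusChanges(JobStatusSketch newJobStatus) {
        if (newJobStatus == JobStatusSketch.RUNNING) {
            scheduler.start();
        } else {
            scheduler.stop();
        }
    }
}
```

   If that holds, a restart already re-enables scheduling once the job is 
`RUNNING` again, so the explicit `startCheckpointScheduler()` call is redundant.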

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/SchedulerFailureHandler.java
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+/** Interface for schedulers to notify them about global failures. */
+public interface SchedulerFailureHandler {

Review comment:
       I like that one. What about calling it `GlobalFailureHandler`? I don't 
see a need to tie it to the scheduler through the name.
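
A rough sketch of what I have in mind, assuming a single callback method. The method name `handleGlobalFailure` and the `FailureReporter` caller are hypothetical and only serve the illustration; the point is that the caller does not need to know that a scheduler sits behind the interface.

```java
import java.util.ArrayList;
import java.util.List;

public class GlobalFailureHandlerSketch {

    /** Hypothetical shape of the renamed interface; the method name is an assumption. */
    @FunctionalInterface
    interface GlobalFailureHandler {
        void handleGlobalFailure(Throwable cause);
    }

    /** Hypothetical component that reports failures without referencing any scheduler. */
    static class FailureReporter {
        private final GlobalFailureHandler handler;

        FailureReporter(GlobalFailureHandler handler) {
            this.handler = handler;
        }

        void report(Throwable cause) {
            handler.handleGlobalFailure(cause);
        }
    }

    public static void main(String[] args) {
        // Any implementation works here, for example one that just records the failure.
        List<Throwable> seen = new ArrayList<>();
        FailureReporter reporter = new FailureReporter(seen::add);
        reporter.report(new RuntimeException("global failure"));
        System.out.println("recorded failures: " + seen.size());
    }
}
```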

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -585,15 +585,22 @@ public void 
testStopWithSavepointFailingAfterSnapshotCreation() throws Exception
 
     private static BiConsumer<JobID, ExecutionException> 
assertAfterSnapshotCreationFailure() {
         return (jobId, actualException) -> {
-            Optional<FlinkException> actualFlinkException =
-                    ExceptionUtils.findThrowable(actualException, 
FlinkException.class);
-            assertTrue(actualFlinkException.isPresent());
-            assertThat(
-                    actualFlinkException.get(),
-                    containsMessage(
-                            String.format(
-                                    "Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: FAILED. A global fail-over is triggered to recover the job %s.",
-                                    jobId)));
+            if (isAdaptiveSchedulerEnabled(new Configuration())) {

Review comment:
       I would expect the behavior to be the same regardless of which 
scheduler is used.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -684,40 +692,54 @@ private static void 
testStopWithFailingSourceInOnePipeline(
                 exceptionAssertion.accept(jobGraph.getJobID(), e);
             }
 
-            // access the REST endpoint of the cluster to determine the state 
of each
-            // ExecutionVertex
-            final RestClient restClient =
-                    new RestClient(
-                            RestClientConfiguration.fromConfiguration(
-                                    new UnmodifiableConfiguration(new 
Configuration())),
-                            TestingUtils.defaultExecutor());
-
-            final URI restAddress = cluster.getRestAddres();
-            final JobDetailsHeaders detailsHeaders = 
JobDetailsHeaders.getInstance();
-            final JobMessageParameters params = 
detailsHeaders.getUnresolvedMessageParameters();
-            params.jobPathParameter.resolve(jobGraph.getJobID());
-
-            CommonTestUtils.waitUntilCondition(
-                    () -> {
-                        JobDetailsInfo detailsInfo =
-                                restClient
-                                        .sendRequest(
-                                                restAddress.getHost(),
-                                                restAddress.getPort(),
-                                                detailsHeaders,
-                                                params,
-                                                EmptyRequestBody.getInstance())
-                                        .get();
-
-                        return 
detailsInfo.getJobVerticesPerState().get(ExecutionState.RUNNING)
-                                == 2;
-                    },
-                    Deadline.fromNow(Duration.ofSeconds(10)));
+            waitUntilAllTasksAreRunning(cluster.getRestAddres(), 
jobGraph.getJobID());
         } finally {
             cluster.after();
         }
     }
 
+    public static void waitUntilAllTasksAreRunning(URI restAddress, JobID 
jobId) throws Exception {

Review comment:
       Good point. That improves the readability. 👍 

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -88,6 +119,177 @@ public void testGlobalFailoverCanRecoverState() throws 
Exception {
         env.execute();
     }
 
+    private enum StopWithSavepointTestBehavior {
+        NO_FAILURE,
+        FAIL_ON_CHECKPOINT,
+        FAIL_ON_STOP,
+        FAIL_ON_FIRST_CHECKPOINT_ONLY
+    }
+
+    @Test
+    public void testStopWithSavepointNoError() throws Exception {
+        StreamExecutionEnvironment env = 
getEnvWithSource(StopWithSavepointTestBehavior.NO_FAILURE);
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+
+        final File savepointDirectory = tempFolder.newFolder("savepoint");
+        final String savepoint =
+                client.stopWithSavepoint(false, 
savepointDirectory.getAbsolutePath()).get();
+        assertThat(savepoint, 
containsString(savepointDirectory.getAbsolutePath()));
+        assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED));
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+        StreamExecutionEnvironment env =
+                
getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0L));
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        try {
+            client.stopWithSavepoint(false, 
tempFolder.newFolder("savepoint").getAbsolutePath())
+                    .get();
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("Failure while stopping with savepoint"));

Review comment:
       nit: `FlinkMatchers.containsMessage` might work as well

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -36,31 +41,48 @@
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
 /** Integration tests for the adaptive scheduler. */
 public class AdaptiveSchedulerITCase extends TestLogger {
 
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
     private static final int NUMBER_TASK_MANAGERS = 2;
     private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
     private static final int PARALLELISM = NUMBER_SLOTS_PER_TASK_MANAGER * 
NUMBER_TASK_MANAGERS;
 
     private static final Configuration configuration = getConfiguration();
 
     private static Configuration getConfiguration() {
-        final Configuration configuration = new Configuration();
+        final Configuration conf = new Configuration();

Review comment:
       I'm not sure whether the renaming is actually necessary here. I would 
propose reverting it for the sake of the git history.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -72,10 +94,19 @@ private static Configuration getConfiguration() {
                             
.setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
                             .build());
 
+    @Before
+    public void ensureDeclarativeResourceManagement() {
+        
assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));

Review comment:
       ```suggestion
           assumeTrue(ClusterOptions.isAdaptiveSchedulerEnabled(configuration));
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -143,6 +147,45 @@ public void notifyNewResourcesAvailable() {
         }
     }
 
+    CompletableFuture<String> stopWithSavepoint(

Review comment:
       Looking at the code once more, I realize that my proposed change would 
break the separation that is implemented right now: the error handling runs 
in the `Executing` state, whereas the actual stop-with-savepoint routines are 
called in the `StopWithSavepoint` state. This separation makes sense. Hence, I 
would vote against my initial proposal.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -143,6 +147,45 @@ public void notifyNewResourcesAvailable() {
         }
     }
 
+    CompletableFuture<String> stopWithSavepoint(

Review comment:
       One way to overcome the code redundancy could be to move the error 
handling into a static utility method in `StopWithSavepointTerminationManager` 
that could be called by `SchedulerBase` and `Executing`.
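
A minimal sketch of that idea, assuming the duplicated error handling boils down to precondition checks before the savepoint is triggered. The class name, the method name `checkStopWithSavepointPreconditions`, and its boolean parameters are hypothetical placeholders, not the existing Flink API.

```java
public final class StopWithSavepointPreconditionsSketch {

    private StopWithSavepointPreconditionsSketch() {}

    /**
     * Hypothetical shared check; both schedulers would call this before triggering the savepoint.
     *
     * @param checkpointingConfigured whether a checkpoint coordinator exists for the job
     * @param targetDirectory savepoint directory passed by the user, may be null
     * @param hasDefaultSavepointLocation whether a default savepoint directory is configured
     */
    static void checkStopWithSavepointPreconditions(
            boolean checkpointingConfigured,
            String targetDirectory,
            boolean hasDefaultSavepointLocation) {
        if (!checkpointingConfigured) {
            throw new IllegalStateException("Checkpointing is not configured for this job.");
        }
        if (targetDirectory == null && !hasDefaultSavepointLocation) {
            throw new IllegalStateException("No savepoint directory configured.");
        }
    }

    public static void main(String[] args) {
        // Passes: an explicit target directory is given.
        checkStopWithSavepointPreconditions(true, "/tmp/savepoints", false);

        // Fails: neither an explicit nor a default directory is available.
        try {
            checkStopWithSavepointPreconditions(true, null, false);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Both call sites would then reduce to one line each, and the state-specific logic stays where it is today.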




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

