XComp commented on a change in pull request #14948:
URL: https://github.com/apache/flink/pull/14948#discussion_r585407542
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1725,6 +1725,9 @@ public void startCheckpointScheduler() {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
+ Preconditions.checkState(
+ isPeriodicCheckpointingConfigured(),
Review comment:
I verified that the condition applies in production code. I'm just
wondering whether we should also use the `isPeriodicCheckpointingConfigured`
method in the [ExecutionGraph's
enableCheckpointing](https://github.com/apache/flink/blob/e084dabe6c1722dc1ced7d8fddb3c82e7af7b103/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L477)
method, so that it is used consistently everywhere we check whether periodic
checkpointing is enabled.
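Roughly what I have in mind, as a self-contained sketch (the `Long.MAX_VALUE`
sentinel for a disabled interval and all names besides
`isPeriodicCheckpointingConfigured` are assumptions for illustration, not the
actual Flink code):
```java
// Sketch: both call sites delegate to a single predicate instead of
// re-deriving the "is periodic checkpointing enabled" condition themselves.
class CheckpointSchedulingSketch {
    private final long baseIntervalMillis;

    CheckpointSchedulingSketch(long baseIntervalMillis) {
        this.baseIntervalMillis = baseIntervalMillis;
    }

    boolean isPeriodicCheckpointingConfigured() {
        // assumption: a disabled interval is encoded as Long.MAX_VALUE
        return baseIntervalMillis != Long.MAX_VALUE;
    }

    void startCheckpointScheduler() {
        if (!isPeriodicCheckpointingConfigured()) {
            throw new IllegalStateException("Periodic checkpointing is not configured.");
        }
        // ... schedule the periodic checkpoint trigger ...
    }

    void enableCheckpointing() {
        if (isPeriodicCheckpointingConfigured()) {
            // ... register the checkpoint activator/deactivator listener ...
        }
    }
}
```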
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StopWithSavepointOperations.java
##########
@@ -18,15 +18,22 @@
package org.apache.flink.runtime.checkpoint;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
/**
- * {@code CheckpointScheduling} provides methods for starting and stopping the periodic scheduling
- * of checkpoints.
+ * {@code StopWithSavepointOperations} provides methods for the stop-with-savepoint operation, such
+ * as starting and stopping the periodic scheduling of checkpoints, or triggering a savepoint.
*/
-public interface CheckpointScheduling {
+public interface StopWithSavepointOperations {
Review comment:
I was thinking about doing the same for
[FLINK-21030](https://issues.apache.org/jira/browse/FLINK-21030). I hesitated
because I felt that the savepoint creation method is semantically closer to the
other checkpoint-related methods provided by `SchedulerNG`. But if we want to
collect the stop-with-savepoint methods in a dedicated interface, we should
also consider adding the wait for execution termination there rather than
moving it into `SchedulerUtils`, as it's part of that collection.
This would enable us to move the entire stop-with-savepoint logic (i.e.
stopping the checkpoint scheduling and triggering the subtasks) into
`StopWithSavepointTerminationManager`, which would remove some redundant code.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -430,14 +430,28 @@ protected void onStart() throws JobMasterException {
final TaskExecutionState taskExecutionState) {
checkNotNull(taskExecutionState, "taskExecutionState");
- if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
- return CompletableFuture.completedFuture(Acknowledge.get());
- } else {
- return FutureUtils.completedExceptionally(
- new ExecutionGraphException(
- "The execution attempt "
- + taskExecutionState.getID()
- + " was not found."));
+ try {
+ if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ } else {
+ return FutureUtils.completedExceptionally(
+ new ExecutionGraphException(
+ "The execution attempt "
+ + taskExecutionState.getID()
+ + " was not found."));
+ }
+ } catch (Throwable throwable) {
+ // if the taskExecutionState contains an error, it does not make sense to let the RPC
Review comment:
Wouldn't that also apply to a cancelled state? One thing I'd like to mention
here: it can (rarely) happen that a task changes to the `FAILED` state but no
error is provided. This is covered by
[FLINK-21376](https://issues.apache.org/jira/browse/FLINK-21376).
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -501,12 +502,11 @@ public void declineCheckpoint(DeclineCheckpoint decline) {
}
@Override
- public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
+ public CompletableFuture<String> stopWithSavepoint(
+ @Nullable String targetDirectory, boolean terminate) {
return state.tryCall(
- StateWithExecutionGraph.class,
- stateWithExecutionGraph ->
- stateWithExecutionGraph.stopWithSavepoint(
- targetDirectory, terminate),
+ Executing.class,
+ executing -> executing.stopWithSavepoint(targetDirectory, terminate),
"stopWithSavepoint")
.orElse(
FutureUtils.completedExceptionally(
Review comment:
nit: I feel like we're misusing `CheckpointException` here. We're failing
because we're in an invalid state; we haven't reached the checkpoint components
yet. Hence, I'd propose going for `FlinkException`.
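As a sketch of what I mean (the wrapper method and message wording are
illustrative; `FutureUtils` and `FlinkException` are already used in this
class):
```java
// Hedged sketch: report the invalid state with a plain FlinkException,
// since no checkpoint component has been involved at this point.
private CompletableFuture<String> failStopWithSavepointDueToInvalidState() {
    return FutureUtils.completedExceptionally(
            new FlinkException(
                    "Stop with savepoint can only be triggered while the job is in the Executing state."));
}
```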
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -143,6 +147,45 @@ public void notifyNewResourcesAvailable() {
}
}
+ CompletableFuture<String> stopWithSavepoint(
Review comment:
The error handling of `stopWithSavepoint` can be moved into
`StopWithSavepointTerminationManager` if we decide to extend
`StopWithSavepointOperations` accordingly.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * When a "stop with savepoint" operation (wait until savepoint has been
created, then cancel job)
+ * is triggered on the {@link Executing} state, we transition into this state.
This state is
+ * delegating the tracking of the stop with savepoint operation to the {@link
+ * StopWithSavepointTerminationManager}, which is shared with {@link
SchedulerBase}. What remains
+ * for this state is reacting to signals from the termination manager, and
tracking the
+ * "operationCompletionFuture" (notify the user if we got cancelled, suspended
etc. in the meantime)
+ */
+public class StopWithSavepoint extends StateWithExecutionGraph
Review comment:
Looks like we can make this package-private.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -88,6 +119,177 @@ public void testGlobalFailoverCanRecoverState() throws Exception {
env.execute();
}
+ private enum StopWithSavepointTestBehavior {
+ NO_FAILURE,
+ FAIL_ON_CHECKPOINT,
+ FAIL_ON_STOP,
+ FAIL_ON_FIRST_CHECKPOINT_ONLY
+ }
+
+ @Test
+ public void testStopWithSavepointNoError() throws Exception {
+ StreamExecutionEnvironment env = getEnvWithSource(StopWithSavepointTestBehavior.NO_FAILURE);
+
+ DummySource.resetForParallelism(PARALLELISM);
+
+ JobClient client = env.executeAsync();
+
+ DummySource.awaitRunning();
+
+ final File savepointDirectory = tempFolder.newFolder("savepoint");
+ final String savepoint =
+ client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
+ assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
+ assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED));
+ }
+
+ @Test
+ public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+ StreamExecutionEnvironment env =
+ getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+ DummySource.resetForParallelism(PARALLELISM);
+
+ JobClient client = env.executeAsync();
+
+ DummySource.awaitRunning();
+ try {
+ client.stopWithSavepoint(false, tempFolder.newFolder("savepoint").getAbsolutePath())
+ .get();
+ fail("Expect exception");
+ } catch (ExecutionException e) {
+ assertThat(e.getMessage(), containsString("Failure while stopping with savepoint"));
+ }
+ // expect job to run again (maybe restart)
+ CommonTestUtils.waitUntilCondition(
+ () -> client.getJobStatus().get() == JobStatus.RUNNING,
+ Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+ }
+
+ @Test
+ public void testStopWithSavepointFailOnStop() throws Exception {
+ StreamExecutionEnvironment env =
+ getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_STOP);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+ DummySource.resetForParallelism(PARALLELISM);
+
+ JobClient client = env.executeAsync();
+
+ DummySource.awaitRunning();
+ try {
+ client.stopWithSavepoint(false, tempFolder.newFolder("savepoint").getAbsolutePath())
+ .get();
+ fail("Expect exception");
+ } catch (ExecutionException e) {
+ assertThat(e.getMessage(), containsString("Failure while stopping with savepoint"));
+ }
+ // expect job to run again (maybe restart)
+ CommonTestUtils.waitUntilCondition(
+ () -> client.getJobStatus().get() == JobStatus.RUNNING,
+ Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+ }
+
+ @Test
+ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Exception {
+ assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
Review comment:
This seems to be obsolete since `ensureDeclarativeResourceManagement()` is
called before the test.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -72,10 +94,19 @@ private static Configuration getConfiguration() {
.setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
.build());
+ @Before
+ public void ensureDeclarativeResourceManagement() {
+ assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
Review comment:
The `configuration` we check here is different from the one used in the
`MiniCluster`. I guess there's no real point in having the `configuration`
member here.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StopWithSavepointOperations.java
##########
@@ -18,15 +18,22 @@
package org.apache.flink.runtime.checkpoint;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
/**
- * {@code CheckpointScheduling} provides methods for starting and stopping the periodic scheduling
- * of checkpoints.
+ * {@code StopWithSavepointOperations} provides methods for the stop-with-savepoint operation, such
+ * as starting and stopping the periodic scheduling of checkpoints, or triggering a savepoint.
*/
-public interface CheckpointScheduling {
+public interface StopWithSavepointOperations {
Review comment:
Adding something like `checkpointingIsConfigured()` and
`savepointDirectoryIsConfigured()` would make it possible to move the entire
stop-with-savepoint logic, including the error handling, into
`StopWithSavepointTermination` (which could then be renamed
`StopWithSavepointOperation`).
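As a sketch, the extended interface could look as follows. The first three
method signatures are inferred from the call sites in `StopWithSavepoint`; the
two predicates are the proposed additions with the names from this comment:
```java
public interface StopWithSavepointOperations {

    /** Starts the periodic scheduling of checkpoints. */
    void startCheckpointScheduler();

    /** Stops the periodic scheduling of checkpoints. */
    void stopCheckpointScheduler();

    /** Triggers the savepoint that terminates/suspends the job once it is created. */
    CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
            boolean terminate, @Nullable String targetDirectory);

    // proposed additions, so the termination logic can do its own
    // precondition checks and error handling:
    boolean checkpointingIsConfigured();

    boolean savepointDirectoryIsConfigured();
}
```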
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -88,6 +119,177 @@ public void testGlobalFailoverCanRecoverState() throws Exception {
env.execute();
}
+ private enum StopWithSavepointTestBehavior {
+ NO_FAILURE,
+ FAIL_ON_CHECKPOINT,
+ FAIL_ON_STOP,
+ FAIL_ON_FIRST_CHECKPOINT_ONLY
+ }
+
+ @Test
+ public void testStopWithSavepointNoError() throws Exception {
+ StreamExecutionEnvironment env = getEnvWithSource(StopWithSavepointTestBehavior.NO_FAILURE);
+
+ DummySource.resetForParallelism(PARALLELISM);
+
+ JobClient client = env.executeAsync();
+
+ DummySource.awaitRunning();
+
+ final File savepointDirectory = tempFolder.newFolder("savepoint");
+ final String savepoint =
+ client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
+ assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
+ assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED));
+ }
+
+ @Test
+ public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+ StreamExecutionEnvironment env =
+ getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+ DummySource.resetForParallelism(PARALLELISM);
+
+ JobClient client = env.executeAsync();
+
+ DummySource.awaitRunning();
+ try {
+ client.stopWithSavepoint(false, tempFolder.newFolder("savepoint").getAbsolutePath())
+ .get();
+ fail("Expect exception");
+ } catch (ExecutionException e) {
+ assertThat(e.getMessage(), containsString("Failure while stopping with savepoint"));
+ }
+ // expect job to run again (maybe restart)
+ CommonTestUtils.waitUntilCondition(
+ () -> client.getJobStatus().get() == JobStatus.RUNNING,
+ Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+ }
+
+ @Test
+ public void testStopWithSavepointFailOnStop() throws Exception {
+ StreamExecutionEnvironment env =
+ getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_STOP);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+ DummySource.resetForParallelism(PARALLELISM);
+
+ JobClient client = env.executeAsync();
+
+ DummySource.awaitRunning();
+ try {
+ client.stopWithSavepoint(false, tempFolder.newFolder("savepoint").getAbsolutePath())
+ .get();
+ fail("Expect exception");
+ } catch (ExecutionException e) {
+ assertThat(e.getMessage(), containsString("Failure while stopping with savepoint"));
+ }
+ // expect job to run again (maybe restart)
+ CommonTestUtils.waitUntilCondition(
+ () -> client.getJobStatus().get() == JobStatus.RUNNING,
+ Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+ }
+
+ @Test
+ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Exception {
+ assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+
+ env.setParallelism(PARALLELISM);
+
+ env.addSource(new DummySource(StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY))
+ .addSink(new DiscardingSink<>());
+ DummySource.resetForParallelism(PARALLELISM);
+
+ JobClient client = env.executeAsync();
+
+ DummySource.awaitRunning();
+ DummySource.resetForParallelism(PARALLELISM);
+ final File savepointDirectory = tempFolder.newFolder("savepoint");
+ try {
+ client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
+ fail("Expect failure of operation");
+ } catch (ExecutionException e) {
+ assertThat(e.getMessage(), containsString("Failure while stopping with savepoint"));
+ }
+
+ // trigger second savepoint
+ DummySource.awaitRunning();
+ final String savepoint =
+ client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
+ assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
+ }
+
+ private static StreamExecutionEnvironment getEnvWithSource(
+ StopWithSavepointTestBehavior behavior) {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.addSource(new DummySource(behavior)).addSink(new DiscardingSink<>());
+ return env;
+ }
+
+ private static final class DummySource extends RichParallelSourceFunction<Integer>
Review comment:
Could we make this a generic test class that could be, for instance,
also utilized in `SavepointITCase`?
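Something along these lines, maybe. This is purely a sketch (the class name and
failure hooks are made up); it only illustrates how a behavior-parameterized
source could be shared with `SavepointITCase`:
```java
// Hedged sketch of a reusable test source; not the actual DummySource.
private static class BehaviorDrivenSource extends RichParallelSourceFunction<Integer>
        implements CheckpointedFunction {

    private final StopWithSavepointTestBehavior behavior;
    private volatile boolean running = true;

    BehaviorDrivenSource(StopWithSavepointTestBehavior behavior) {
        this.behavior = behavior;
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(0);
            }
            Thread.sleep(10L);
        }
        if (behavior == StopWithSavepointTestBehavior.FAIL_ON_STOP) {
            throw new RuntimeException("Failing the stop, as requested by the test behavior.");
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (behavior == StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT) {
            throw new RuntimeException("Failing the snapshot, as requested by the test behavior.");
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {}
}
```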
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -136,7 +136,7 @@ public ArchivedExecutionGraph getJob() {
@Override
public void suspend(Throwable cause) {
executionGraph.suspend(cause);
- Preconditions.checkState(executionGraph.getState() == JobStatus.SUSPENDED);
+ // Preconditions.checkState(executionGraph.getState() == JobStatus.SUSPENDED);
Review comment:
Is this commented out due to the WIP efforts, or did we forget to put it back
in?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationManager.java
##########
@@ -51,10 +51,10 @@ public StopWithSavepointTerminationManager(
* operations run on.
* @return A {@code CompletableFuture} containing the path to the created savepoint.
*/
- public CompletableFuture<String> stopWithSavepoint(
+ public CompletableFuture<String> trackStopWithSavepoint(
CompletableFuture<CompletedCheckpoint> completedSavepointFuture,
CompletableFuture<Collection<ExecutionState>> terminatedExecutionStatesFuture,
- ComponentMainThreadExecutor mainThreadExecutor) {
+ Executor mainThreadExecutor) {
Review comment:
Good point. Thanks for cleaning that up! 👍
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * When a "stop with savepoint" operation (wait until savepoint has been
created, then cancel job)
+ * is triggered on the {@link Executing} state, we transition into this state.
This state is
+ * delegating the tracking of the stop with savepoint operation to the {@link
+ * StopWithSavepointTerminationManager}, which is shared with {@link
SchedulerBase}. What remains
+ * for this state is reacting to signals from the termination manager, and
tracking the
+ * "operationCompletionFuture" (notify the user if we got cancelled, suspended
etc. in the meantime)
+ */
+public class StopWithSavepoint extends StateWithExecutionGraph
Review comment:
I feel like we could even make `StopWithSavepoint` extend `Executing`, as it's
a special case of it. This way, we wouldn't need to duplicate certain methods
like `getJobStatus` or the failure handling, as they're the same as far as I
can see.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
##########
@@ -884,7 +915,7 @@ private void stopCheckpointServicesSafely(JobStatus terminalState) {
}
@Override
- public Executor getMainThreadExecutor() {
+ public ComponentMainThreadExecutor getMainThreadExecutor() {
Review comment:
This change no longer seems necessary after adapting the
`StopWithSavepointTerminationManager.trackStopWithSavepoint` method signature.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StopWithSavepointOperations;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * When a "stop with savepoint" operation (wait until savepoint has been
created, then cancel job)
+ * is triggered on the {@link Executing} state, we transition into this state.
This state is
+ * delegating the tracking of the stop with savepoint operation to the {@link
+ * StopWithSavepointTerminationManager}, which is shared with {@link
SchedulerBase}. What remains
+ * for this state is reacting to signals from the termination manager, and
tracking the
+ * "operationCompletionFuture" (notify the user if we got cancelled, suspended
etc. in the meantime)
+ */
+public class StopWithSavepoint extends StateWithExecutionGraph
+ implements StopWithSavepointOperations {
+
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final Context context;
+ private final ClassLoader userCodeClassLoader;
+
+ StopWithSavepoint(
+ Context context,
+ ExecutionGraph executionGraph,
+ ExecutionGraphHandler executionGraphHandler,
+ OperatorCoordinatorHandler operatorCoordinatorHandler,
+ Logger logger,
+ ClassLoader userCodeClassLoader,
+ String targetDirectory,
+ boolean terminate) {
+ super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger);
+ this.context = context;
+ this.userCodeClassLoader = userCodeClassLoader;
+
+ // to ensure that all disjoint subgraphs of a job finish successfully on savepoint creation,
+ // we track the job termination via all execution termination futures (FLINK-21030).
+ final CompletableFuture<Collection<ExecutionState>> executionTerminationsFuture =
+ SchedulerUtils.getCombinedExecutionTerminationFuture(executionGraph);
+
+ // do not trigger checkpoints while creating the final savepoint
+ stopCheckpointScheduler();
+
+ // trigger savepoint. This operation will also terminate/suspend the job once the savepoint
+ // has been created.
+ final CompletableFuture<CompletedCheckpoint> savepointFuture =
+ triggerSynchronousSavepoint(terminate, targetDirectory);
+
+ final StopWithSavepointTerminationManager stopWithSavepointTerminationManager =
+ new StopWithSavepointTerminationManager(
+ new StopWithSavepointTerminationHandlerImpl(
+ executionGraph.getJobID(), context, this, logger));
+
+ this.operationCompletionFuture =
+ stopWithSavepointTerminationManager.trackStopWithSavepoint(
+ savepointFuture,
+ executionTerminationsFuture,
+ context.getMainThreadExecutor());
+ }
+
+ @Override
+ public void cancel() {
+ // the canceling state will cancel the execution graph, which will eventually lead to a call
+ // of handleAnyFailure(), completing the operationCompletionFuture.
+ context.goToCanceling(
+ getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler());
+ }
+
+ @Override
+ public JobStatus getJobStatus() {
+ return JobStatus.RUNNING;
+ }
+
+ @Override
+ public void handleGlobalFailure(Throwable cause) {
+ handleAnyFailure(cause);
+ }
+
+ @Override
+ boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) {
+ final boolean successfulUpdate =
+ getExecutionGraph().updateState(taskExecutionStateTransition);
+
+ if (successfulUpdate) {
+ if (taskExecutionStateTransition.getExecutionState() == ExecutionState.FAILED) {
+ Throwable cause = taskExecutionStateTransition.getError(userCodeClassLoader);
+ handleAnyFailure(cause);
+ }
+ }
+
+ return successfulUpdate;
+ }
+
+ @Override
+ void onGloballyTerminalState(JobStatus globallyTerminalState) {
+ context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
+ }
+
+ private void handleAnyFailure(Throwable cause) {
+ operationCompletionFuture.completeExceptionally(
+ new FlinkException("Failure while stopping with savepoint",
cause));
+ final Executing.FailureResult failureResult =
context.howToHandleFailure(cause);
+
+ if (failureResult.canRestart()) {
+ startCheckpointScheduler();
Review comment:
We don't have to explicitly enable Checkpoint scheduling here. This is
done by the `CheckpointCoordinatorDeActivator` after the `JobStatus` changes
from `RESTARTING` to `RUNNING` again.
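For reference, the mechanism behaves roughly like this (a minimal sketch of the
activator/deactivator pattern, not the actual `CheckpointCoordinatorDeActivator`
code):
```java
// Sketch: periodic checkpoint scheduling follows the job status, so a state
// does not have to re-start the scheduler manually after a restart.
class CheckpointSchedulingJobStatusListener {
    private final Runnable startScheduler;
    private final Runnable stopScheduler;

    CheckpointSchedulingJobStatusListener(Runnable startScheduler, Runnable stopScheduler) {
        this.startScheduler = startScheduler;
        this.stopScheduler = stopScheduler;
    }

    void jobStatusChanges(JobStatus newJobStatus) {
        if (newJobStatus == JobStatus.RUNNING) {
            // RESTARTING -> RUNNING (re-)activates the periodic scheduling
            startScheduler.run();
        } else {
            stopScheduler.run();
        }
    }
}
```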
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/SchedulerFailureHandler.java
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+/** Interface for schedulers to notify them about global failures. */
+public interface SchedulerFailureHandler {
Review comment:
I like that one. What about calling it `GlobalFailureHandler`? I don't see a
need to tie it to the scheduler through the name.
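i.e. something like this (the javadoc wording is mine; the method signature is
the one from this PR):
```java
/** Interface for notifying components about global failures. */
public interface GlobalFailureHandler {

    /** Handles a failure that is not tied to a specific task. */
    void handleGlobalFailure(Throwable cause);
}
```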
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -585,15 +585,22 @@ public void testStopWithSavepointFailingAfterSnapshotCreation() throws Exception
private static BiConsumer<JobID, ExecutionException> assertAfterSnapshotCreationFailure() {
return (jobId, actualException) -> {
- Optional<FlinkException> actualFlinkException =
- ExceptionUtils.findThrowable(actualException, FlinkException.class);
- assertTrue(actualFlinkException.isPresent());
- assertThat(
- actualFlinkException.get(),
- containsMessage(
- String.format(
- "Inconsistent execution state after
stopping with savepoint. At least one execution is still in one of the
following states: FAILED. A global fail-over is triggered to recover the job
%s.",
- jobId)));
+ if (isAdaptiveSchedulerEnabled(new Configuration())) {
Review comment:
I would expect the behavior to be the same regardless of the scheduler used.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -684,40 +692,54 @@ private static void testStopWithFailingSourceInOnePipeline(
exceptionAssertion.accept(jobGraph.getJobID(), e);
}
- // access the REST endpoint of the cluster to determine the state of each
- // ExecutionVertex
- final RestClient restClient =
- new RestClient(
- RestClientConfiguration.fromConfiguration(
- new UnmodifiableConfiguration(new Configuration())),
- TestingUtils.defaultExecutor());
-
- final URI restAddress = cluster.getRestAddres();
- final JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
- final JobMessageParameters params = detailsHeaders.getUnresolvedMessageParameters();
- params.jobPathParameter.resolve(jobGraph.getJobID());
-
- CommonTestUtils.waitUntilCondition(
- () -> {
- JobDetailsInfo detailsInfo =
- restClient
- .sendRequest(
- restAddress.getHost(),
- restAddress.getPort(),
- detailsHeaders,
- params,
- EmptyRequestBody.getInstance())
- .get();
-
- return detailsInfo.getJobVerticesPerState().get(ExecutionState.RUNNING)
- == 2;
- },
- Deadline.fromNow(Duration.ofSeconds(10)));
+ waitUntilAllTasksAreRunning(cluster.getRestAddres(), jobGraph.getJobID());
} finally {
cluster.after();
}
}
+ public static void waitUntilAllTasksAreRunning(URI restAddress, JobID jobId) throws Exception {
Review comment:
Good point. That improves the readability. 👍
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -88,6 +119,177 @@ public void testGlobalFailoverCanRecoverState() throws Exception {
env.execute();
}
+ private enum StopWithSavepointTestBehavior {
+ NO_FAILURE,
+ FAIL_ON_CHECKPOINT,
+ FAIL_ON_STOP,
+ FAIL_ON_FIRST_CHECKPOINT_ONLY
+ }
+
+ @Test
+ public void testStopWithSavepointNoError() throws Exception {
+ StreamExecutionEnvironment env = getEnvWithSource(StopWithSavepointTestBehavior.NO_FAILURE);
+
+ DummySource.resetForParallelism(PARALLELISM);
+
+ JobClient client = env.executeAsync();
+
+ DummySource.awaitRunning();
+
+ final File savepointDirectory = tempFolder.newFolder("savepoint");
+ final String savepoint =
+ client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
+ assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
+ assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED));
+ }
+
+ @Test
+ public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+ StreamExecutionEnvironment env =
+ getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+ DummySource.resetForParallelism(PARALLELISM);
+
+ JobClient client = env.executeAsync();
+
+ DummySource.awaitRunning();
+ try {
+ client.stopWithSavepoint(false, tempFolder.newFolder("savepoint").getAbsolutePath())
+ .get();
+ fail("Expect exception");
+ } catch (ExecutionException e) {
+ assertThat(e.getMessage(), containsString("Failure while stopping
with savepoint"));
Review comment:
nit: `FlinkMatchers.containsMessage` might work as well
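i.e. (if I remember correctly, it also matches against the cause chain instead
of only the top-level message):
```java
assertThat(e, FlinkMatchers.containsMessage("Failure while stopping with savepoint"));
```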
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -36,31 +41,48 @@
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
+import java.io.File;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
/** Integration tests for the adaptive scheduler. */
public class AdaptiveSchedulerITCase extends TestLogger {
+ @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
private static final int NUMBER_TASK_MANAGERS = 2;
private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
private static final int PARALLELISM = NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS;
private static final Configuration configuration = getConfiguration();
private static Configuration getConfiguration() {
- final Configuration configuration = new Configuration();
+ final Configuration conf = new Configuration();
Review comment:
I'm not sure whether the renaming is actually necessary here. I would
propose reverting it for the sake of the git history.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -72,10 +94,19 @@ private static Configuration getConfiguration() {
.setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
.build());
+ @Before
+ public void ensureDeclarativeResourceManagement() {
+ assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
Review comment:
```suggestion
assumeTrue(ClusterOptions.isAdaptiveSchedulerEnabled(configuration));
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -143,6 +147,45 @@ public void notifyNewResourcesAvailable() {
}
}
+ CompletableFuture<String> stopWithSavepoint(
Review comment:
Looking at the code once more, I realize that my proposed change would destroy
the separation that is implemented right now: the error handling runs in the
`Executing` state whereas the actual stop-with-savepoint routines are called in
the `StopWithSavepoint` state. This separation makes sense. Hence, I would vote
against my initial proposal.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
##########
@@ -143,6 +147,45 @@ public void notifyNewResourcesAvailable() {
}
}
+ CompletableFuture<String> stopWithSavepoint(
Review comment:
One way to overcome the code redundancy could be to move the error
handling into a static utility method in `StopWithSavepointTerminationManager`
that could be called by `SchedulerBase` and `Executing`.
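As a sketch (the method name, parameters, and messages are illustrative
assumptions, not an actual API):
```java
// Hedged sketch of the shared precondition/error handling; both
// SchedulerBase and Executing could call this before triggering the savepoint.
public static void checkStopWithSavepointPreconditions(
        boolean isCheckpointingEnabled,
        @Nullable String targetDirectory,
        @Nullable String defaultSavepointDirectory) {
    if (!isCheckpointingEnabled) {
        throw new IllegalStateException(
                "Stop with savepoint requires checkpointing to be enabled.");
    }
    if (targetDirectory == null && defaultSavepointDirectory == null) {
        throw new IllegalStateException(
                "No savepoint directory configured. Pass a target directory or "
                        + "configure a default savepoint directory.");
    }
}
```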
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]