tillrohrmann commented on a change in pull request #14948:
URL: https://github.com/apache/flink/pull/14948#discussion_r577607349
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
##########
@@ -731,6 +731,29 @@ public void goToFailing(
failureCause));
}
+ @Override
+ public void goToStopWithSavepoint(
+ ExecutionGraph executionGraph,
+ ExecutionGraphHandler executionGraphHandler,
+ OperatorCoordinatorHandler operatorCoordinatorHandler,
+ CheckpointCoordinator checkpointCoordinator,
+ String targetDirectory,
+ boolean advanceToEndOfEventTime,
+ CompletableFuture<String> operationCompletionFuture) {
+ transitionToState(
+ new StopWithSavepoint(
+ this,
+ executionGraph,
+ executionGraphHandler,
+ operatorCoordinatorHandler,
+ LOG,
+ userCodeClassLoader,
+ checkpointCoordinator,
Review comment:
What do we need the `checkpointCoordinator` for? Isn't it already
included in the `ExecutionGraph`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/Executing.java
##########
@@ -141,29 +100,54 @@ public void notifyNewResourcesAvailable() {
}
}
- /** Context of the {@link Executing} state. */
- interface Context extends StateWithExecutionGraph.Context {
+ CompletableFuture<String> stopWithSavepoint(
+ String targetDirectory, boolean advanceToEndOfEventTime) {
Review comment:
`@Nullable` seems to be missing on the `targetDirectory` parameter.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping
afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final boolean advanceToEndOfEventTime;
+ private final String targetDirectory;
+ private final CheckpointCoordinator checkpointCoordinator;
+ private final Context context;
+
+ StopWithSavepoint(
+ Context context,
+ ExecutionGraph executionGraph,
+ ExecutionGraphHandler executionGraphHandler,
+ OperatorCoordinatorHandler operatorCoordinatorHandler,
+ Logger logger,
+ ClassLoader userCodeClassLoader,
+ CheckpointCoordinator checkpointCoordinator,
+ String targetDirectory,
+ boolean advanceToEndOfEventTime,
+ CompletableFuture<String> operationCompletionFuture) {
+ super(
+ context,
+ executionGraph,
+ executionGraphHandler,
+ operatorCoordinatorHandler,
+ logger,
+ userCodeClassLoader);
+ this.context = context;
+ this.checkpointCoordinator = checkpointCoordinator;
+ this.targetDirectory = targetDirectory;
+ this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+ this.operationCompletionFuture = operationCompletionFuture;
+ }
+
+ @Override
+ public void onEnter() {
+ final ExecutionGraph executionGraph = getExecutionGraph();
+ // we stop the checkpoint coordinator so that we are guaranteed
+ // to have only the data of the synchronous savepoint committed.
+ // in case of failure, and if the job restarts, the coordinator
+ // will be restarted by the CheckpointCoordinatorDeActivator.
+ checkpointCoordinator.stopCheckpointScheduler();
+
+ CompletableFuture<String> savepointFuture =
+ checkpointCoordinator
+ .triggerSynchronousSavepoint(advanceToEndOfEventTime,
targetDirectory)
+ // handle errors while creating the savepoint
+ .handleAsync(
+ this::handleSavepointOperationResult,
+ context.getMainThreadExecutor())
+ // Wait for job to be finished
+ .thenCompose(
+ path ->
+ executionGraph
+ .getTerminationFuture()
+ .thenApply(
+ status ->
+ new
JobStatusAndSavepointPath(
+
status, path)))
+ // check that the job finished successfully
+ .handleAsync(
+ this::handleStopWithSavepointOperationResult,
+ context.getMainThreadExecutor());
+
+ FutureUtils.forward(savepointFuture, operationCompletionFuture);
+ }
+
+ private String handleSavepointOperationResult(
+ CompletedCheckpoint completedCheckpoint, Throwable throwable) {
+ if (throwable != null) {
+ context.runIfState(this, () -> handleAnyFailure(throwable));
+ throw new SavepointCreationException(
+ "Unable to stop with savepoint. Error while creating the
savepoint.",
+ throwable);
+ }
+ return completedCheckpoint.getExternalPointer();
+ }
+
+ private String handleStopWithSavepointOperationResult(
+ JobStatusAndSavepointPath statusAndPath, Throwable throwable) {
+ // filter out exception from first stage to show correct root cause
+ if (throwable != null) {
+ if (throwable instanceof SavepointCreationException) {
+ throw (SavepointCreationException) throwable;
+ } else {
+ throw new CompletionException(
+ "Error while stopping job after creating savepoint",
throwable);
+ }
+ }
+
+ if (statusAndPath.getStatus() == JobStatus.FINISHED) {
+ // ensure that all vertices from the execution graph are in state
FINISHED. There might
+ // be cases where only some subgraphs of the job are finished.
+ if (!areAllVerticesFinished()) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Printing vertex states:");
+ for (Execution e :
getExecutionGraph().getRegisteredExecutions().values()) {
+ getLogger().debug("{} = {}", e.getVertex(),
e.getState());
+ }
+ }
+ final FlinkException exception =
+ new FlinkException(
+ "Not all vertices of job are finished. Stop
with savepoint operation failed.");
+ context.runIfState(this, () -> handleAnyFailure(exception));
+ throw new CompletionException("Unable to stop with savepoint",
exception);
+ }
+ context.runIfState(
+ this,
+ () ->
+ context.goToFinished(
+
ArchivedExecutionGraph.createFrom(getExecutionGraph())));
+ return statusAndPath.getPath();
+ } else {
+ throw new CompletionException(
+ new FlinkException(
+ "Reached state "
+ + statusAndPath.getStatus()
+ + " instead of FINISHED "));
+ }
+ }
+
+ private boolean areAllVerticesFinished() {
+ return getExecutionGraph().getRegisteredExecutions().values().stream()
+ .noneMatch(e -> e.getState() != ExecutionState.FINISHED);
+ }
+
+ @Override
+ protected void handleAnyFailure(@Nonnull Throwable cause) {
+ Preconditions.checkState(context.isState(this), "Assuming
StopWithSavepoint state");
+ // restart the checkpoint coordinator if stopWithSavepoint failed.
+ startCheckpointScheduler(checkpointCoordinator);
Review comment:
We could say that we do this whenever we leave this state for any state
other than `Finished`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping
afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final boolean advanceToEndOfEventTime;
+ private final String targetDirectory;
+ private final CheckpointCoordinator checkpointCoordinator;
+ private final Context context;
+
+ StopWithSavepoint(
+ Context context,
+ ExecutionGraph executionGraph,
+ ExecutionGraphHandler executionGraphHandler,
+ OperatorCoordinatorHandler operatorCoordinatorHandler,
+ Logger logger,
+ ClassLoader userCodeClassLoader,
+ CheckpointCoordinator checkpointCoordinator,
+ String targetDirectory,
+ boolean advanceToEndOfEventTime,
+ CompletableFuture<String> operationCompletionFuture) {
+ super(
+ context,
+ executionGraph,
+ executionGraphHandler,
+ operatorCoordinatorHandler,
+ logger,
+ userCodeClassLoader);
+ this.context = context;
+ this.checkpointCoordinator = checkpointCoordinator;
+ this.targetDirectory = targetDirectory;
+ this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+ this.operationCompletionFuture = operationCompletionFuture;
+ }
+
+ @Override
+ public void onEnter() {
+ final ExecutionGraph executionGraph = getExecutionGraph();
+ // we stop the checkpoint coordinator so that we are guaranteed
+ // to have only the data of the synchronous savepoint committed.
+ // in case of failure, and if the job restarts, the coordinator
+ // will be restarted by the CheckpointCoordinatorDeActivator.
+ checkpointCoordinator.stopCheckpointScheduler();
+
+ CompletableFuture<String> savepointFuture =
+ checkpointCoordinator
+ .triggerSynchronousSavepoint(advanceToEndOfEventTime,
targetDirectory)
+ // handle errors while creating the savepoint
+ .handleAsync(
+ this::handleSavepointOperationResult,
+ context.getMainThreadExecutor())
+ // Wait for job to be finished
+ .thenCompose(
+ path ->
+ executionGraph
+ .getTerminationFuture()
+ .thenApply(
+ status ->
+ new
JobStatusAndSavepointPath(
+
status, path)))
+ // check that the job finished successfully
+ .handleAsync(
+ this::handleStopWithSavepointOperationResult,
+ context.getMainThreadExecutor());
+
+ FutureUtils.forward(savepointFuture, operationCompletionFuture);
Review comment:
Instead of using future callbacks, the `StopWithSavepoint` state could
have a small internal state machine which reacts to signals. The two signals we
need to handle are the terminal job status and the savepoint status. The former
is already available via `onGloballyTerminalState`. The latter could be
implemented by having an `onSavepointSuccess(String path)` and an
`onSavepointFailure(Throwable cause)` method. That way we wouldn't have to deal
with nested future callbacks. Maybe we could even reuse the improvements from
FLINK-21030.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
##########
@@ -88,6 +103,155 @@ public void testGlobalFailoverCanRecoverState() throws
Exception {
env.execute();
}
+ private enum StopWithSavepointTestBehavior {
+ NO_FAILURE,
+ FAIL_ON_CHECKPOINT,
+ FAIL_ON_STOP,
+ FAIL_ON_FIRST_CHECKPOINT_ONLY
+ }
+
+ @Test
+ public void testStopWithSavepointNoError() throws Exception {
+ testStopWithSavepoint(StopWithSavepointTestBehavior.NO_FAILURE);
+ }
+
+ /** Expected behavior is that the job fails. */
+ @Test
+ public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+ try {
+
testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+ fail("Expect exception");
+ } catch (ExecutionException e) {
+ assertThat(e.getMessage(), containsString("CheckpointException"));
+ }
+ }
+
+ @Test
+ public void testStopWithSavepointFailOnStop() throws Exception {
+ try {
+ testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_STOP);
+ fail("Expect exception");
+ } catch (ExecutionException e) {
+ assertThat(e.getMessage(), containsString("Reached state FAILED
instead of FINISHED"));
+ }
+ }
+
+ @Test
+ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond()
throws Exception {
+
assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+
+ env.setParallelism(PARALLELISM);
+
+ env.addSource(new
DummySource(StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY))
+ .addSink(new DiscardingSink<>());
+ DummySource.resetForParallelism(PARALLELISM);
+
+ JobClient client = env.executeAsync();
+
+ DummySource.awaitRunning();
+ DummySource.resetForParallelism(PARALLELISM);
+ final File savepointDirectory = tempFolder.newFolder("savepoint");
+ try {
+ client.stopWithSavepoint(false,
savepointDirectory.getAbsolutePath()).get();
+ fail("Expect failure of operation");
+ } catch (ExecutionException e) {
+ assertThat(e.getMessage(), containsString("CheckpointException"));
+ }
+
+ // trigger second savepoint
+ DummySource.awaitRunning();
+ final String savepoint =
+ client.stopWithSavepoint(false,
savepointDirectory.getAbsolutePath()).get();
+ assertThat(savepoint,
containsString(savepointDirectory.getAbsolutePath()));
+ }
+
+ /** Tests the stop with savepoint operation */
+ private void testStopWithSavepoint(StopWithSavepointTestBehavior behavior)
throws Exception {
+
assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ if (behavior ==
StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY) {
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
Review comment:
Are we calling `testStopWithSavepoint` with
`FAIL_ON_FIRST_CHECKPOINT_ONLY`?
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
##########
@@ -88,6 +103,155 @@ public void testGlobalFailoverCanRecoverState() throws
Exception {
env.execute();
}
+ private enum StopWithSavepointTestBehavior {
+ NO_FAILURE,
+ FAIL_ON_CHECKPOINT,
+ FAIL_ON_STOP,
+ FAIL_ON_FIRST_CHECKPOINT_ONLY
+ }
+
+ @Test
+ public void testStopWithSavepointNoError() throws Exception {
+ testStopWithSavepoint(StopWithSavepointTestBehavior.NO_FAILURE);
+ }
+
+ /** Expected behavior is that the job fails. */
+ @Test
+ public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+ try {
+
testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+ fail("Expect exception");
+ } catch (ExecutionException e) {
+ assertThat(e.getMessage(), containsString("CheckpointException"));
+ }
+ }
+
+ @Test
+ public void testStopWithSavepointFailOnStop() throws Exception {
+ try {
+ testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_STOP);
+ fail("Expect exception");
+ } catch (ExecutionException e) {
+ assertThat(e.getMessage(), containsString("Reached state FAILED
instead of FINISHED"));
+ }
+ }
+
+ @Test
+ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond()
throws Exception {
+
assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+
+ env.setParallelism(PARALLELISM);
+
+ env.addSource(new
DummySource(StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY))
+ .addSink(new DiscardingSink<>());
+ DummySource.resetForParallelism(PARALLELISM);
+
+ JobClient client = env.executeAsync();
+
+ DummySource.awaitRunning();
+ DummySource.resetForParallelism(PARALLELISM);
+ final File savepointDirectory = tempFolder.newFolder("savepoint");
+ try {
+ client.stopWithSavepoint(false,
savepointDirectory.getAbsolutePath()).get();
+ fail("Expect failure of operation");
+ } catch (ExecutionException e) {
+ assertThat(e.getMessage(), containsString("CheckpointException"));
+ }
+
+ // trigger second savepoint
+ DummySource.awaitRunning();
+ final String savepoint =
+ client.stopWithSavepoint(false,
savepointDirectory.getAbsolutePath()).get();
+ assertThat(savepoint,
containsString(savepointDirectory.getAbsolutePath()));
+ }
+
+ /** Tests the stop with savepoint operation */
+ private void testStopWithSavepoint(StopWithSavepointTestBehavior behavior)
throws Exception {
+
assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ if (behavior ==
StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY) {
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+ } else {
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ }
+ env.setParallelism(PARALLELISM);
+
+ env.addSource(new DummySource(behavior)).addSink(new
DiscardingSink<>());
+ DummySource.resetForParallelism(PARALLELISM);
+
+ JobClient client = env.executeAsync();
+
+ DummySource.awaitRunning();
+ final File savepointDirectory = tempFolder.newFolder("savepoint");
+ final String savepoint =
+ client.stopWithSavepoint(false,
savepointDirectory.getAbsolutePath()).get();
+ assertThat(savepoint,
containsString(savepointDirectory.getAbsolutePath()));
+ assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED));
+ }
+
+ private static final class DummySource extends
RichParallelSourceFunction<Integer>
+ implements CheckpointedFunction, CheckpointListener {
+ private final StopWithSavepointTestBehavior behavior;
+ private volatile boolean running = true;
+ private static CountDownLatch instancesRunning;
+ private volatile boolean checkpointComplete = false;
+
+ public DummySource(StopWithSavepointTestBehavior behavior) {
+ this.behavior = behavior;
+ }
+
+ private static void resetForParallelism(int para) {
+ instancesRunning = new CountDownLatch(para);
+ }
+
+ private static void awaitRunning() throws InterruptedException {
+ Preconditions.checkNotNull(instancesRunning);
+ instancesRunning.await();
+ }
+
+ @Override
+ public void run(SourceContext<Integer> ctx) throws Exception {
+ Preconditions.checkNotNull(instancesRunning);
+ instancesRunning.countDown();
+ int i = Integer.MIN_VALUE;
+ while (running) {
+ Thread.sleep(10L);
+ ctx.collect(i++);
+ }
Review comment:
I think this should be executed under the checkpoint lock.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping
afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final boolean advanceToEndOfEventTime;
+ private final String targetDirectory;
+ private final CheckpointCoordinator checkpointCoordinator;
+ private final Context context;
+
+ StopWithSavepoint(
+ Context context,
+ ExecutionGraph executionGraph,
+ ExecutionGraphHandler executionGraphHandler,
+ OperatorCoordinatorHandler operatorCoordinatorHandler,
+ Logger logger,
+ ClassLoader userCodeClassLoader,
+ CheckpointCoordinator checkpointCoordinator,
Review comment:
E.g. we could have methods for stopping and starting the
`CheckpointCoordinator` and for creating a savepoint.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping
afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
Review comment:
I am not sure whether introducing `ExecutingStateWithFailureHandler` is
really helpful here. I think it causes us to overlook how to handle certain
signals. For example, when `cancel` is called, then we should fail the
savepoint operation and complete `operationCompletionFuture` exceptionally. The
same actually applies to `suspend`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/Executing.java
##########
@@ -141,29 +100,54 @@ public void notifyNewResourcesAvailable() {
}
}
- /** Context of the {@link Executing} state. */
- interface Context extends StateWithExecutionGraph.Context {
+ CompletableFuture<String> stopWithSavepoint(
+ String targetDirectory, boolean advanceToEndOfEventTime) {
+ final ExecutionGraph executionGraph = getExecutionGraph();
+ final CheckpointCoordinator checkpointCoordinator =
+ executionGraph.getCheckpointCoordinator();
+
+ // check if stop with savepoint is possible:
+ if (checkpointCoordinator == null) {
+ return FutureUtils.completedExceptionally(
+ new IllegalStateException(
+ String.format(
+ "Job %s is not a streaming job.",
executionGraph.getJobID())));
+ }
- /**
- * Transitions into the {@link Canceling} state.
- *
- * @param executionGraph executionGraph to pass to the {@link
Canceling} state
- * @param executionGraphHandler executionGraphHandler to pass to the
{@link Canceling} state
- * @param operatorCoordinatorHandler operatorCoordinatorHandler to
pass to the {@link
- * Canceling} state
- */
- void goToCanceling(
- ExecutionGraph executionGraph,
- ExecutionGraphHandler executionGraphHandler,
- OperatorCoordinatorHandler operatorCoordinatorHandler);
+ if (targetDirectory == null
+ &&
!checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
+ getLogger()
+ .info(
+ "Trying to cancel job {} with savepoint, but no
savepoint directory configured.",
+ executionGraph.getJobID());
+
+ return FutureUtils.completedExceptionally(
+ new IllegalStateException(
+ "No savepoint directory configured. You can either
specify a directory "
+ + "while cancelling via -s
:targetDirectory or configure a cluster-wide "
+ + "default via key '"
+ +
CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
+ + "'."));
+ }
- /**
- * Asks how to handle the failure.
- *
- * @param failure failure describing the failure cause
- * @return {@link FailureResult} which describes how to handle the
failure
- */
- FailureResult howToHandleFailure(Throwable failure);
+ getLogger().info("Triggering stop-with-savepoint for job {}.",
executionGraph.getJobID());
+
+ CompletableFuture<String> operationCompletionFuture = new
CompletableFuture<>();
Review comment:
Maybe instead of creating this future, we could define
`goToStopWithSavepoint()` to return this future. That way we would not have to
pass the future into the `StopWithSavepoint` state but could make it directly
part of it.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping
afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final boolean advanceToEndOfEventTime;
+ private final String targetDirectory;
+ private final CheckpointCoordinator checkpointCoordinator;
+ private final Context context;
+
+ StopWithSavepoint(
+ Context context,
+ ExecutionGraph executionGraph,
+ ExecutionGraphHandler executionGraphHandler,
+ OperatorCoordinatorHandler operatorCoordinatorHandler,
+ Logger logger,
+ ClassLoader userCodeClassLoader,
+ CheckpointCoordinator checkpointCoordinator,
+ String targetDirectory,
+ boolean advanceToEndOfEventTime,
+ CompletableFuture<String> operationCompletionFuture) {
+ super(
+ context,
+ executionGraph,
+ executionGraphHandler,
+ operatorCoordinatorHandler,
+ logger,
+ userCodeClassLoader);
+ this.context = context;
+ this.checkpointCoordinator = checkpointCoordinator;
+ this.targetDirectory = targetDirectory;
+ this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+ this.operationCompletionFuture = operationCompletionFuture;
+ }
+
+ @Override
+ public void onEnter() {
+ final ExecutionGraph executionGraph = getExecutionGraph();
+ // we stop the checkpoint coordinator so that we are guaranteed
+ // to have only the data of the synchronous savepoint committed.
+ // in case of failure, and if the job restarts, the coordinator
+ // will be restarted by the CheckpointCoordinatorDeActivator.
Review comment:
Let's not try to rely on things which are far away from this class. It
is much easier to maintain something if things are close to each other.
Concretely, the `StopWithSavepoint` stops the `CheckpointCoordinator` and hence
it should be responsible for activating it again. If we don't do this, then
changes to the `CheckpointCoordinatorDeActivator` will surprisingly also affect
this class.
Having to write an in-code comment explaining this contract should be a good
indicator for such a problem.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
+import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
+import static
org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link StopWithSavepoint} state. */
+public class StopWithSavepointTest extends TestLogger {
+
+ /** Test failure during savepoint creation: go to failing */
+ @Test
+ public void testFailureDuringSavepointCreationWithNoRestart() throws
Exception {
+ MockExecutingStateWithFailureHandlerContext ctx =
+ new MockExecutingStateWithFailureHandlerContext();
+
+ StopWithSavepointEnvironment stopWithSavepointEnv =
createStopWithSavepointEnvironment(ctx);
+ StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState();
+ stopWithSavepoint.onEnter();
+
+
ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart);
+ // fail future returned by the CheckpointCoordinator
+ stopWithSavepointEnv
+ .getSavepointFuture()
+ .completeExceptionally(new RuntimeException("Savepoint
creation failed"));
+
+
stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED);
Review comment:
Why is this necessary? A savepoint failure should not require the
`ExecutionGraph` (EG) to reach a globally terminal state before we can recover.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping
afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final boolean advanceToEndOfEventTime;
+ private final String targetDirectory;
+ private final CheckpointCoordinator checkpointCoordinator;
+ private final Context context;
+
+ StopWithSavepoint(
+ Context context,
+ ExecutionGraph executionGraph,
+ ExecutionGraphHandler executionGraphHandler,
+ OperatorCoordinatorHandler operatorCoordinatorHandler,
+ Logger logger,
+ ClassLoader userCodeClassLoader,
+ CheckpointCoordinator checkpointCoordinator,
+ String targetDirectory,
+ boolean advanceToEndOfEventTime,
+ CompletableFuture<String> operationCompletionFuture) {
+ super(
+ context,
+ executionGraph,
+ executionGraphHandler,
+ operatorCoordinatorHandler,
+ logger,
+ userCodeClassLoader);
+ this.context = context;
+ this.checkpointCoordinator = checkpointCoordinator;
+ this.targetDirectory = targetDirectory;
+ this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+ this.operationCompletionFuture = operationCompletionFuture;
+ }
+
+ @Override
+ public void onEnter() {
+ final ExecutionGraph executionGraph = getExecutionGraph();
+ // we stop the checkpoint coordinator so that we are guaranteed
+ // to have only the data of the synchronous savepoint committed.
+ // in case of failure, and if the job restarts, the coordinator
+ // will be restarted by the CheckpointCoordinatorDeActivator.
+ checkpointCoordinator.stopCheckpointScheduler();
+
+ CompletableFuture<String> savepointFuture =
+ checkpointCoordinator
+ .triggerSynchronousSavepoint(advanceToEndOfEventTime,
targetDirectory)
+ // handle errors while creating the savepoint
+ .handleAsync(
+ this::handleSavepointOperationResult,
+ context.getMainThreadExecutor())
+ // Wait for job to be finished
+ .thenCompose(
+ path ->
+ executionGraph
+ .getTerminationFuture()
+ .thenApply(
+ status ->
+ new
JobStatusAndSavepointPath(
+
status, path)))
+ // check that the job finished successfully
+ .handleAsync(
+ this::handleStopWithSavepointOperationResult,
+ context.getMainThreadExecutor());
+
+ FutureUtils.forward(savepointFuture, operationCompletionFuture);
+ }
+
+ private String handleSavepointOperationResult(
+ CompletedCheckpoint completedCheckpoint, Throwable throwable) {
+ if (throwable != null) {
+ context.runIfState(this, () -> handleAnyFailure(throwable));
+ throw new SavepointCreationException(
+ "Unable to stop with savepoint. Error while creating the
savepoint.",
+ throwable);
+ }
+ return completedCheckpoint.getExternalPointer();
+ }
+
+ private String handleStopWithSavepointOperationResult(
+ JobStatusAndSavepointPath statusAndPath, Throwable throwable) {
+ // filter out exception from first stage to show correct root cause
+ if (throwable != null) {
+ if (throwable instanceof SavepointCreationException) {
+ throw (SavepointCreationException) throwable;
+ } else {
+ throw new CompletionException(
+ "Error while stopping job after creating savepoint",
throwable);
+ }
+ }
+
+ if (statusAndPath.getStatus() == JobStatus.FINISHED) {
+ // ensure that all vertices from the execution graph are in state
FINISHED. There might
+ // be cases where only some subgraphs of the job are finished.
+ if (!areAllVerticesFinished()) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Printing vertex states:");
+ for (Execution e :
getExecutionGraph().getRegisteredExecutions().values()) {
+ getLogger().debug("{} = {}", e.getVertex(),
e.getState());
+ }
+ }
+ final FlinkException exception =
+ new FlinkException(
+ "Not all vertices of job are finished. Stop
with savepoint operation failed.");
+ context.runIfState(this, () -> handleAnyFailure(exception));
+ throw new CompletionException("Unable to stop with savepoint",
exception);
+ }
+ context.runIfState(
+ this,
+ () ->
+ context.goToFinished(
+
ArchivedExecutionGraph.createFrom(getExecutionGraph())));
+ return statusAndPath.getPath();
+ } else {
+ throw new CompletionException(
+ new FlinkException(
+ "Reached state "
+ + statusAndPath.getStatus()
+ + " instead of FINISHED "));
+ }
+ }
+
+ private boolean areAllVerticesFinished() {
+ return getExecutionGraph().getRegisteredExecutions().values().stream()
+ .noneMatch(e -> e.getState() != ExecutionState.FINISHED);
+ }
+
+ @Override
+ protected void handleAnyFailure(@Nonnull Throwable cause) {
+ Preconditions.checkState(context.isState(this), "Assuming
StopWithSavepoint state");
+ // restart the checkpoint coordinator if stopWithSavepoint failed.
+ startCheckpointScheduler(checkpointCoordinator);
+
+ if (getLogger().isDebugEnabled()) {
+ getLogger()
+ .debug(
+ "Restarting or Failing job because of failure
while stopping with savepoint.",
+ cause);
+ } else {
+ getLogger()
+ .info(
+ "Restarting or Failing job because of failure
while stopping with savepoint. Reason: {}",
+ cause.getMessage());
+ }
+ super.handleAnyFailure(cause);
+ }
+
+ @Override
+ void onGloballyTerminalState(JobStatus globallyTerminalState) {
+ // handled already
Review comment:
This should be an indicator that we now have different control paths for
one and the same thing. This is usually not ideal and makes maintenance harder.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping
afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final boolean advanceToEndOfEventTime;
+ private final String targetDirectory;
+ private final CheckpointCoordinator checkpointCoordinator;
+ private final Context context;
+
+ StopWithSavepoint(
+ Context context,
+ ExecutionGraph executionGraph,
+ ExecutionGraphHandler executionGraphHandler,
+ OperatorCoordinatorHandler operatorCoordinatorHandler,
+ Logger logger,
+ ClassLoader userCodeClassLoader,
+ CheckpointCoordinator checkpointCoordinator,
+ String targetDirectory,
+ boolean advanceToEndOfEventTime,
+ CompletableFuture<String> operationCompletionFuture) {
+ super(
+ context,
+ executionGraph,
+ executionGraphHandler,
+ operatorCoordinatorHandler,
+ logger,
+ userCodeClassLoader);
+ this.context = context;
+ this.checkpointCoordinator = checkpointCoordinator;
+ this.targetDirectory = targetDirectory;
+ this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+ this.operationCompletionFuture = operationCompletionFuture;
+ }
+
+ @Override
+ public void onEnter() {
+ final ExecutionGraph executionGraph = getExecutionGraph();
+ // we stop the checkpoint coordinator so that we are guaranteed
+ // to have only the data of the synchronous savepoint committed.
+ // in case of failure, and if the job restarts, the coordinator
+ // will be restarted by the CheckpointCoordinatorDeActivator.
+ checkpointCoordinator.stopCheckpointScheduler();
+
+ CompletableFuture<String> savepointFuture =
+ checkpointCoordinator
+ .triggerSynchronousSavepoint(advanceToEndOfEventTime,
targetDirectory)
+ // handle errors while creating the savepoint
+ .handleAsync(
+ this::handleSavepointOperationResult,
+ context.getMainThreadExecutor())
+ // Wait for job to be finished
+ .thenCompose(
+ path ->
+ executionGraph
+ .getTerminationFuture()
+ .thenApply(
+ status ->
+ new
JobStatusAndSavepointPath(
+
status, path)))
+ // check that the job finished successfully
+ .handleAsync(
+ this::handleStopWithSavepointOperationResult,
+ context.getMainThreadExecutor());
+
+ FutureUtils.forward(savepointFuture, operationCompletionFuture);
+ }
+
+ private String handleSavepointOperationResult(
+ CompletedCheckpoint completedCheckpoint, Throwable throwable) {
+ if (throwable != null) {
+ context.runIfState(this, () -> handleAnyFailure(throwable));
+ throw new SavepointCreationException(
+ "Unable to stop with savepoint. Error while creating the
savepoint.",
+ throwable);
+ }
+ return completedCheckpoint.getExternalPointer();
+ }
+
+ private String handleStopWithSavepointOperationResult(
+ JobStatusAndSavepointPath statusAndPath, Throwable throwable) {
+ // filter out exception from first stage to show correct root cause
+ if (throwable != null) {
+ if (throwable instanceof SavepointCreationException) {
+ throw (SavepointCreationException) throwable;
+ } else {
+ throw new CompletionException(
+ "Error while stopping job after creating savepoint",
throwable);
+ }
+ }
+
+ if (statusAndPath.getStatus() == JobStatus.FINISHED) {
+ // ensure that all vertices from the execution graph are in state
FINISHED. There might
+ // be cases where only some subgraphs of the job are finished.
+ if (!areAllVerticesFinished()) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Printing vertex states:");
+ for (Execution e :
getExecutionGraph().getRegisteredExecutions().values()) {
+ getLogger().debug("{} = {}", e.getVertex(),
e.getState());
+ }
+ }
+ final FlinkException exception =
+ new FlinkException(
+ "Not all vertices of job are finished. Stop
with savepoint operation failed.");
+ context.runIfState(this, () -> handleAnyFailure(exception));
+ throw new CompletionException("Unable to stop with savepoint",
exception);
+ }
+ context.runIfState(
+ this,
+ () ->
+ context.goToFinished(
+
ArchivedExecutionGraph.createFrom(getExecutionGraph())));
+ return statusAndPath.getPath();
+ } else {
+ throw new CompletionException(
+ new FlinkException(
+ "Reached state "
+ + statusAndPath.getStatus()
+ + " instead of FINISHED "));
+ }
+ }
+
+ private boolean areAllVerticesFinished() {
+ return getExecutionGraph().getRegisteredExecutions().values().stream()
+ .noneMatch(e -> e.getState() != ExecutionState.FINISHED);
+ }
+
+ @Override
+ protected void handleAnyFailure(@Nonnull Throwable cause) {
+ Preconditions.checkState(context.isState(this), "Assuming
StopWithSavepoint state");
+ // restart the checkpoint coordinator if stopWithSavepoint failed.
+ startCheckpointScheduler(checkpointCoordinator);
+
+ if (getLogger().isDebugEnabled()) {
+ getLogger()
+ .debug(
+ "Restarting or Failing job because of failure
while stopping with savepoint.",
+ cause);
+ } else {
+ getLogger()
+ .info(
+ "Restarting or Failing job because of failure
while stopping with savepoint. Reason: {}",
+ cause.getMessage());
+ }
+ super.handleAnyFailure(cause);
Review comment:
This is a bit of personal taste but I find it hard to see what happens
in case of a failure in the state `StopWithSavepoint`. If we didn't use the
`ExecutingStateWithFailureHandler` and instead had an explicit
`context.goToRestarting`, then it would be a bit clearer. It currently works
because all failures are handled as global failovers. But once this changes,
this class will break because we want to fail with global failovers explicitly.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping
afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final boolean advanceToEndOfEventTime;
+ private final String targetDirectory;
+ private final CheckpointCoordinator checkpointCoordinator;
+ private final Context context;
+
+ StopWithSavepoint(
+ Context context,
+ ExecutionGraph executionGraph,
+ ExecutionGraphHandler executionGraphHandler,
+ OperatorCoordinatorHandler operatorCoordinatorHandler,
+ Logger logger,
+ ClassLoader userCodeClassLoader,
+ CheckpointCoordinator checkpointCoordinator,
+ String targetDirectory,
+ boolean advanceToEndOfEventTime,
+ CompletableFuture<String> operationCompletionFuture) {
+ super(
+ context,
+ executionGraph,
+ executionGraphHandler,
+ operatorCoordinatorHandler,
+ logger,
+ userCodeClassLoader);
+ this.context = context;
+ this.checkpointCoordinator = checkpointCoordinator;
+ this.targetDirectory = targetDirectory;
+ this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+ this.operationCompletionFuture = operationCompletionFuture;
+ }
+
+ @Override
+ public void onEnter() {
+ final ExecutionGraph executionGraph = getExecutionGraph();
+ // we stop the checkpoint coordinator so that we are guaranteed
+ // to have only the data of the synchronous savepoint committed.
+ // in case of failure, and if the job restarts, the coordinator
+ // will be restarted by the CheckpointCoordinatorDeActivator.
+ checkpointCoordinator.stopCheckpointScheduler();
+
+ CompletableFuture<String> savepointFuture =
+ checkpointCoordinator
+ .triggerSynchronousSavepoint(advanceToEndOfEventTime,
targetDirectory)
+ // handle errors while creating the savepoint
+ .handleAsync(
+ this::handleSavepointOperationResult,
+ context.getMainThreadExecutor())
+ // Wait for job to be finished
+ .thenCompose(
+ path ->
+ executionGraph
+ .getTerminationFuture()
+ .thenApply(
+ status ->
+ new
JobStatusAndSavepointPath(
+
status, path)))
+ // check that the job finished successfully
+ .handleAsync(
+ this::handleStopWithSavepointOperationResult,
Review comment:
Why are we handling the final job result by accessing the
`getTerminationFuture`? Don't we already have `onGloballyTerminalState` which
tells us if the `ExecutionGraph` has reached a globally terminal state?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping
afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final boolean advanceToEndOfEventTime;
+ private final String targetDirectory;
+ private final CheckpointCoordinator checkpointCoordinator;
+ private final Context context;
+
+ StopWithSavepoint(
+ Context context,
+ ExecutionGraph executionGraph,
+ ExecutionGraphHandler executionGraphHandler,
+ OperatorCoordinatorHandler operatorCoordinatorHandler,
+ Logger logger,
+ ClassLoader userCodeClassLoader,
+ CheckpointCoordinator checkpointCoordinator,
+ String targetDirectory,
+ boolean advanceToEndOfEventTime,
+ CompletableFuture<String> operationCompletionFuture) {
+ super(
+ context,
+ executionGraph,
+ executionGraphHandler,
+ operatorCoordinatorHandler,
+ logger,
+ userCodeClassLoader);
+ this.context = context;
+ this.checkpointCoordinator = checkpointCoordinator;
+ this.targetDirectory = targetDirectory;
+ this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+ this.operationCompletionFuture = operationCompletionFuture;
+ }
+
+ @Override
+ public void onEnter() {
+ final ExecutionGraph executionGraph = getExecutionGraph();
+ // we stop the checkpoint coordinator so that we are guaranteed
+ // to have only the data of the synchronous savepoint committed.
+ // in case of failure, and if the job restarts, the coordinator
+ // will be restarted by the CheckpointCoordinatorDeActivator.
+ checkpointCoordinator.stopCheckpointScheduler();
+
+ CompletableFuture<String> savepointFuture =
+ checkpointCoordinator
+ .triggerSynchronousSavepoint(advanceToEndOfEventTime,
targetDirectory)
+ // handle errors while creating the savepoint
+ .handleAsync(
+ this::handleSavepointOperationResult,
+ context.getMainThreadExecutor())
+ // Wait for job to be finished
+ .thenCompose(
+ path ->
+ executionGraph
+ .getTerminationFuture()
+ .thenApply(
+ status ->
+ new
JobStatusAndSavepointPath(
+
status, path)))
+ // check that the job finished successfully
+ .handleAsync(
+ this::handleStopWithSavepointOperationResult,
+ context.getMainThreadExecutor());
+
+ FutureUtils.forward(savepointFuture, operationCompletionFuture);
+ }
+
+ private String handleSavepointOperationResult(
+ CompletedCheckpoint completedCheckpoint, Throwable throwable) {
+ if (throwable != null) {
+ context.runIfState(this, () -> handleAnyFailure(throwable));
+ throw new SavepointCreationException(
+ "Unable to stop with savepoint. Error while creating the
savepoint.",
+ throwable);
Review comment:
Handling the failure and sending the result to the user happen here in
two places. The latter is even obfuscated a bit by throwing an exception
that is forwarded to the result future in some other place. This is a rather
complicated control flow. Couldn't we do this in the failure handling logic?
Are we treating savepoint failures as global failures? It looks a bit like
it, and I think that is wrong. A savepoint failure should only fail the operation
and transition us back to the `Executing` state.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping
afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final boolean advanceToEndOfEventTime;
+ private final String targetDirectory;
+ private final CheckpointCoordinator checkpointCoordinator;
+ private final Context context;
+
+ StopWithSavepoint(
+ Context context,
+ ExecutionGraph executionGraph,
+ ExecutionGraphHandler executionGraphHandler,
+ OperatorCoordinatorHandler operatorCoordinatorHandler,
+ Logger logger,
+ ClassLoader userCodeClassLoader,
+ CheckpointCoordinator checkpointCoordinator,
+ String targetDirectory,
+ boolean advanceToEndOfEventTime,
+ CompletableFuture<String> operationCompletionFuture) {
+ super(
+ context,
+ executionGraph,
+ executionGraphHandler,
+ operatorCoordinatorHandler,
+ logger,
+ userCodeClassLoader);
+ this.context = context;
+ this.checkpointCoordinator = checkpointCoordinator;
+ this.targetDirectory = targetDirectory;
+ this.advanceToEndOfEventTime = advanceToEndOfEventTime;
+ this.operationCompletionFuture = operationCompletionFuture;
+ }
+
+ @Override
+ public void onEnter() {
+ final ExecutionGraph executionGraph = getExecutionGraph();
+ // we stop the checkpoint coordinator so that we are guaranteed
+ // to have only the data of the synchronous savepoint committed.
+ // in case of failure, and if the job restarts, the coordinator
+ // will be restarted by the CheckpointCoordinatorDeActivator.
+ checkpointCoordinator.stopCheckpointScheduler();
+
+ CompletableFuture<String> savepointFuture =
+ checkpointCoordinator
+ .triggerSynchronousSavepoint(advanceToEndOfEventTime,
targetDirectory)
+ // handle errors while creating the savepoint
+ .handleAsync(
+ this::handleSavepointOperationResult,
+ context.getMainThreadExecutor())
+ // Wait for job to be finished
+ .thenCompose(
+ path ->
+ executionGraph
+ .getTerminationFuture()
+ .thenApply(
+ status ->
+ new
JobStatusAndSavepointPath(
+
status, path)))
+ // check that the job finished successfully
+ .handleAsync(
+ this::handleStopWithSavepointOperationResult,
+ context.getMainThreadExecutor());
+
+ FutureUtils.forward(savepointFuture, operationCompletionFuture);
+ }
+
+ private String handleSavepointOperationResult(
+ CompletedCheckpoint completedCheckpoint, Throwable throwable) {
+ if (throwable != null) {
+ context.runIfState(this, () -> handleAnyFailure(throwable));
+ throw new SavepointCreationException(
+ "Unable to stop with savepoint. Error while creating the
savepoint.",
+ throwable);
+ }
+ return completedCheckpoint.getExternalPointer();
+ }
+
+ private String handleStopWithSavepointOperationResult(
+ JobStatusAndSavepointPath statusAndPath, Throwable throwable) {
+ // filter out exception from first stage to show correct root cause
+ if (throwable != null) {
+ if (throwable instanceof SavepointCreationException) {
+ throw (SavepointCreationException) throwable;
+ } else {
+ throw new CompletionException(
+ "Error while stopping job after creating savepoint",
throwable);
+ }
+ }
+
+ if (statusAndPath.getStatus() == JobStatus.FINISHED) {
+ // ensure that all vertices from the execution graph are in state
FINISHED. There might
+ // be cases where only some subgraphs of the job are finished.
+ if (!areAllVerticesFinished()) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Printing vertex states:");
+ for (Execution e :
getExecutionGraph().getRegisteredExecutions().values()) {
+ getLogger().debug("{} = {}", e.getVertex(),
e.getState());
+ }
+ }
+ final FlinkException exception =
+ new FlinkException(
+ "Not all vertices of job are finished. Stop
with savepoint operation failed.");
+ context.runIfState(this, () -> handleAnyFailure(exception));
+ throw new CompletionException("Unable to stop with savepoint",
exception);
Review comment:
Here again we have one path for the failure handling and another path
for communicating the result back to the user. I think these should be one and
the same path. If we leave this state for any state other than `Finished`, then
we should fail the operation.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
+import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
+import static
org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link StopWithSavepoint} state. */
+public class StopWithSavepointTest extends TestLogger {
+
+ /** Test failure during savepoint creation: go to failing */
+ @Test
+ public void testFailureDuringSavepointCreationWithNoRestart() throws
Exception {
+ MockExecutingStateWithFailureHandlerContext ctx =
+ new MockExecutingStateWithFailureHandlerContext();
+
+ StopWithSavepointEnvironment stopWithSavepointEnv =
createStopWithSavepointEnvironment(ctx);
+ StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState();
+ stopWithSavepoint.onEnter();
+
+
ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart);
+ // fail future returned by the CheckpointCoordinator
+ stopWithSavepointEnv
+ .getSavepointFuture()
+ .completeExceptionally(new RuntimeException("Savepoint
creation failed"));
+
+
stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED);
+
+ ctx.setExpectFailing(assertNonNull());
+
+ ctx.close(); // trigger outstanding executions
+
+ assertThat(
+
stopWithSavepointEnv.getOperationCompletionFuture().isCompletedExceptionally(),
+ is(true));
+ }
+
+ private static class StopWithSavepointEnvironment {
+ private final StopWithSavepoint state;
+ private final CompletableFuture<JobStatus>
executionGraphTerminationFuture;
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final CompletableFuture<CompletedCheckpoint> savepointFuture;
+
+ StopWithSavepointEnvironment(
+ StopWithSavepoint state,
+ CompletableFuture<JobStatus> executionGraphTerminationFuture,
+ CompletableFuture<String> operationCompletionFuture,
+ CompletableFuture<CompletedCheckpoint> savepointFuture) {
+ this.state = state;
+ this.executionGraphTerminationFuture =
executionGraphTerminationFuture;
+ this.operationCompletionFuture = operationCompletionFuture;
+ this.savepointFuture = savepointFuture;
+ }
+
+ public StopWithSavepoint getState() {
+ return state;
+ }
+
+ public CompletableFuture<JobStatus>
getExecutionGraphTerminationFuture() {
+ return executionGraphTerminationFuture;
+ }
+
+ public CompletableFuture<String> getOperationCompletionFuture() {
+ return operationCompletionFuture;
+ }
+
+ public CompletableFuture<CompletedCheckpoint> getSavepointFuture() {
+ return savepointFuture;
+ }
+ }
+
+ private StopWithSavepointEnvironment createStopWithSavepointEnvironment(
+ MockExecutingStateWithFailureHandlerContext ctx) throws
IOException {
+ CompletableFuture<JobStatus> executionGraphTerminationFuture = new
CompletableFuture<>();
+ CompletableFuture<String> operationCompletionFuture = new
CompletableFuture<>();
+ CompletableFuture<CompletedCheckpoint> savepointFuture = new
CompletableFuture<>();
+ final ExecutionGraph executionGraph =
+ new MockExecutionGraph(executionGraphTerminationFuture);
+ executionGraph.transitionToRunning();
+ final ExecutionGraphHandler executionGraphHandler =
+ new ExecutionGraphHandler(
+ executionGraph,
+ log,
+ ctx.getMainThreadExecutor(),
+ ctx.getMainThreadExecutor());
+ OperatorCoordinatorHandler operatorCoordinatorHandler =
+ new OperatorCoordinatorHandler(
+ executionGraph,
+ (throwable) -> {
+ throw new RuntimeException("Error in test",
throwable);
+ });
+
+ CheckpointCoordinator checkpointCoordinator =
+ new MockCheckpointCoordinator(savepointFuture);
+
+ StopWithSavepoint stopWithSavepoint =
+ new StopWithSavepoint(
+ ctx,
+ executionGraph,
+ executionGraphHandler,
+ operatorCoordinatorHandler,
+ log,
+ ClassLoader.getSystemClassLoader(),
+ checkpointCoordinator,
+ "savepointTargetDir",
+ false,
+ operationCompletionFuture);
+ return new StopWithSavepointEnvironment(
+ stopWithSavepoint,
+ executionGraphTerminationFuture,
+ operationCompletionFuture,
+ savepointFuture);
+ }
+
+ private static class MockCheckpointCoordinator extends
CheckpointCoordinator {
Review comment:
Let's not continue with this pattern. Extending concrete classes is a
recipe for a lot of rabbit holes.
Maybe an easier solution could be to define the interface the
`StopWithSavepoint` state requires and then have some adapters to map it to the
underlying `CheckpointCoordinator` (or even let the `CheckpointCoordinator`
implement this interface).
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping
afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final boolean advanceToEndOfEventTime;
+ private final String targetDirectory;
+ private final CheckpointCoordinator checkpointCoordinator;
+ private final Context context;
+
+ StopWithSavepoint(
+ Context context,
+ ExecutionGraph executionGraph,
+ ExecutionGraphHandler executionGraphHandler,
+ OperatorCoordinatorHandler operatorCoordinatorHandler,
+ Logger logger,
+ ClassLoader userCodeClassLoader,
+ CheckpointCoordinator checkpointCoordinator,
Review comment:
How testable is the state when it accesses the
`CheckpointCoordinator` explicitly? Would it be easier if we hid the
coordinator behind some interface or a `context` method?
##########
File path: flink-tests/src/test/resources/log4j2-test.properties
##########
@@ -18,7 +18,7 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO
Review comment:
Probably not intended.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
+import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
+import static
org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link StopWithSavepoint} state. */
+public class StopWithSavepointTest extends TestLogger {
+
+ /** Test failure during savepoint creation: go to failing */
+ @Test
+ public void testFailureDuringSavepointCreationWithNoRestart() throws
Exception {
+ MockExecutingStateWithFailureHandlerContext ctx =
+ new MockExecutingStateWithFailureHandlerContext();
+
+ StopWithSavepointEnvironment stopWithSavepointEnv =
createStopWithSavepointEnvironment(ctx);
+ StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState();
+ stopWithSavepoint.onEnter();
+
+
ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart);
+ // fail future returned by the CheckpointCoordinator
+ stopWithSavepointEnv
+ .getSavepointFuture()
+ .completeExceptionally(new RuntimeException("Savepoint
creation failed"));
+
+
stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED);
+
+ ctx.setExpectFailing(assertNonNull());
+
+ ctx.close(); // trigger outstanding executions
+
+ assertThat(
+
stopWithSavepointEnv.getOperationCompletionFuture().isCompletedExceptionally(),
+ is(true));
+ }
+
+ private static class StopWithSavepointEnvironment {
+ private final StopWithSavepoint state;
+ private final CompletableFuture<JobStatus>
executionGraphTerminationFuture;
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final CompletableFuture<CompletedCheckpoint> savepointFuture;
+
+ StopWithSavepointEnvironment(
+ StopWithSavepoint state,
+ CompletableFuture<JobStatus> executionGraphTerminationFuture,
+ CompletableFuture<String> operationCompletionFuture,
+ CompletableFuture<CompletedCheckpoint> savepointFuture) {
+ this.state = state;
+ this.executionGraphTerminationFuture =
executionGraphTerminationFuture;
+ this.operationCompletionFuture = operationCompletionFuture;
+ this.savepointFuture = savepointFuture;
+ }
+
+ public StopWithSavepoint getState() {
+ return state;
+ }
+
+ public CompletableFuture<JobStatus>
getExecutionGraphTerminationFuture() {
+ return executionGraphTerminationFuture;
+ }
+
+ public CompletableFuture<String> getOperationCompletionFuture() {
+ return operationCompletionFuture;
+ }
+
+ public CompletableFuture<CompletedCheckpoint> getSavepointFuture() {
+ return savepointFuture;
+ }
+ }
+
+ private StopWithSavepointEnvironment createStopWithSavepointEnvironment(
+ MockExecutingStateWithFailureHandlerContext ctx) throws
IOException {
+ CompletableFuture<JobStatus> executionGraphTerminationFuture = new
CompletableFuture<>();
+ CompletableFuture<String> operationCompletionFuture = new
CompletableFuture<>();
+ CompletableFuture<CompletedCheckpoint> savepointFuture = new
CompletableFuture<>();
+ final ExecutionGraph executionGraph =
+ new MockExecutionGraph(executionGraphTerminationFuture);
+ executionGraph.transitionToRunning();
+ final ExecutionGraphHandler executionGraphHandler =
+ new ExecutionGraphHandler(
+ executionGraph,
+ log,
+ ctx.getMainThreadExecutor(),
+ ctx.getMainThreadExecutor());
+ OperatorCoordinatorHandler operatorCoordinatorHandler =
+ new OperatorCoordinatorHandler(
+ executionGraph,
+ (throwable) -> {
+ throw new RuntimeException("Error in test",
throwable);
+ });
+
+ CheckpointCoordinator checkpointCoordinator =
+ new MockCheckpointCoordinator(savepointFuture);
+
+ StopWithSavepoint stopWithSavepoint =
+ new StopWithSavepoint(
+ ctx,
+ executionGraph,
+ executionGraphHandler,
+ operatorCoordinatorHandler,
+ log,
+ ClassLoader.getSystemClassLoader(),
+ checkpointCoordinator,
+ "savepointTargetDir",
+ false,
+ operationCompletionFuture);
+ return new StopWithSavepointEnvironment(
+ stopWithSavepoint,
+ executionGraphTerminationFuture,
+ operationCompletionFuture,
+ savepointFuture);
+ }
+
+ private static class MockCheckpointCoordinator extends
CheckpointCoordinator {
+ private static ExecutionVertex vertex = mockExecutionVertex(new
ExecutionAttemptID());
+ private static ExecutionVertex[] defaultVertices = new
ExecutionVertex[] {vertex};
+ private final CompletableFuture<CompletedCheckpoint> savepointFuture;
+
+ MockCheckpointCoordinator(CompletableFuture<CompletedCheckpoint>
savepointFuture) {
+ super(
+ new JobID(),
+ new CheckpointCoordinatorConfiguration
+
.CheckpointCoordinatorConfigurationBuilder()
+ .build(),
+ Collections.emptyList(),
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ new MemoryStateBackend(),
+ Executors.directExecutor(),
+ new CheckpointsCleaner(),
+ new ManuallyTriggeredScheduledExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY,
+ new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE),
+ new CheckpointPlanCalculator(
+ new JobID(),
+ Arrays.asList(defaultVertices),
+ Arrays.asList(defaultVertices),
+ Arrays.asList(defaultVertices)),
+ new
ExecutionAttemptMappingProvider(Arrays.asList(defaultVertices)));
Review comment:
The pain to set these things up should be a good indicator that we are
doing something wrong here.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
+import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
+import static
org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link StopWithSavepoint} state. */
+public class StopWithSavepointTest extends TestLogger {
+
+ /** Test failure during savepoint creation: go to failing */
+ @Test
+ public void testFailureDuringSavepointCreationWithNoRestart() throws
Exception {
+ MockExecutingStateWithFailureHandlerContext ctx =
+ new MockExecutingStateWithFailureHandlerContext();
+
+ StopWithSavepointEnvironment stopWithSavepointEnv =
createStopWithSavepointEnvironment(ctx);
+ StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState();
+ stopWithSavepoint.onEnter();
+
+
ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart);
+ // fail future returned by the CheckpointCoordinator
+ stopWithSavepointEnv
+ .getSavepointFuture()
+ .completeExceptionally(new RuntimeException("Savepoint
creation failed"));
+
+
stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED);
+
+ ctx.setExpectFailing(assertNonNull());
+
+ ctx.close(); // trigger outstanding executions
+
+ assertThat(
+
stopWithSavepointEnv.getOperationCompletionFuture().isCompletedExceptionally(),
+ is(true));
+ }
+
+ private static class StopWithSavepointEnvironment {
+ private final StopWithSavepoint state;
+ private final CompletableFuture<JobStatus>
executionGraphTerminationFuture;
+ private final CompletableFuture<String> operationCompletionFuture;
+ private final CompletableFuture<CompletedCheckpoint> savepointFuture;
+
+ StopWithSavepointEnvironment(
+ StopWithSavepoint state,
+ CompletableFuture<JobStatus> executionGraphTerminationFuture,
+ CompletableFuture<String> operationCompletionFuture,
+ CompletableFuture<CompletedCheckpoint> savepointFuture) {
+ this.state = state;
+ this.executionGraphTerminationFuture =
executionGraphTerminationFuture;
+ this.operationCompletionFuture = operationCompletionFuture;
+ this.savepointFuture = savepointFuture;
+ }
+
+ public StopWithSavepoint getState() {
+ return state;
+ }
+
+ public CompletableFuture<JobStatus>
getExecutionGraphTerminationFuture() {
+ return executionGraphTerminationFuture;
+ }
+
+ public CompletableFuture<String> getOperationCompletionFuture() {
+ return operationCompletionFuture;
+ }
+
+ public CompletableFuture<CompletedCheckpoint> getSavepointFuture() {
+ return savepointFuture;
+ }
+ }
+
+ private StopWithSavepointEnvironment createStopWithSavepointEnvironment(
+ MockExecutingStateWithFailureHandlerContext ctx) throws
IOException {
+ CompletableFuture<JobStatus> executionGraphTerminationFuture = new
CompletableFuture<>();
+ CompletableFuture<String> operationCompletionFuture = new
CompletableFuture<>();
+ CompletableFuture<CompletedCheckpoint> savepointFuture = new
CompletableFuture<>();
+ final ExecutionGraph executionGraph =
+ new MockExecutionGraph(executionGraphTerminationFuture);
+ executionGraph.transitionToRunning();
+ final ExecutionGraphHandler executionGraphHandler =
+ new ExecutionGraphHandler(
+ executionGraph,
+ log,
+ ctx.getMainThreadExecutor(),
+ ctx.getMainThreadExecutor());
+ OperatorCoordinatorHandler operatorCoordinatorHandler =
+ new OperatorCoordinatorHandler(
+ executionGraph,
+ (throwable) -> {
+ throw new RuntimeException("Error in test",
throwable);
+ });
+
+ CheckpointCoordinator checkpointCoordinator =
+ new MockCheckpointCoordinator(savepointFuture);
+
+ StopWithSavepoint stopWithSavepoint =
+ new StopWithSavepoint(
+ ctx,
+ executionGraph,
+ executionGraphHandler,
+ operatorCoordinatorHandler,
+ log,
+ ClassLoader.getSystemClassLoader(),
+ checkpointCoordinator,
+ "savepointTargetDir",
+ false,
+ operationCompletionFuture);
+ return new StopWithSavepointEnvironment(
+ stopWithSavepoint,
+ executionGraphTerminationFuture,
+ operationCompletionFuture,
+ savepointFuture);
+ }
+
+ private static class MockCheckpointCoordinator extends
CheckpointCoordinator {
+ private static ExecutionVertex vertex = mockExecutionVertex(new
ExecutionAttemptID());
+ private static ExecutionVertex[] defaultVertices = new
ExecutionVertex[] {vertex};
+ private final CompletableFuture<CompletedCheckpoint> savepointFuture;
+
+ MockCheckpointCoordinator(CompletableFuture<CompletedCheckpoint>
savepointFuture) {
+ super(
+ new JobID(),
+ new CheckpointCoordinatorConfiguration
+
.CheckpointCoordinatorConfigurationBuilder()
+ .build(),
+ Collections.emptyList(),
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ new MemoryStateBackend(),
+ Executors.directExecutor(),
+ new CheckpointsCleaner(),
+ new ManuallyTriggeredScheduledExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY,
+ new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE),
+ new CheckpointPlanCalculator(
+ new JobID(),
+ Arrays.asList(defaultVertices),
+ Arrays.asList(defaultVertices),
+ Arrays.asList(defaultVertices)),
+ new
ExecutionAttemptMappingProvider(Arrays.asList(defaultVertices)));
+ this.savepointFuture = savepointFuture;
+ }
+
+ @Override
+ public CompletableFuture<CompletedCheckpoint>
triggerSynchronousSavepoint(
+ boolean advanceToEndOfEventTime, @Nullable String
targetLocation) {
+ return savepointFuture;
+ }
+ }
+
+ private static class MockExecutionGraph extends ExecutionGraph {
+ private final CompletableFuture<JobStatus> mockTerminationFuture;
+
+ private MockExecutionGraph(CompletableFuture<JobStatus>
mockTerminationFuture)
+ throws IOException {
+ super(
+ new JobInformation(
+ new JobID(),
+ "Test Job",
+ new SerializedValue<>(new ExecutionConfig()),
+ new Configuration(),
+ Collections.emptyList(),
+ Collections.emptyList()),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ AkkaUtils.getDefaultTimeout(),
+ 1,
+ ExecutionGraph.class.getClassLoader(),
+ VoidBlobWriter.getInstance(),
+
PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(
+ new Configuration()),
+ NettyShuffleMaster.INSTANCE,
+ NoOpJobMasterPartitionTracker.INSTANCE,
+ ScheduleMode.EAGER,
+ NoOpExecutionDeploymentListener.get(),
+ (execution, newState) -> {},
+ 0L);
+ this.mockTerminationFuture = mockTerminationFuture;
+ }
+
+ @Override
+ public CompletableFuture<JobStatus> getTerminationFuture() {
+ return mockTerminationFuture;
+ }
+ }
Review comment:
Yet another `ExecutionGraph` subclass...
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
##########
@@ -88,6 +103,155 @@ public void testGlobalFailoverCanRecoverState() throws
Exception {
env.execute();
}
+ private enum StopWithSavepointTestBehavior {
+ NO_FAILURE,
+ FAIL_ON_CHECKPOINT,
+ FAIL_ON_STOP,
+ FAIL_ON_FIRST_CHECKPOINT_ONLY
+ }
+
+ @Test
+ public void testStopWithSavepointNoError() throws Exception {
+ testStopWithSavepoint(StopWithSavepointTestBehavior.NO_FAILURE);
+ }
+
+ /** Expected behavior is that the job fails. */
+ @Test
+ public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+ try {
+
testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+ fail("Expect exception");
+ } catch (ExecutionException e) {
+ assertThat(e.getMessage(), containsString("CheckpointException"));
Review comment:
Consider using `FlinkMatchers.containsMessage` here.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/DeclarativeSchedulerITCase.java
##########
@@ -88,6 +103,155 @@ public void testGlobalFailoverCanRecoverState() throws
Exception {
env.execute();
}
+ private enum StopWithSavepointTestBehavior {
+ NO_FAILURE,
+ FAIL_ON_CHECKPOINT,
+ FAIL_ON_STOP,
+ FAIL_ON_FIRST_CHECKPOINT_ONLY
+ }
+
+ @Test
+ public void testStopWithSavepointNoError() throws Exception {
+ testStopWithSavepoint(StopWithSavepointTestBehavior.NO_FAILURE);
+ }
+
+ /** Expected behavior is that the job fails. */
+ @Test
+ public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+ try {
+
testStopWithSavepoint(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+ fail("Expect exception");
Review comment:
Shouldn't we also assert that the job is still running?
##########
File path: flink-runtime/src/test/resources/log4j2-test.properties
##########
@@ -18,7 +18,7 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO
Review comment:
This change seems unintended.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepoint.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** State describing a job that is currently taking a savepoint and stopping
afterwards. */
+class StopWithSavepoint extends ExecutingStateWithFailureHandler {
Review comment:
I think in general one should be very careful when using inheritance. If
the base class is not well designed for inheritance, then inheritance is
usually a good way to get stuck in rabbit holes in the future.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StopWithSavepointTest.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
+import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
+import static
org.apache.flink.runtime.scheduler.declarative.WaitingForResourcesTest.assertNonNull;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link StopWithSavepoint} state. */
+public class StopWithSavepointTest extends TestLogger {
+
+ /** Test failure during savepoint creation: go to failing */
+ @Test
+ public void testFailureDuringSavepointCreationWithNoRestart() throws
Exception {
+ MockExecutingStateWithFailureHandlerContext ctx =
+ new MockExecutingStateWithFailureHandlerContext();
+
+ StopWithSavepointEnvironment stopWithSavepointEnv =
createStopWithSavepointEnvironment(ctx);
+ StopWithSavepoint stopWithSavepoint = stopWithSavepointEnv.getState();
+ stopWithSavepoint.onEnter();
+
+
ctx.setHowToHandleFailure(ExecutingStateWithFailureHandler.FailureResult::canNotRestart);
+ // fail future returned by the CheckpointCoordinator
+ stopWithSavepointEnv
+ .getSavepointFuture()
+ .completeExceptionally(new RuntimeException("Savepoint
creation failed"));
+
+
stopWithSavepointEnv.getExecutionGraphTerminationFuture().complete(JobStatus.FAILED);
+
+ ctx.setExpectFailing(assertNonNull());
+
+ ctx.close(); // trigger outstanding executions
+
+ assertThat(
+
stopWithSavepointEnv.getOperationCompletionFuture().isCompletedExceptionally(),
Review comment:
I am not a huge fan of this environment. Looking at this test alone, it
is not clear to me why the env has the operation completion future. If the
operation future belonged to the state, it would be clearer, for example.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]