tillrohrmann commented on a change in pull request #15884:
URL: https://github.com/apache/flink/pull/15884#discussion_r630179531
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
##########
@@ -101,6 +101,8 @@ private void handleSavepointCompletion(
completeOperationAndGoToFinished(savepoint);
} else {
if (throwable != null) {
+ // creating the savepoint has failed but job is still running
+ Preconditions.checkState(getExecutionGraph().getState() ==
JobStatus.RUNNING);
Review comment:
Why did you introduce this `checkState`? Wouldn't this be caught by the
`Executing` state?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
##########
@@ -119,6 +124,26 @@ public void handleGlobalFailure(Throwable cause) {
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED,
cause));
}
+ private void deployExecutionGraph(ExecutionGraph executionGraph) {
+ for (ExecutionJobVertex executionJobVertex :
executionGraph.getVerticesTopologically()) {
+ for (ExecutionVertex executionVertex :
executionJobVertex.getTaskVertices()) {
+ deploySafely(executionVertex);
+ }
+ }
+ }
+
+ private void deploySafely(ExecutionVertex executionVertex) {
+ try {
+ executionVertex.deploy();
+ } catch (JobException e) {
+ handleDeploymentFailure(executionVertex, e);
+ }
+ }
+
+ private void handleDeploymentFailure(ExecutionVertex executionVertex,
JobException e) {
+ executionVertex.markFailed(e);
+ }
Review comment:
`ExecutionVertex.markFailed` will trigger an `updateTaskExecutionState`
which will only be processed by an `StateWithExecutionGraph`. Hence, I think
this will now simply be ignored.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
##########
@@ -146,6 +150,8 @@ public void
testSuccessfulSlotAssignmentTransitionsToExecuting() throws Exceptio
executionGraphWithvertexParallelismFuture.complete(
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
executionGraph, new TestingVertexParallelism()));
+
+ assertThat(mockExecutionJobVertex.isExecutionDeployed(), is(true));
Review comment:
There is no test which ensures the proper behavior if deploying fails in
state `CreatingExecutionGraph`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]