tillrohrmann commented on a change in pull request #9364:
[FLINK-13593][checkpointing] Prevent failing the wrong execution attempt in CheckpointFailureManager
URL: https://github.com/apache/flink/pull/9364#discussion_r312359832
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
##########
@@ -733,6 +733,51 @@ public void testFailureWhileRestarting() throws Exception {
 		assertEquals(JobStatus.RUNNING, executionGraph.getState());
 	}
+	@Test
+	public void testFailingOfTheSameExecutionWhileRestarting() throws Exception {
+		JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
+		JobVertex receiver = ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
+		JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
+
+		try (SlotPool slotPool = createSlotPoolImpl()) {
+			ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+				.setRestartStrategy(new TestRestartStrategy(1, false))
+				.setJobGraph(jobGraph)
+				.setNumberOfTasks(2)
+				.buildAndScheduleForExecution(slotPool);
+
+			Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
+
+			Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt();
+			Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt();
+
+			finishedExecution.markFinished();
+
+			failedExecution.fail(new Exception("Test Exception"));
+			failedExecution.completeCancelling();
+
+			assertEquals(JobStatus.RUNNING, eg.getState());
+
+			// At this point all resources have been assigned
+			for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+				assertNotNull("No assigned resource (test instability).", vertex.getCurrentAssignedResource());
+				vertex.getCurrentExecutionAttempt().switchToRunning();
+			}
+
+			// fail global with old finished execution, this should not affect the execution
+			eg.failGlobal(new Exception("This should have no effect"), finishedExecution.getAttemptId());
+
+			for (ExecutionVertex vertex: eg.getAllExecutionVertices()) {
+				vertex.getCurrentExecutionAttempt().markFinished();
+			}
Review comment:
I think this loop is not necessary. We could simply assert that the job state is still `RUNNING`.
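To illustrate the suggested check outside of Flink, here is a minimal, self-contained toy model. `ToyExecutionGraph`, `ToyJobStatus` and `StaleAttemptDemo` are hypothetical stand-ins, not Flink's API; the sketch only assumes the property the test is after: a global failure attributed to an attempt ID that is no longer current (here, an attempt from before the restart) is ignored. Under that assumption a single state assertion is enough, which in the test above would mean replacing the final loop with `assertEquals(JobStatus.RUNNING, eg.getState());`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical toy model of the behavior under test; NOT Flink's API.
enum ToyJobStatus { RUNNING, FAILING }

final class ToyExecutionGraph {
    private ToyJobStatus state = ToyJobStatus.RUNNING;
    // Attempt IDs belonging to the current (post-restart) executions.
    private final Set<UUID> currentAttempts = new HashSet<>();

    // Starts a new execution attempt and returns its ID.
    UUID startAttempt() {
        UUID attemptId = UUID.randomUUID();
        currentAttempts.add(attemptId);
        return attemptId;
    }

    // A restart replaces every attempt, so all previous IDs become stale.
    void restart() {
        currentAttempts.clear();
    }

    // A global failure only counts if it comes from a current attempt.
    void failGlobal(Throwable cause, UUID failingAttemptId) {
        if (currentAttempts.contains(failingAttemptId)) {
            state = ToyJobStatus.FAILING;
        }
    }

    ToyJobStatus getState() {
        return state;
    }
}

public class StaleAttemptDemo {
    public static void main(String[] args) {
        ToyExecutionGraph eg = new ToyExecutionGraph();
        UUID oldAttempt = eg.startAttempt(); // plays the role of the finished execution
        eg.restart();                        // triggered by the other task's failure
        eg.startAttempt();                   // fresh attempt after the restart

        eg.failGlobal(new Exception("This should have no effect"), oldAttempt);

        // The suggested check: the job is simply still RUNNING.
        if (eg.getState() != ToyJobStatus.RUNNING) {
            throw new AssertionError("expected RUNNING but was " + eg.getState());
        }
        System.out.println(eg.getState()); // prints RUNNING
    }
}
```

The toy keeps only the attempt-ID bookkeeping; it says nothing about how Flink actually tracks attempts or restarts.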
----------------------------------------------------------------
This is an automated message from the Apache Git Service.