tillrohrmann commented on a change in pull request #9364: [FLINK-13593][checkpointing] Prevent failing the wrong execution attempt in CheckpointFailureManager
URL: https://github.com/apache/flink/pull/9364#discussion_r312359832
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 ##########
 @@ -733,6 +733,51 @@ public void testFailureWhileRestarting() throws Exception {
                assertEquals(JobStatus.RUNNING, executionGraph.getState());
        }
 
+       @Test
+       public void testFailingOfTheSameExecutionWhileRestarting() throws Exception {
+               JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
+               JobVertex receiver = ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
+               JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
+
+               try (SlotPool slotPool = createSlotPoolImpl()) {
+                       ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+                               .setRestartStrategy(new TestRestartStrategy(1, false))
+                               .setJobGraph(jobGraph)
+                               .setNumberOfTasks(2)
+                               .buildAndScheduleForExecution(slotPool);
+
+                       Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
+
+                       Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt();
+                       Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt();
+
+                       finishedExecution.markFinished();
+
+                       failedExecution.fail(new Exception("Test Exception"));
+                       failedExecution.completeCancelling();
+
+                       assertEquals(JobStatus.RUNNING, eg.getState());
+
+                       // At this point all resources have been assigned
+                       for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+                               assertNotNull("No assigned resource (test instability).", vertex.getCurrentAssignedResource());
+                               vertex.getCurrentExecutionAttempt().switchToRunning();
+                       }
+
+                       // fail global with old finished execution, this should not affect the execution
+                       eg.failGlobal(new Exception("This should have no effect"), finishedExecution.getAttemptId());
+
+                       for (ExecutionVertex vertex: eg.getAllExecutionVertices()) {
+                               vertex.getCurrentExecutionAttempt().markFinished();
+                       }
 
 Review comment:
   I think this loop, which marks every execution as finished, is not necessary. We could simply check that the job state is still `RUNNING`.
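To make the suggestion concrete, here is a minimal, self-contained sketch of the behaviour under test. It does not use Flink: `ToyGraph`, `startAttempt` and `failGlobal(Exception, UUID)` are hypothetical stand-ins that only assume what the test's own comment states, namely that a global failure reported by an attempt from before the restart should have no effect. Under that assumption, asserting that the state is still `RUNNING` right after the stale `failGlobal` call already captures the expected behaviour, with no need to finish every execution afterwards.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical, heavily simplified model; none of these types are Flink APIs. */
public class StaleAttemptDemo {

    enum JobStatus { RUNNING, FAILING }

    /** Hypothetical stand-in for an execution graph tracking the current attempt per vertex. */
    static final class ToyGraph {
        private final Map<String, UUID> currentAttempts = new HashMap<>();
        private JobStatus state = JobStatus.RUNNING;

        /** Starts a new attempt; a restart replaces the vertex's previous attempt ID. */
        UUID startAttempt(String vertex) {
            UUID id = UUID.randomUUID();
            currentAttempts.put(vertex, id);
            return id;
        }

        /** Fails the job only if the reporting attempt is still a current attempt. */
        void failGlobal(Exception cause, UUID attemptId) {
            if (!currentAttempts.containsValue(attemptId)) {
                return; // stale attempt from before the restart: ignore the failure
            }
            state = JobStatus.FAILING;
        }

        JobStatus getState() {
            return state;
        }
    }

    public static void main(String[] args) {
        ToyGraph eg = new ToyGraph();
        UUID oldAttempt = eg.startAttempt("Task1");
        eg.startAttempt("Task2");

        // Simulate the restart: both vertices get fresh attempts.
        eg.startAttempt("Task1");
        eg.startAttempt("Task2");

        eg.failGlobal(new Exception("This should have no effect"), oldAttempt);

        // The reviewer's suggestion: checking the job state is sufficient.
        if (eg.getState() != JobStatus.RUNNING) {
            throw new AssertionError("expected RUNNING but was " + eg.getState());
        }
        System.out.println(eg.getState());
    }
}
```

In the real test, the equivalent would be a single `assertEquals(JobStatus.RUNNING, eg.getState());` after the `failGlobal` call, in place of the final loop.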

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.