[ 
https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883315#comment-15883315
 ] 

ASF GitHub Bot commented on FLINK-5703:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3340#discussion_r102995512
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 ---
    @@ -440,6 +440,72 @@ public void testSendCancelAndReceiveFail() {
                }
        }
     
    +   /**
    +    * For job manager failure recovery case, the execution may still in 
reconciling state but already recovered
    +    * basic information including slot, when process the failed execution, 
it will trigger to cancel all the current
    +    * executions. It is necessary to send cancel rpc to reconciling state 
execution with slot because the task manger
    +    * already reports its status for recovery.
    +    */
    +   @Test
    +   public void testCancelFromReconcilingWithSlot() {
    +           try {
    +                   final JobVertexID jid = new JobVertexID();
    +                   final ExecutionJobVertex ejv = getExecutionVertex(jid, 
new DirectScheduledExecutorService());
    +                   final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
    +                   final ActorGateway actorGateway = new 
CancelSequenceActorGateway(TestingUtils.directExecutionContext(), 1);
    +
    +                   final Instance instance = getInstance(new 
ActorTaskManagerGateway(actorGateway));
    +                   final SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
    +
    +                   setVertexState(vertex, ExecutionState.RECONCILING);
    +                   setVertexResource(vertex, slot);
    +
    +                   assertEquals(ExecutionState.RECONCILING, 
vertex.getExecutionState());
    +
    +                   vertex.cancel();
    +                   
vertex.getCurrentExecutionAttempt().cancelingComplete(); // response by task 
manager once actually canceled
    +
    +                   assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
    +                   assertTrue(slot.isReleased());
    +                   assertNull(vertex.getFailureCause());
    +                   
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
    +                   
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
    +                   
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
    +           } catch (Exception e) {
    +                   e.printStackTrace();
    +                   fail(e.getMessage());
    +           }
    +   }
    +
    +   /**
    +    * For job manager failure recovery case, the execution may still in 
reconciling state because the task manager
    +    *  does not report its status within duration time. It is no need to 
send cancel rpc for such execution with no real
    +    *  attempt id and slot. And it can be transition to canceled state 
directly, the same with the cases of scheduled or created.
    +    */
    +   @Test
    +   public void testCancelFromReconcilingNoSlot() {
    +           try {
    --- End diff --
    
    We are trying to avoid this pattern now (we used it in the earlier days).
    It is better for logging and debugging to simply declare `throws Exception` 
on the test method.
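
A minimal sketch of the difference this comment describes, written without JUnit so it is self-contained. The class and method names are hypothetical. The `AssertionError` built from the message is an assumption standing in for JUnit 4's `fail(e.getMessage())`. With the catch-and-fail pattern the reported failure carries only the message and points at the catch block. With `throws Exception` the original exception, including its type and throw site, reaches the test runner.

```java
final class ThrowsExceptionSketch {

    /** Minimal functional interface so a test body may throw checked exceptions. */
    interface TestBody {
        void run() throws Exception;
    }

    /** Hypothetical stand-in for the code under test; it always fails. */
    static void operationUnderTest() throws Exception {
        throw new IllegalStateException("slot was not released");
    }

    /** Old pattern: catch everything and fail with the message only. */
    static void catchAndFailStyle() {
        try {
            operationUnderTest();
        } catch (Exception e) {
            // Assumption: mirrors fail(e.getMessage()); the new error has no
            // cause, so the original type and stack trace are lost to the runner.
            throw new AssertionError(e.getMessage());
        }
    }

    /** Suggested pattern: declare the exception and let it propagate. */
    static void throwsExceptionStyle() throws Exception {
        operationUnderTest();
    }

    /** Runs a test body and returns whatever it threw, or null if it passed. */
    static Throwable capture(TestBody body) {
        try {
            body.run();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    public static void main(String[] args) {
        Throwable oldStyle = capture(ThrowsExceptionSketch::catchAndFailStyle);
        Throwable newStyle = capture(ThrowsExceptionSketch::throwsExceptionStyle);

        // Compare what a test runner would get to report in each case.
        System.out.println("catch-and-fail:   " + oldStyle.getClass().getSimpleName()
                + " thrown in " + oldStyle.getStackTrace()[0].getMethodName());
        System.out.println("throws Exception: " + newStyle.getClass().getSimpleName()
                + " thrown in " + newStyle.getStackTrace()[0].getMethodName());
    }
}
```

Applied to the test in the diff, this would mean dropping the surrounding try/catch and declaring the method as `public void testCancelFromReconcilingWithSlot() throws Exception`.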


> ExecutionGraph recovery based on reconciliation with TaskManager reports
> ------------------------------------------------------------------------
>
>                 Key: FLINK-5703
>                 URL: https://issues.apache.org/jira/browse/FLINK-5703
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, JobManager
>            Reporter: zhijiang
>            Assignee: zhijiang
>
> The ExecutionGraph structure is recovered from TaskManager reports during 
> the reconciling period. The necessary information includes:
>     - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, 
> ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor(Consumer 
> Execution)
>     - ExecutionVertex: Map<IntermediateResultPartitionID, 
> IntermediateResultPartition>
>     - ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution>
> For the {{RECONCILING}} ExecutionState, the execution should transition into 
> one of the existing task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, 
> {{FINISHED}}). To make this possible, the TaskManager should retain a 
> terminal task state ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; 
> that mechanism will be implemented in a separate JIRA issue. In addition, 
> each state transition triggers different actions, and some of those actions 
> rely on the information listed above. Because of this dependency, the 
> recovery process is divided into two steps:
>     - First, recover all necessary information except the ExecutionState.
>     - Second, transition the ExecutionState into the real task state and 
> trigger the corresponding actions. The behavior is the same as the current 
> {{UpdateTaskExecutionState}}.
> To keep the logic simple and consistent, during the recovery period all 
> other RPC messages ({{UpdateTaskExecutionState}}, 
> {{ScheduleOrUpdateConsumers}}, etc.) from the TaskManager should be refused 
> temporarily, and the JobMaster should respond with a special message. The 
> TaskManager should then retry these messages until the JobManager finishes 
> recovery and acknowledges them.
> For the {{RECONCILING}} JobStatus, the job transitions into one of the 
> states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery.
>     - {{RECONCILING}} to {{RUNNING}}: All TaskManagers report within the 
> allowed duration and all tasks are in the {{RUNNING}} state.
>     - {{RECONCILING}} to {{FAILING}}: One of the TaskManagers does not report 
> in time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state.
>     - {{RECONCILING}} to {{FINISHED}}: All TaskManagers report within the 
> allowed duration and all tasks are in the {{FINISHED}} state.
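
The three JobStatus transitions listed above can be sketched as a small decision function. This is an illustration only, not Flink code: the class, the enums, and the `resolve` signature are hypothetical. The description does not say what happens for an empty report list or for a mix of {{RUNNING}} and {{FINISHED}} tasks, so the sketch assumes the job simply stays in {{RECONCILING}} in those cases.

```java
import java.util.Arrays;
import java.util.List;

final class ReconcilingJobStatusSketch {

    /** Task states a TaskManager may report, as named in the description. */
    enum TaskState { RUNNING, CANCELED, FAILED, FINISHED }

    /** Job states involved in the reconciliation step. */
    enum JobStatus { RECONCILING, RUNNING, FAILING, FINISHED }

    /**
     * Decides the job status after recovery.
     *
     * @param allReportedInTime whether every TaskManager reported within the allowed duration
     * @param reported          the task states collected from the reports
     */
    static JobStatus resolve(boolean allReportedInTime, List<TaskState> reported) {
        // A missing report, or any failed or canceled task, moves the job to FAILING.
        if (!allReportedInTime
                || reported.contains(TaskState.FAILED)
                || reported.contains(TaskState.CANCELED)) {
            return JobStatus.FAILING;
        }
        // Assumption: with nothing reported there is nothing to decide yet.
        if (reported.isEmpty()) {
            return JobStatus.RECONCILING;
        }
        if (reported.stream().allMatch(s -> s == TaskState.RUNNING)) {
            return JobStatus.RUNNING;
        }
        if (reported.stream().allMatch(s -> s == TaskState.FINISHED)) {
            return JobStatus.FINISHED;
        }
        // Assumption: a RUNNING/FINISHED mix is not covered by the description.
        return JobStatus.RECONCILING;
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, Arrays.asList(TaskState.RUNNING, TaskState.RUNNING)));
        System.out.println(resolve(true, Arrays.asList(TaskState.RUNNING, TaskState.FAILED)));
        System.out.println(resolve(false, Arrays.asList(TaskState.FINISHED)));
    }
}
```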



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
