[ https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883315#comment-15883315 ]
ASF GitHub Bot commented on FLINK-5703:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/3340#discussion_r102995512
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java ---
@@ -440,6 +440,72 @@ public void testSendCancelAndReceiveFail() {
}
}
+ /**
+ * For the JobManager failure recovery case, the execution may still be in the
+ * RECONCILING state while its basic information, including the slot, has already
+ * been recovered. When the failed execution is processed, this triggers the
+ * cancellation of all current executions. A cancel RPC must be sent to a
+ * RECONCILING execution that has a slot, because its TaskManager has already
+ * reported its status for recovery.
+ */
+ @Test
+ public void testCancelFromReconcilingWithSlot() {
+ try {
+ final JobVertexID jid = new JobVertexID();
+ final ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService());
+ final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
+ final ActorGateway actorGateway = new CancelSequenceActorGateway(TestingUtils.directExecutionContext(), 1);
+
+ final Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
+ final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+
+ setVertexState(vertex, ExecutionState.RECONCILING);
+ setVertexResource(vertex, slot);
+
+ assertEquals(ExecutionState.RECONCILING, vertex.getExecutionState());
+
+ vertex.cancel();
+ vertex.getCurrentExecutionAttempt().cancelingComplete(); // sent by the TaskManager once the task is actually canceled
+
+ assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+ assertTrue(slot.isReleased());
+ assertNull(vertex.getFailureCause());
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+ assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * For the JobManager failure recovery case, the execution may still be in the
+ * RECONCILING state because its TaskManager did not report its status within the
+ * allowed duration. There is no need to send a cancel RPC for such an execution,
+ * since it has no real attempt ID and no slot. It can transition directly to the
+ * CANCELED state, as in the SCHEDULED and CREATED cases.
+ */
+ @Test
+ public void testCancelFromReconcilingNoSlot() {
+ try {
--- End diff --
We are trying to avoid this pattern now (we used it in the earlier days).
It is better for logging and debugging to simply declare `throws Exception`
on the test method.
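The review point can be illustrated without Flink on the classpath. Below is a hypothetical, self-contained Java sketch: `ReconcilingCancelSketch`, `ToyExecution`, and `check` are illustrative names, not Flink classes, and the toy `cancel()` only mimics what the two Javadoc comments in the diff describe (a RECONCILING execution with a recovered slot goes through CANCELING and gets a cancel RPC; one without a slot goes straight to CANCELED). The test methods declare `throws Exception` instead of wrapping their bodies in `try`/`catch`, so an unexpected exception reaches the test runner with its original type and stack trace. With `fail(e.getMessage())`, the runner only sees an `AssertionError` carrying the message (which may be `null`), while `printStackTrace()` writes the real trace to a separate stream.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, self-contained model of the cancel behavior described by the two Javadoc
 * comments in the diff, with tests written in the style the reviewer recommends.
 * None of these types are Flink classes; they only mimic the described transitions.
 */
public class ReconcilingCancelSketch {

    enum State { CREATED, SCHEDULED, RECONCILING, CANCELING, CANCELED }

    /** Toy stand-in for an execution: current state, an optional recovered slot, sent RPCs. */
    static final class ToyExecution {
        State state;
        final boolean hasSlot;      // true if the slot was already recovered from a TaskManager report
        boolean slotReleased;
        final List<String> sentRpcs = new ArrayList<>();

        ToyExecution(State initial, boolean hasSlot) {
            this.state = initial;
            this.hasSlot = hasSlot;
        }

        void cancel() {
            if (state == State.RECONCILING && hasSlot) {
                // The TaskManager already reported this task, so it must be told to cancel it.
                state = State.CANCELING;
                sentRpcs.add("cancelTask");
            } else if (state == State.RECONCILING || state == State.CREATED
                    || state == State.SCHEDULED) {
                // No real attempt and no slot: go straight to CANCELED without an RPC.
                state = State.CANCELED;
            }
            // Executions already CANCELING or CANCELED are left unchanged in this sketch.
        }

        /** Called once the TaskManager confirms that the task was actually canceled. */
        void cancelingComplete() {
            if (state == State.CANCELING) {
                state = State.CANCELED;
                slotReleased = hasSlot;
            }
        }
    }

    /** Minimal assertion helper so the sketch runs without JUnit. */
    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    // No try/catch: any unexpected exception propagates with its original stack trace.
    static void testCancelFromReconcilingWithSlot() throws Exception {
        ToyExecution execution = new ToyExecution(State.RECONCILING, true);

        execution.cancel();
        check(execution.state == State.CANCELING, "expected CANCELING after cancel()");
        check(execution.sentRpcs.contains("cancelTask"), "expected a cancel RPC");

        execution.cancelingComplete();
        check(execution.state == State.CANCELED, "expected CANCELED after confirmation");
        check(execution.slotReleased, "expected the slot to be released");
    }

    static void testCancelFromReconcilingNoSlot() throws Exception {
        ToyExecution execution = new ToyExecution(State.RECONCILING, false);

        execution.cancel();
        check(execution.state == State.CANCELED, "expected a direct transition to CANCELED");
        check(execution.sentRpcs.isEmpty(), "expected no cancel RPC");
    }

    public static void main(String[] args) throws Exception {
        testCancelFromReconcilingWithSlot();
        testCancelFromReconcilingNoSlot();
        System.out.println("both sketch tests passed");
    }
}
```

Running `main` executes both sketch tests. In a real JUnit test class, the methods would carry `@Test` and keep the `throws Exception` clause, with no surrounding `try`/`catch`.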
> ExecutionGraph recovery based on reconciliation with TaskManager reports
> ------------------------------------------------------------------------
>
> Key: FLINK-5703
> URL: https://issues.apache.org/jira/browse/FLINK-5703
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, JobManager
> Reporter: zhijiang
> Assignee: zhijiang
>
> The ExecutionGraph structure would be recovered from TaskManager reports
> during the reconciling period. The necessary information includes:
> - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (consumer Execution)
> - ExecutionVertex: Map<IntermediateResultPartitionID, IntermediateResultPartition>
> - ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution>
> An ExecutionState of {{RECONCILING}} should transition into one of the existing
> task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To support
> this, the TaskManager should retain terminal task states ({{CANCELED}},
> {{FAILED}}, {{FINISHED}}) for a while; we plan to implement this mechanism in a
> separate JIRA issue. In addition, each state transition triggers different
> actions, and some of these actions rely on the information listed above. Given
> this constraint, the recovery process is divided into two steps:
> - First, recover all the necessary information except the ExecutionState.
> - Second, transition the ExecutionState into the real task state and trigger the
>   corresponding actions. The behavior is the same as the current
>   {{UpdateTaskExecutionState}} handling.
> To keep the logic simple and consistent, during the recovery period the
> JobMaster should temporarily refuse all other RPC messages from the TaskManager
> ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) and respond
> with a special message. The TaskManager should then retry sending these messages
> until the JobManager has finished recovery and acknowledges them.
> A {{RECONCILING}} JobStatus transitions into one of the states ({{RUNNING}},
> {{FAILING}}, {{FINISHED}}) after recovery:
> - {{RECONCILING}} to {{RUNNING}}: All TaskManagers report within the allowed
>   duration and all tasks are in the {{RUNNING}} state.
> - {{RECONCILING}} to {{FAILING}}: At least one TaskManager does not report in
>   time, or at least one task is in the {{FAILED}} or {{CANCELED}} state.
> - {{RECONCILING}} to {{FINISHED}}: All TaskManagers report within the allowed
>   duration and all tasks are in the {{FINISHED}} state.
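The JobStatus rules in the issue description amount to a small decision function over the TaskManager reports. Below is a hypothetical Java sketch, not Flink code: `ReconcileJobStatusSketch`, `resolve`, and the string TaskManager IDs are illustrative assumptions. It checks the FAILING conditions first (a missing report, or any FAILED/CANCELED task) and returns an empty `Optional` for combinations the description does not cover, such as a mix of RUNNING and FINISHED tasks.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Hypothetical sketch of the RECONCILING JobStatus rules from the issue description.
 * The types and names are illustrative only; they are not Flink's API.
 */
public class ReconcileJobStatusSketch {

    enum TaskState { RUNNING, CANCELED, FAILED, FINISHED }

    enum JobStatus { RUNNING, FAILING, FINISHED }

    /**
     * Decides the JobStatus a RECONCILING job moves to once the reconciliation period ends.
     *
     * @param expectedTaskManagers IDs of the TaskManagers expected to report
     * @param reports task states reported per TaskManager within the allowed duration
     * @return the target status, or empty for combinations the description does not cover
     */
    static Optional<JobStatus> resolve(Set<String> expectedTaskManagers,
                                       Map<String, List<TaskState>> reports) {
        // A TaskManager that did not report in time means FAILING.
        if (!reports.keySet().containsAll(expectedTaskManagers)) {
            return Optional.of(JobStatus.FAILING);
        }

        // Both flags stay true if no task states were reported at all, so this sketch
        // then returns RUNNING; the description does not address that edge case.
        boolean allRunning = true;
        boolean allFinished = true;
        for (List<TaskState> states : reports.values()) {
            for (TaskState state : states) {
                if (state == TaskState.FAILED || state == TaskState.CANCELED) {
                    return Optional.of(JobStatus.FAILING);  // one failed/canceled task suffices
                }
                allRunning &= state == TaskState.RUNNING;
                allFinished &= state == TaskState.FINISHED;
            }
        }

        if (allRunning) {
            return Optional.of(JobStatus.RUNNING);
        }
        if (allFinished) {
            return Optional.of(JobStatus.FINISHED);
        }
        return Optional.empty();  // e.g. a mix of RUNNING and FINISHED tasks
    }

    public static void main(String[] args) {
        Set<String> expected = Set.of("tm-1", "tm-2");

        // Every TaskManager reported and every task is RUNNING.
        System.out.println(resolve(expected, Map.of(
                "tm-1", List.of(TaskState.RUNNING),
                "tm-2", List.of(TaskState.RUNNING, TaskState.RUNNING))));  // Optional[RUNNING]

        // tm-2 never reported, so the job goes to FAILING.
        System.out.println(resolve(expected, Map.of(
                "tm-1", List.of(TaskState.FINISHED))));                      // Optional[FAILING]
    }
}
```

Checking the FAILING conditions first means a single missing TaskManager or failed task decides the outcome regardless of what the other reports contain.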
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)