[
https://issues.apache.org/jira/browse/FLINK-13487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16896346#comment-16896346
]
Yun Gao commented on FLINK-13487:
---------------------------------
Many thanks to [~SleePy] and [~zjwang] for the help with this issue! A more
detailed explanation of the cause follows.
From the logs recorded in the attached _error_log_, we can see that:
# The TM registers at the JM twice.
# When the task is submitted, the jobManagerTable does not contain the
corresponding JobId, which causes the test to fail.
The basic process of the test is:
{code:java}
// Step 1
taskManagerServices.getJobLeaderService().addJob(jobId, jobMasterAddress);
// Step 2
jobManagerLeaderRetriever.notifyListener(jobMasterAddress, UUID.randomUUID());
// Step 3
taskExecutorGateway.requestSlot();
// Step 4
taskExecutorGateway.submitTask();
{code}
The two registrations are triggered by steps 1/2 and step 3 respectively.
However, since registration is asynchronous, the task is submitted before
either of the two registrations has finished. To fix this problem, we need to
postpone the submission until the slot has been offered to the JM. In a real
cluster this is guaranteed because the JM does not start deploying the task
before the slot offer succeeds.
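The proposed fix can be illustrated with a minimal, self-contained sketch. It
does not use Flink classes: the jobManagerTable map, submitTask and the
slotOffered future are hypothetical stand-ins, and the sleep only simulates the
asynchronous registration. The point is that the test thread waits on a future
that completes once the slot offer has happened, and only then submits.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class SlotOfferGateSketch {
    // Hypothetical stand-in for the TM's jobManagerTable (job id -> JM address).
    static final Map<String, String> jobManagerTable = new ConcurrentHashMap<>();

    // Hypothetical stand-in for submitTask: rejects jobs without a JM connection.
    static String submitTask(String jobId) {
        if (!jobManagerTable.containsKey(jobId)) {
            throw new IllegalStateException(
                "Could not submit task because there is no JobManager associated for the job "
                    + jobId);
        }
        return "submitted";
    }

    static String runTest(long registrationDelayMillis) {
        jobManagerTable.clear();
        String jobId = "job-1";
        // Completed once the (simulated) registration and slot offer are done.
        CompletableFuture<Void> slotOffered = new CompletableFuture<>();

        // Simulated asynchronous registration on another thread.
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(registrationDelayMillis);
                jobManagerTable.put(jobId, "jm-address");
                slotOffered.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                slotOffered.completeExceptionally(e);
            }
        });

        // The fix: block until the slot has been offered, then submit.
        slotOffered.join();
        return submitTask(jobId);
    }

    public static void main(String[] args) {
        System.out.println(runTest(200));
    }
}
```

Without the join() call, submitTask would run while the table is still empty
and throw, which mirrors the failure seen in the test.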
A more detailed description is as follows for reference:
In step 1, the job id is added to the JobLeaderService on the TM side, which
triggers the leader retrieval process. Since the leader retrieval service is an
instance of SettableLeaderRetrievalService and the address has not been set
yet, the leader retrieval has no effect.
In step 2, the SettableLeaderRetrievalService is notified with the actual
JobMaster address, and the TM starts to register at the JM. Part of the
registration is done asynchronously by the execution thread pool of the
RpcService. In this case, however, the asynchronous part may take longer than
usual.
In step 3, the slot request allocates the slot first; the TM then finds that
the registration has not succeeded yet and calls jobLeaderService#addJob again.
Since the JM address has been set by now, this triggers the second
registration. That registration is also executed asynchronously, and the
request returns successfully.
In step 4, the submission starts before either registration has succeeded, and
the error occurs.
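Steps 1 to 3 can be sketched as follows to show why two registrations are
triggered. The SettableRetrieval class below is a simplified, hypothetical
stand-in written for this illustration, not Flink's actual class, and it
assumes that starting the retrieval with an already-set address notifies the
listener immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SettableRetrievalSketch {
    /** Simplified stand-in for a settable leader retrieval service. */
    static final class SettableRetrieval {
        private String leaderAddress;
        private Consumer<String> listener;

        // Starting the retrieval only notifies if an address is already known.
        synchronized void start(Consumer<String> newListener) {
            listener = newListener;
            if (leaderAddress != null) {
                listener.accept(leaderAddress);
            }
        }

        // Setting the address notifies the listener, if one has been started.
        synchronized void notifyListener(String address) {
            leaderAddress = address;
            if (listener != null) {
                listener.accept(address);
            }
        }
    }

    /** Returns the number of triggered registrations after steps 1, 2 and 3. */
    static List<Integer> walkThrough() {
        List<String> registrations = new ArrayList<>();
        List<Integer> countsPerStep = new ArrayList<>();
        SettableRetrieval retrieval = new SettableRetrieval();
        Consumer<String> registerAtJobMaster =
            address -> registrations.add("register@" + address);

        // Step 1: addJob starts the retrieval, but no address is set yet.
        retrieval.start(registerAtJobMaster);
        countsPerStep.add(registrations.size());

        // Step 2: the address is set, which triggers the first registration.
        retrieval.notifyListener("jm-address");
        countsPerStep.add(registrations.size());

        // Step 3: addJob is called again; the address is known, so a second
        // registration is triggered right away.
        retrieval.start(registerAtJobMaster);
        countsPerStep.add(registrations.size());

        return countsPerStep;
    }

    public static void main(String[] args) {
        System.out.println(walkThrough());
    }
}
```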
Then the finally block of the test code executes and shuts down the TM, the
test ends, and the original submission exception is thrown. This is why the
exception is printed after the TM has stopped. Meanwhile, the Akka thread of
the TM continues to run because the exception in the Akka thread is caught;
one of the registrations finally succeeds and triggers the slot offer again.
This is why we can also see the slot offering event in the error log.
The above exception can be reproduced reliably by adding a sleep in
RetryingRegistration#register, before the call to
"completionFuture.complete(Tuple2.of(gateway, success))" in the asynchronous
executor thread. The two jobLeaderService#addJob calls are also visible in this
case. If we additionally add a sleep to the finally block of the test method,
we can also see the slot offer event after the submission failure.
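The effect of delaying the registration can be mimicked outside of Flink with
the sketch below. All names in it are hypothetical, and a CountDownLatch takes
the place of the injected sleep so that the ordering is deterministic: the
registration is held back until the submit attempt has been made, and it only
completes afterwards.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class DelayedRegistrationSketch {
    static String reproduce() {
        // Hypothetical stand-in for the TM's jobManagerTable.
        Map<String, String> jobManagerTable = new ConcurrentHashMap<>();
        String jobId = "job-1";
        // Plays the role of the sleep injected before the registration completes.
        CountDownLatch injectedDelay = new CountDownLatch(1);

        // Simulated asynchronous registration, held back by the latch.
        CompletableFuture<Void> registration = CompletableFuture.runAsync(() -> {
            try {
                injectedDelay.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            jobManagerTable.put(jobId, "jm-address");
        });

        // The submit attempt happens while the registration is still pending.
        String submitResult = jobManagerTable.containsKey(jobId)
            ? "submitted"
            : "no JobManager associated";

        // Release the registration; it now succeeds, but too late for the submit.
        injectedDelay.countDown();
        registration.join();
        String registrationResult = jobManagerTable.containsKey(jobId)
            ? "registered afterwards"
            : "never registered";

        return submitResult + " / " + registrationResult;
    }

    public static void main(String[] args) {
        System.out.println(reproduce());
    }
}
```

The late registration in the sketch corresponds to the slot offer event that
shows up in the log after the submission failure.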
> TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall
> failed on Travis
> ----------------------------------------------------------------------------------------
>
> Key: FLINK-13487
> URL: https://issues.apache.org/jira/browse/FLINK-13487
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task, Tests
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Yun Gao
> Priority: Blocker
> Fix For: 1.9.0
>
> Attachments: error_log.png
>
>
> https://api.travis-ci.org/v3/job/564925114/log.txt
> {code}
> 21:14:47.090 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time
> elapsed: 5.754 s <<< FAILURE! - in
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest
> 21:14:47.090 [ERROR]
> testPartitionReleaseAfterReleaseCall(org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest)
> Time elapsed: 0.136 s <<< ERROR!
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException:
> Could not submit task because there is no JobManager associated for the job
> 2a0ab40cb53241799b71ff6fd2f53d3d.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionRelease(TaskExecutorPartitionLifecycleTest.java:331)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall(TaskExecutorPartitionLifecycleTest.java:201)
> Caused by:
> org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException:
> Could not submit task because there is no JobManager associated for the job
> 2a0ab40cb53241799b71ff6fd2f53d3d.
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)