Thanks for sharing, Till and Yang.

@Lu
Sorry, but I can't explain the behavior of the new test from the log. Let's
wait for replies from others.

@Till
It would be nice if those JIRAs could be fixed. Thanks again for proposing them.

In addition, I was tracking an issue where the RM keeps allocating and freeing
slots after a TM is lost, until its heartbeat times out, which is how I found
that recovery takes as long as the heartbeat timeout. That should be a minor bug
introduced by declarative resource management. I have created a JIRA for the
problem [1] and we can discuss it there if necessary.

[1] https://issues.apache.org/jira/browse/FLINK-23216

On Fri, Jul 2, 2021 at 3:13 AM Lu Niu <qqib...@gmail.com> wrote:

> Another side question: shall we add a metric that covers the complete
> restarting time (phase 1 + phase 2)? The current metric jm.restartingTime only
> covers phase 1. Thanks!
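>
> In the meantime, here is a hedged sketch of how one could approximate phase 1
> + phase 2 today in a local test (single-JVM / MiniCluster only, since it
> relies on a static field; the class and field names are made up for
> illustration): remember when the failure was triggered and log the gap once
> the restarted task is running again.
> ```
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>
> public abstract class FailingSource<T> extends RichParallelSourceFunction<T> {
>
>     // Hypothetical helper state, not a Flink API: set when the test exception is thrown.
>     static volatile long lastFailureTime = -1;
>
>     @Override
>     public void open(Configuration parameters) {
>         if (lastFailureTime > 0) {
>             long fullRestartMillis = System.currentTimeMillis() - lastFailureTime;
>             System.out.println("Full restart (phase 1 + phase 2) took ~" + fullRestartMillis + " ms");
>         }
>     }
>
>     // Call this from run() to trigger the test failure.
>     protected void triggerFailure() {
>         lastFailureTime = System.currentTimeMillis();
>         throw new RuntimeException("Trigger expected exception at: " + lastFailureTime);
>     }
> }
> ```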
>
> Best
> Lu
>
> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu <qqib...@gmail.com> wrote:
>
>> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>>
>> I did another test yesterday. In this test, I intentionally throw an
>> exception from the source operator:
>> ```
>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>         && errorFrenquecyInMin > 0
>>         && System.currentTimeMillis() - lastStartTime
>>                 >= errorFrenquecyInMin * 60 * 1000) {
>>     lastStartTime = System.currentTimeMillis();
>>     throw new RuntimeException(
>>             "Trigger expected exception at: " + lastStartTime);
>> }
>> ```
>> In this case, phase 1 still takes about 30s while phase 2 drops to about 1s
>> (because no container allocation is needed). Why does phase 1 still take
>> 30s even though no TM is lost?
>>
>> Related logs:
>> ```
>> 2021-06-30 00:55:07,463 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>> 2021-06-30 00:55:07,509 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
>> RESTARTING.
>> 2021-06-30 00:55:37,596 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
>> RUNNING.
>> 2021-06-30 00:55:38,678 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        (time when
>> all tasks switch from CREATED to RUNNING)
>> ```
>> Best
>> Lu
>>
>>
>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu <qqib...@gmail.com> wrote:
>>
>>> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>>>
>>> I did another test yesterday. In this test, I intentionally throw an
>>> exception from the source operator:
>>> ```
>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>         && errorFrenquecyInMin > 0
>>>         && System.currentTimeMillis() - lastStartTime
>>>                 >= errorFrenquecyInMin * 60 * 1000) {
>>>     lastStartTime = System.currentTimeMillis();
>>>     throw new RuntimeException(
>>>             "Trigger expected exception at: " + lastStartTime);
>>> }
>>> ```
>>> In this case, phase 1 still takes about 30s while phase 2 drops to about 1s
>>> (because no container allocation is needed).
>>>
>>> Some logs:
>>> ```
>>> ```
>>>
>>>
>>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> A quick addition: I think with FLINK-23202 it should now also be
>>>> possible to improve the heartbeat mechanism in the general case. We can
>>>> leverage the unreachability exception thrown when a remote target is no
>>>> longer reachable to mark a heartbeat target as unreachable [1]. This can
>>>> then be treated as if the heartbeat timeout had been triggered. That way
>>>> we should be able to detect lost TaskExecutors roughly as fast as our
>>>> heartbeat interval.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
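>>>>
>>>> To illustrate the idea, here is a purely hypothetical sketch (not the
>>>> actual HeartbeatManager code; the class and method names below are
>>>> stand-ins for whatever FLINK-23202/FLINK-23209 end up using): if a
>>>> heartbeat request fails with an "unreachable" exception, report the target
>>>> immediately instead of waiting for heartbeat.timeout.
>>>> ```
>>>> import java.util.concurrent.CompletableFuture;
>>>>
>>>> // Stand-in types, not Flink classes.
>>>> class RecipientUnreachableException extends RuntimeException {}
>>>>
>>>> interface HeartbeatListener {
>>>>     void notifyHeartbeatTimeout(String targetId);
>>>> }
>>>>
>>>> class HeartbeatSender {
>>>>     // Sends one heartbeat request; the future fails with
>>>>     // RecipientUnreachableException if the remote endpoint cannot be reached.
>>>>     CompletableFuture<Void> requestHeartbeat(String targetId) {
>>>>         return CompletableFuture.failedFuture(new RecipientUnreachableException());
>>>>     }
>>>>
>>>>     void monitor(String targetId, HeartbeatListener listener) {
>>>>         requestHeartbeat(targetId).whenComplete((ack, failure) -> {
>>>>             if (failure instanceof RecipientUnreachableException) {
>>>>                 // Treat unreachability like an immediate heartbeat timeout
>>>>                 // instead of waiting for heartbeat.timeout to expire.
>>>>                 listener.notifyHeartbeatTimeout(targetId);
>>>>             }
>>>>         });
>>>>     }
>>>> }
>>>> ```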
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com> wrote:
>>>>
>>>>> Since you are deploying Flink workloads on Yarn, the Flink
>>>>> ResourceManager should get the container completion event after the
>>>>> heartbeat chain of Yarn NM -> Yarn RM -> Flink RM, which takes about
>>>>> 8 seconds by default. The Flink ResourceManager will then release the
>>>>> dead TaskManager's container once it receives the completion event.
>>>>> As a result, Flink will not deploy tasks onto the dead TaskManagers.
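>>>>>
>>>>> As a rough illustration of where that completion event arrives on the
>>>>> Yarn side (a hedged sketch of the Hadoop callback interface, not Flink's
>>>>> actual ResourceManager code):
>>>>> ```
>>>>> import java.util.List;
>>>>> import org.apache.hadoop.yarn.api.records.Container;
>>>>> import org.apache.hadoop.yarn.api.records.ContainerStatus;
>>>>> import org.apache.hadoop.yarn.api.records.NodeReport;
>>>>> import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
>>>>>
>>>>> public class ContainerCompletionHandler implements AMRMClientAsync.CallbackHandler {
>>>>>
>>>>>     @Override
>>>>>     public void onContainersCompleted(List<ContainerStatus> statuses) {
>>>>>         // Only here, i.e. after the NM -> Yarn RM -> Flink RM heartbeat chain,
>>>>>         // does the application master learn that a container (TaskManager) died.
>>>>>         statuses.forEach(s -> System.out.println(
>>>>>             "Container " + s.getContainerId() + " finished, exit status " + s.getExitStatus()));
>>>>>     }
>>>>>
>>>>>     @Override public void onContainersAllocated(List<Container> containers) {}
>>>>>     @Override public void onShutdownRequest() {}
>>>>>     @Override public void onNodesUpdated(List<NodeReport> updatedNodes) {}
>>>>>     @Override public void onError(Throwable e) {}
>>>>>     @Override public float getProgress() { return 0.0f; }
>>>>> }
>>>>> ```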
>>>>>
>>>>>
>>>>> I think most of the time spent in Phase 1 might be in cancelling the
>>>>> tasks on the dead TaskManagers.
>>>>>
>>>>>
>>>>> Best,
>>>>> Yang
>>>>>
>>>>>
>>>>> On Thu, Jul 1, 2021 at 4:49 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>
>>>>>> The analysis of Gen is correct. Flink currently uses its heartbeat as
>>>>>> the primary means to detect dead TaskManagers. This means that Flink
>>>>>> will take at least `heartbeat.timeout` time before the system recovers.
>>>>>> Even if the cancellation happens fast (e.g. by having configured a low
>>>>>> akka.ask.timeout), Flink will still try to deploy tasks onto the dead
>>>>>> TaskManager until it is marked as dead and its slots are released
>>>>>> (unless the ResourceManager gets a signal from the underlying resource
>>>>>> management system that a container/pod has died). One way to improve
>>>>>> the situation is to introduce logic which reacts to a
>>>>>> ConnectionException and then blacklists or releases the TaskManager,
>>>>>> for example. This is currently not implemented in Flink, though.
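>>>>>>
>>>>>> Until such logic exists, the main knob is the heartbeat configuration
>>>>>> itself. A minimal sketch with purely illustrative values (the defaults
>>>>>> are heartbeat.interval=10000 ms and heartbeat.timeout=50000 ms; in a
>>>>>> real deployment these would go into flink-conf.yaml rather than code):
>>>>>> ```
>>>>>> import org.apache.flink.configuration.Configuration;
>>>>>>
>>>>>> public class HeartbeatTuningSketch {
>>>>>>     public static void main(String[] args) {
>>>>>>         // Lowering these reduces how long a dead TaskManager goes undetected,
>>>>>>         // at the cost of more false positives under GC pauses or network hiccups.
>>>>>>         Configuration conf = new Configuration();
>>>>>>         conf.setString("heartbeat.interval", "5000");   // default: 10000 ms
>>>>>>         conf.setString("heartbeat.timeout", "20000");   // default: 50000 ms
>>>>>>         System.out.println(conf);
>>>>>>     }
>>>>>> }
>>>>>> ```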
>>>>>>
>>>>>> Concerning the cancellation operation: Flink currently does not listen
>>>>>> to the dead letters of Akka. This means that the `akka.ask.timeout` is
>>>>>> the primary means to fail the future result of an RPC which could not
>>>>>> be sent. This is also an improvement we should add to Flink's
>>>>>> RpcService. I've created a JIRA issue for this problem [1].
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
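>>>>>>
>>>>>> A small, purely illustrative sketch of the consequence (not Flink's
>>>>>> RpcService code): a reply future whose request can never be delivered
>>>>>> only fails once the ask timeout fires, so callers wait the full
>>>>>> akka.ask.timeout.
>>>>>> ```
>>>>>> import java.util.concurrent.CompletableFuture;
>>>>>> import java.util.concurrent.TimeUnit;
>>>>>>
>>>>>> public class AskTimeoutSketch {
>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>         long askTimeoutMillis = 10_000;  // illustrative value for akka.ask.timeout
>>>>>>
>>>>>>         // The reply never arrives because the recipient is already gone.
>>>>>>         CompletableFuture<String> cancelAck = new CompletableFuture<>();
>>>>>>         cancelAck
>>>>>>             .orTimeout(askTimeoutMillis, TimeUnit.MILLISECONDS)
>>>>>>             .exceptionally(t -> {
>>>>>>                 System.out.println("Cancellation RPC failed only after the ask timeout: " + t);
>>>>>>                 return null;
>>>>>>             });
>>>>>>
>>>>>>         Thread.sleep(askTimeoutMillis + 1_000);  // keep the JVM alive for the demo
>>>>>>     }
>>>>>> }
>>>>>> ```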
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>>>>>
>>>>>>> Best
>>>>>>> Lu
>>>>>>>
>>>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'm also wondering about this.
>>>>>>>>
>>>>>>>> In my opinion, it's because the JM cannot confirm whether the TM is
>>>>>>>> lost or whether it's a temporary network problem that will recover
>>>>>>>> soon, since I can see in the log that akka got a "Connection refused"
>>>>>>>> but the JM still sends heartbeat requests to the lost TM until the
>>>>>>>> heartbeat timeout is reached. But I'm not sure if it's indeed
>>>>>>>> designed like this.
>>>>>>>>
>>>>>>>> I would really appreciate it if anyone who knows more details could
>>>>>>>> answer. Thanks.
>>>>>>>>
>>>>>>>
