[
https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883233#comment-16883233
]
Xiaogang Shi commented on FLINK-12887:
--------------------------------------
As described in my first comment, we are using
{{scheduleRunAsyncWithoutFencing}} to kill leaked containers. When Yarn RM
restarts, it will take over containers from previous attempts, some of which
may be stuck. Those containers must be killed to release resources. In our
private version, we schedule a delayed operation to check whether the TMs in
these containers register themselves with the RM in time. If a container's TM
does not register in time, the container is killed. Because RM may have
granted its leadership when it restarts, the check operation must be scheduled
without fencing.
For the scenario described, it is true that it cannot happen in the current
implementation, because we now require {{setFencingToken}} to be called in the
main thread. I'm sorry for my confusing description. My point here is that by
simply scheduling the message with the Akka dispatcher, we can avoid the
inconvenience caused by unnecessary enveloping.
I think {{AkkaInvocationHandler}} knows the underlying {{AkkaRpcActor}} as it
is referenced by {{rpcEndpoint}}. I guess the problem actually is due to the
unknown {{ActorSystem}}. If so, we can pass the actor's {{ActorSystem}} to the
constructor of {{AkkaInvocationHandler}}, and use the {{ActorSystem}}'s
dispatcher to schedule delayed {{RunAsync}} messages. What do you think?
[~till.rohrmann]
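To make the proposal concrete, here is a minimal sketch of scheduling the unfenced envelope directly, so that it is never unwrapped and re-enveloped. It assumes a plain {{ScheduledExecutorService}} as a stand-in for the {{ActorSystem}}'s scheduler and dispatcher. The class {{UnfencedSchedulingSketch}} and its nested types are hypothetical and are not Flink or Akka classes.

```java
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a fenced endpoint; none of these types are Flink or Akka classes.
class UnfencedSchedulingSketch {

    /** Message that must run regardless of the current fencing token. */
    static final class Unfenced {
        final Runnable payload;
        Unfenced(Runnable payload) { this.payload = payload; }
    }

    /** Message that runs only if its token still matches the endpoint's token. */
    static final class Fenced {
        final UUID token;
        final Runnable payload;
        Fenced(UUID token, Runnable payload) { this.token = token; this.payload = payload; }
    }

    // One thread stands in for both the main thread and the ActorSystem's
    // scheduler/dispatcher (an assumption made to keep the sketch short).
    private final ScheduledExecutorService mainThread;
    // null models an endpoint that currently holds no leadership.
    private volatile UUID fencingToken;

    UnfencedSchedulingSketch(ScheduledExecutorService mainThread) {
        this.mainThread = mainThread;
    }

    void setFencingToken(UUID token) { this.fencingToken = token; }

    /** Receive path: unfenced messages bypass the token check. */
    void receive(Object message) {
        if (message instanceof Unfenced) {
            ((Unfenced) message).payload.run();
        } else if (message instanceof Fenced) {
            Fenced fenced = (Fenced) message;
            if (fenced.token.equals(fencingToken)) {
                fenced.payload.run();
            } // otherwise the message is dropped
        }
    }

    /** Hands the unfenced envelope straight to the scheduler, with no re-enveloping. */
    ScheduledFuture<?> scheduleRunAsyncWithoutFencing(Runnable runnable, long delay, TimeUnit unit) {
        Unfenced message = new Unfenced(runnable);
        return mainThread.schedule(() -> receive(message), delay, unit);
    }

    /** Returns true if a delayed check runs even though no fencing token is set. */
    static boolean demo() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            UnfencedSchedulingSketch endpoint = new UnfencedSchedulingSketch(executor);
            AtomicBoolean checkRan = new AtomicBoolean(false);
            endpoint.scheduleRunAsyncWithoutFencing(
                    () -> checkRan.set(true), 10, TimeUnit.MILLISECONDS).get();
            return checkRan.get();
        } catch (Exception e) {
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("unfenced check ran: " + demo());
    }
}
```

In this model the delayed container check still runs while the endpoint holds no fencing token, which is the behaviour we need for killing leaked containers.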
> Schedule UnfencedMessage would lost envelope info
> --------------------------------------------------
>
> Key: FLINK-12887
> URL: https://issues.apache.org/jira/browse/FLINK-12887
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.9.0
> Reporter: TisonKun
> Priority: Major
>
> We provide {{runAsync}}, {{callAsync}} and {{scheduleRunAsync}} for
> {{MainThreadExecutable}}, while providing {{runAsyncWithoutFencing}} and
> {{callAsyncWithoutFencing}} additionally for {{FencedMainThreadExecutable}}.
> Let's think about a case where we want to schedule an unfenced runnable or
> any other unfenced message (currently we don't have such a code path, but
> it's semantically valid).
> 1. {{FencedAkkaRpcActor}} receives an unfenced runnable with a delay.
> 2. It extracts the runnable from the unfenced message and calls
> {{super.handleRpcMessage}}.
> 3. {{AkkaRpcActor}} envelopes the message and schedules it at
> {{AkkaRpcActor#L410}}.
> However, {{FencedAkkaRpcActor#envelopeSelfMessage}} is called for the
> enveloping. Thus the unfenced message now becomes a fenced message.
> We can anyway implement {{scheduleRunAsyncWithoutFencing}} to schedule an
> unfenced message directly via {{actorsystem.scheduler.scheduleOnce(...,
> dispatcher)}}, but in the current codebase I notice that {{RunAsync}} has a
> weird {{atTimeNanos}} (i.e., delay) property. Ideally, how a message is
> scheduled should be determined by the parameters that
> ScheduledExecutorService is called with. At the very least, we must not
> extract an unfenced message, envelope it into a fenced message, and then
> schedule it, because that has the wrong semantics.
> cc [~till.rohrmann]
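The three steps in the quoted description can be reproduced with a small, simplified model. The class and method names below mirror the ones mentioned in the issue, but the code is an illustrative sketch under that assumption and not Flink's actual implementation. The {{lastScheduled}} field is a hypothetical stand-in for handing the message to the scheduler.

```java
import java.util.UUID;

// Simplified model of the steps described in the issue; these are not Flink's classes.
class EnvelopeLossSketch {
    interface Message {}

    static final class RunAsync implements Message {
        final Runnable runnable;
        final long delayMillis;
        RunAsync(Runnable runnable, long delayMillis) {
            this.runnable = runnable;
            this.delayMillis = delayMillis;
        }
    }

    static final class UnfencedMessage implements Message {
        final Message payload;
        UnfencedMessage(Message payload) { this.payload = payload; }
    }

    static final class FencedMessage implements Message {
        final UUID token;
        final Message payload;
        FencedMessage(UUID token, Message payload) { this.token = token; this.payload = payload; }
    }

    static class BaseActor {
        // Hypothetical stand-in for "the message handed to the scheduler".
        Message lastScheduled;

        Message envelopeSelfMessage(Message message) { return message; }

        void handleRpcMessage(Message message) {
            if (!(message instanceof RunAsync)) {
                return;
            }
            RunAsync runAsync = (RunAsync) message;
            if (runAsync.delayMillis > 0) {
                // Step 3: envelope and "schedule"; dynamic dispatch picks the subclass override.
                lastScheduled = envelopeSelfMessage(new RunAsync(runAsync.runnable, 0));
            } else {
                runAsync.runnable.run();
            }
        }
    }

    static class FencedActor extends BaseActor {
        UUID fencingToken = UUID.randomUUID();

        @Override
        Message envelopeSelfMessage(Message message) {
            return new FencedMessage(fencingToken, message);
        }

        void receive(Message message) {
            if (message instanceof UnfencedMessage) {
                // Step 2: the unfenced envelope is stripped before delegating.
                handleRpcMessage(((UnfencedMessage) message).payload);
            } else if (message instanceof FencedMessage) {
                FencedMessage fenced = (FencedMessage) message;
                if (fenced.token.equals(fencingToken)) {
                    handleRpcMessage(fenced.payload);
                } // a stale token means the message is dropped
            }
        }
    }

    /** True if the delayed unfenced runnable ends up scheduled as a fenced message. */
    static boolean scheduledMessageIsFenced() {
        FencedActor actor = new FencedActor();
        actor.receive(new UnfencedMessage(new RunAsync(() -> {}, 100)));
        return actor.lastScheduled instanceof FencedMessage;
    }

    /** True if the runnable still runs after the fencing token changes during the delay. */
    static boolean runsAfterTokenChange() {
        boolean[] ran = {false};
        FencedActor actor = new FencedActor();
        actor.receive(new UnfencedMessage(new RunAsync(() -> ran[0] = true, 100)));
        actor.fencingToken = UUID.randomUUID(); // token changes before delivery
        actor.receive(actor.lastScheduled);     // delivery of the scheduled message
        return ran[0];
    }
}
```

In this model {{scheduledMessageIsFenced()}} returns true and {{runsAfterTokenChange()}} returns false: the runnable that was submitted as unfenced is silently dropped once the token changes.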