[ https://issues.apache.org/jira/browse/FLINK-18068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17124685#comment-17124685 ]

Till Rohrmann edited comment on FLINK-18068 at 6/3/20, 7:27 AM:
----------------------------------------------------------------

I think the problem is rather in {{AkkaRpcActor}}, at various places. There we
catch {{Throwables}} and only propagate them in case of fatal errors or OOMs. I
think it would be better to handle them consistently and let unchecked
framework exceptions bubble up. One thing which makes this a bit harder is
that user code {{Throwables}} should not cause the system to fail.
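
To make the difference concrete, here is a minimal sketch of the two policies, assuming a simplified, hypothetical message handler. {{MessageHandling}}, {{Policy}} and {{isFatalOrOom}} are illustrative names, not Flink's actual {{AkkaRpcActor}} code, and treating every {{VirtualMachineError}} as "fatal or OOM" is a simplifying assumption.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of an RPC endpoint's message handling.
// This is NOT Flink's AkkaRpcActor; it only contrasts two exception policies.
final class MessageHandling {

    enum Policy { PROPAGATE_FATAL_ONLY, PROPAGATE_ALL }

    // Throwables that were caught and merely recorded (i.e. "logged") instead of propagated.
    final List<Throwable> swallowed = new ArrayList<>();
    private final Policy policy;

    MessageHandling(Policy policy) {
        this.policy = policy;
    }

    // Assumption: a rough stand-in for "fatal error or OOM" (OutOfMemoryError,
    // StackOverflowError, InternalError are all VirtualMachineErrors).
    static boolean isFatalOrOom(Throwable t) {
        return t instanceof VirtualMachineError;
    }

    void handle(Runnable message) {
        try {
            message.run();
        } catch (Throwable t) {
            if (policy == Policy.PROPAGATE_ALL || isFatalOrOom(t)) {
                // Escalate so that the caller/supervisor can fail the component.
                // Precise rethrow: the try block only throws unchecked exceptions,
                // so no "throws" clause is needed.
                throw t;
            }
            // Otherwise the failure is only recorded and processing continues,
            // which can leave the component alive but no longer making progress.
            swallowed.add(t);
        }
    }
}
{code}

With {{PROPAGATE_FATAL_ONLY}}, an {{IllegalStateException}} thrown while handling a message only ends up in {{swallowed}} and the endpoint keeps running; with {{PROPAGATE_ALL}}, the same exception reaches the caller, which could then fail the component instead of leaving it idle.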

I think the proper solution would be to propagate all thrown exceptions by
default. At places where user code exceptions can occur and are tolerable, the
higher-level code should catch and handle them properly.

The problem I see with this approach is that in some places we use exceptions
for normal control flow (e.g. requesting a job which does not exist throws a
{{FlinkJobNotFoundException}}). I think this is a mistake; instead we should
return a more meaningful value (e.g. {{Optional<JobInformation>}}). But this
is a bigger effort to change.
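
A minimal sketch of the two styles, using a hypothetical {{JobRegistry}} with a simplified {{JobInfo}} standing in for {{JobInformation}} and a hypothetical {{JobNotFoundException}} standing in for {{FlinkJobNotFoundException}}. None of these are Flink's actual classes, and job IDs are plain strings here for simplicity.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for JobInformation.
final class JobInfo {
    final String jobId;
    final String name;

    JobInfo(String jobId, String name) {
        this.jobId = jobId;
        this.name = name;
    }
}

// Hypothetical stand-in for FlinkJobNotFoundException.
final class JobNotFoundException extends Exception {
    JobNotFoundException(String jobId) {
        super("Could not find job " + jobId);
    }
}

final class JobRegistry {
    private final Map<String, JobInfo> jobs = new HashMap<>();

    void register(JobInfo info) {
        jobs.put(info.jobId, info);
    }

    // Exception-as-control-flow: "job not found" travels through the same
    // channel as genuine failures, so callers must catch to tell them apart.
    JobInfo requestJobOrThrow(String jobId) throws JobNotFoundException {
        JobInfo info = jobs.get(jobId);
        if (info == null) {
            throw new JobNotFoundException(jobId);
        }
        return info;
    }

    // Absence as an ordinary return value; exceptions stay reserved for real
    // errors, which can then propagate by default.
    Optional<JobInfo> requestJob(String jobId) {
        return Optional.ofNullable(jobs.get(jobId));
    }
}
{code}

Callers of {{requestJob}} can handle the absent case with {{Optional}} methods such as {{map}} or {{orElse}} without a try/catch, so a thrown exception again signals that something actually went wrong.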



> Job scheduling stops but not exits after throwing non-fatal exception
> ---------------------------------------------------------------------
>
>                 Key: FLINK-18068
>                 URL: https://issues.apache.org/jira/browse/FLINK-18068
>             Project: Flink
>          Issue Type: Improvement
>          Components: Deployment / YARN
>    Affects Versions: 1.10.1
>            Reporter: Jiayi Liao
>            Priority: Major
>
> The batch job stops but stays alive, doing nothing for a long time (maybe
> forever?), if any non-fatal exception is thrown while interacting with YARN.
> Here is an example:
> {code:java}
> java.lang.IllegalStateException: The RMClient's and YarnResourceManagers internal state about the number of pending container requests for resource <memory:10240, vCores:1.0> has diverged. Number client's pending container requests 40 != Number RM's pending container requests 0.
>         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:217) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at org.apache.flink.yarn.YarnResourceManager.getPendingRequestsAndCheckConsistency(YarnResourceManager.java:518) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at org.apache.flink.yarn.YarnResourceManager.onContainersOfResourceAllocated(YarnResourceManager.java:431) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:395) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-ljy-1.0.jar:?]
>         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-ljy-1.0.jar:?]
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-ljy-1.0.jar:?]
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-ljy-1.0.jar:?]
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>         at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
