[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330890#comment-17330890
 ] 

Chen Qin edited comment on FLINK-10052 at 4/23/21, 4:13 PM:
------------------------------------------------------------

here is another exception we observed in another job, may or may not caused by 
this pr.

{code:java}
2021-04-23 11:09:03,388 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Closing TaskExecutor connection 
container_e26_1617655625710_8692_01_000115 because: ResourceManager leader 
changed to new address null
2021-04-23 11:09:03,391 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
USER_AGGREGATE_STATE.user_signal_v2.SINK-async (200/360) 
(bf815073df08c3426bf41b63d74510fb) switched from RUNNING to FAILED on 
container_e26_1617655625710_8692_01_000115 @ xxxx.ec2.pin220.com 
(dataPort=46719).
org.apache.flink.util.FlinkException: ResourceManager leader changed to new 
address null
        at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
        at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
        at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
        at akka.actor.ActorCell.invoke(ActorCell.scala:581)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
        at akka.dispatch.Mailbox.run(Mailbox.scala:229)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}



was (Author: foxss):
here is another exception we observed in another job after apply this pr

{code:java}
2021-04-23 11:09:03,388 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Closing TaskExecutor connection 
container_e26_1617655625710_8692_01_000115 because: ResourceManager leader 
changed to new address null
2021-04-23 11:09:03,391 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
USER_AGGREGATE_STATE.user_signal_v2.SINK-async (200/360) 
(bf815073df08c3426bf41b63d74510fb) switched from RUNNING to FAILED on 
container_e26_1617655625710_8692_01_000115 @ xxxx.ec2.pin220.com 
(dataPort=46719).
org.apache.flink.util.FlinkException: ResourceManager leader changed to new 
address null
        at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
        at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
        at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
        at akka.actor.ActorCell.invoke(ActorCell.scala:581)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
        at akka.dispatch.Mailbox.run(Mailbox.scala:229)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}


> Tolerate temporarily suspended ZooKeeper connections
> ----------------------------------------------------
>
>                 Key: FLINK-10052
>                 URL: https://issues.apache.org/jira/browse/FLINK-10052
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.4.2, 1.5.2, 1.6.0, 1.8.1
>            Reporter: Till Rohrmann
>            Assignee: Zili Chen
>            Priority: Major
>              Labels: pull-request-available, stale-assigned
>             Fix For: 1.13.0
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA 
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator 
> recipe for leader election. The leader latch revokes leadership in case of a 
> suspended ZooKeeper connection. This can be premature in case that the system 
> can reconnect to ZooKeeper before its session expires. The effect of the lost 
> leadership is that all jobs will be canceled and directly restarted after 
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper 
> connection, it would be better to wait until the ZooKeeper connection is 
> LOST. That way we would allow the system to reconnect and not lose the 
> leadership. This could be achievable by using Curator's {{LeaderSelector}} 
> instead of the {{LeaderLatch}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to