[ https://issues.apache.org/jira/browse/FLINK-19557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-19557:
----------------------------------
    Fix Version/s: 1.10.3

> Issue retrieving leader after zookeeper session reconnect
> ---------------------------------------------------------
>
>                 Key: FLINK-19557
>                 URL: https://issues.apache.org/jira/browse/FLINK-19557
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.12.0, 1.11.2
>            Reporter: Max Mizikar
>            Assignee: Till Rohrmann
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.10.3, 1.11.3
>
>
> We have noticed an issue with leaders being retrieved after reconnecting to
> ZooKeeper. To reproduce it, break the connection between ZooKeeper and a job
> manager that is not the leader, then wait for the session between the two to
> be lost. At this point, Flink notifies its listeners of the leader loss. After
> the leader loss has occurred, reconnect the job manager to ZooKeeper. The
> leader is still the same as before, but when you try to access the REST API,
> you see this:
> {code}
> $ curl -s localhost:8999/jobs
> {"errors":["Service temporarily unavailable due to an ongoing leader election. Please refresh."]}
> {code}
> I have been using `stress -t 60 -m 2048` (which spins up 2048 workers that
> continuously allocate and free 256MB each) to swap out the job manager and
> cause the connection loss.
> I have done some digging on this. The ZooKeeperLeaderRetrievalService has
> the following code block for handling state changes:
> {code}
>       protected void handleStateChange(ConnectionState newState) {
>               switch (newState) {
>                       case CONNECTED:
>                               LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start.");
>                               break;
>                       case SUSPENDED:
>                               LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the leader from " +
>                                               "ZooKeeper.");
>                               synchronized (lock) {
>                                       notifyLeaderLoss();
>                               }
>                               break;
>                       case RECONNECTED:
>                               LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
>                               break;
>                       case LOST:
>                               LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from " +
>                                               "ZooKeeper.");
>                               synchronized (lock) {
>                                       notifyLeaderLoss();
>                               }
>                               break;
>               }
>       }
> {code}
> It calls notifyLeaderLoss() when the connection is suspended or lost, but it
> does nothing when the connection is reconnected. It appears that Curator's
> NodeCache re-reads the value of the leader znode after the reconnect, but it
> does not notify its listeners if the value is the same as before the
> connection loss. So, unless a leader election happens after a ZooKeeper
> connection loss, the job managers that are not the leader never learn that
> there is a leader.
> This is the NodeCache method that is called when a new value is retrieved:
> {code}
>     private void setNewData(ChildData newData) throws InterruptedException
>     {
>         ChildData   previousData = data.getAndSet(newData);
>         if ( !Objects.equal(previousData, newData) )
>         {
>             listeners.forEach(listener -> {
>                 try
>                 {
>                     listener.nodeChanged();
>                 }
>                 catch ( Exception e )
>                 {
>                     ThreadUtils.checkInterrupted(e);
>                     log.error("Calling listener", e);
>                 }
>             });
>             if ( rebuildTestExchanger != null )
>             {
>                 try
>                 {
>                     rebuildTestExchanger.exchange(new Object());
>                 }
>                 catch ( InterruptedException e )
>                 {
>                     Thread.currentThread().interrupt();
>                 }
>             }
>         }
>     }
> {code}
> Note the check
> {code}
>         if ( !Objects.equal(previousData, newData) )
> {code}
> which seems to prevent the job managers from getting the leader after a
> ZooKeeper connection loss.
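
The interaction described in the report can be reproduced without ZooKeeper in a small model. This is a hypothetical stand-in, not Flink or Curator code: `FakeNodeCache` mimics only the equality check in NodeCache.setNewData(), `FakeRetrievalService` mimics the quoted handleStateChange(), and `ConnState` and the leader address "jm-1" are made up for illustration. Leader data is modeled as a plain String rather than Curator's ChildData.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical model of the reported bug; not the real Flink or Curator classes.
public class StaleLeaderDemo {

    // Stand-in for the Curator ConnectionState values used in the quoted handler.
    enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    // Stand-in for NodeCache: listeners fire only when the cached value changes.
    static final class FakeNodeCache {
        private String data;
        private final List<Runnable> listeners = new ArrayList<>();

        void addListener(Runnable listener) {
            listeners.add(listener);
        }

        String getCurrentData() {
            return data;
        }

        // Mirrors the `if ( !Objects.equal(previousData, newData) )` check.
        void setNewData(String newData) {
            String previous = data;
            data = newData;
            if (!Objects.equals(previous, newData)) {
                listeners.forEach(Runnable::run);
            }
        }
    }

    // Stand-in for a non-leader's retrieval service, following the quoted handler.
    static final class FakeRetrievalService {
        private String notifiedLeader; // what downstream components (e.g. REST) believe

        FakeRetrievalService(FakeNodeCache cache) {
            cache.addListener(() -> notifiedLeader = cache.getCurrentData());
        }

        void handleStateChange(ConnState newState) {
            switch (newState) {
                case SUSPENDED:
                case LOST:
                    notifiedLeader = null; // corresponds to notifyLeaderLoss()
                    break;
                case CONNECTED:
                case RECONNECTED:
                    break; // the quoted code only logs here
            }
        }

        String notifiedLeader() {
            return notifiedLeader;
        }
    }

    // Session loss followed by a reconnect, with no new leader election in between.
    static String simulate() {
        FakeNodeCache cache = new FakeNodeCache();
        FakeRetrievalService service = new FakeRetrievalService(cache);
        cache.setNewData("jm-1");                         // initial leader: listener fires
        service.handleStateChange(ConnState.LOST);        // leader loss is published
        service.handleStateChange(ConnState.RECONNECTED); // nothing happens
        cache.setNewData("jm-1");                         // same value re-read: no listener call
        return service.notifiedLeader();
    }

    public static void main(String[] args) {
        System.out.println("leader after reconnect: " + simulate()); // prints "leader after reconnect: null"
    }
}
```

In this model the follower ends up with no leader even though "jm-1" never stopped being the leader, which corresponds to the state in which the REST endpoint keeps answering with the leader-election error.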

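One possible direction, sketched on the same kind of simplified model (hypothetical `FakeNodeCache`, `ConnState`, `FixedRetrievalService`, and leader address "jm-1"; not necessarily the change that was merged for this issue), is for the retrieval service to re-publish the cache's current value on RECONNECTED instead of waiting for a change event that will never fire. Leader-loss and leader-update notifications share one `lock`, as in the quoted handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical fix sketch on a simplified model; not the actual Flink patch.
public class ReconnectFixSketch {

    enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    // Simplified NodeCache stand-in: listeners fire only on a changed value.
    static final class FakeNodeCache {
        private String data;
        private final List<Runnable> listeners = new ArrayList<>();

        void addListener(Runnable listener) {
            listeners.add(listener);
        }

        String getCurrentData() {
            return data;
        }

        void setNewData(String newData) {
            String previous = data;
            data = newData;
            if (!Objects.equals(previous, newData)) {
                listeners.forEach(Runnable::run);
            }
        }
    }

    static final class FixedRetrievalService {
        private final Object lock = new Object();
        private final FakeNodeCache cache;
        private String notifiedLeader;

        FixedRetrievalService(FakeNodeCache cache) {
            this.cache = cache;
            cache.addListener(this::publishCurrentLeader);
        }

        // Publishes whatever the cache currently holds, whether or not it changed.
        private void publishCurrentLeader() {
            synchronized (lock) {
                notifiedLeader = cache.getCurrentData();
            }
        }

        void handleStateChange(ConnState newState) {
            switch (newState) {
                case SUSPENDED:
                case LOST:
                    synchronized (lock) {
                        notifiedLeader = null; // corresponds to notifyLeaderLoss()
                    }
                    break;
                case RECONNECTED:
                    // The cache will not fire for an unchanged value, so re-publish here.
                    publishCurrentLeader();
                    break;
                case CONNECTED:
                    break;
            }
        }

        String notifiedLeader() {
            synchronized (lock) {
                return notifiedLeader;
            }
        }
    }

    static String simulate() {
        FakeNodeCache cache = new FakeNodeCache();
        FixedRetrievalService service = new FixedRetrievalService(cache);
        cache.setNewData("jm-1");
        service.handleStateChange(ConnState.LOST);
        service.handleStateChange(ConnState.RECONNECTED); // leader is re-published here
        cache.setNewData("jm-1");                         // still no listener call, but none is needed
        return service.notifiedLeader();
    }

    public static void main(String[] args) {
        System.out.println("leader after reconnect: " + simulate()); // prints "leader after reconnect: jm-1"
    }
}
```

This assumes the cache still holds its last known value when RECONNECTED arrives. If the leader changed during the outage, the stale value is published briefly and then replaced once the cache's re-read returns a different value and fires its listener.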


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
