[
https://issues.apache.org/jira/browse/FLINK-19557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Till Rohrmann updated FLINK-19557:
----------------------------------
Fix Version/s: 1.10.3
> Issue retrieving leader after zookeeper session reconnect
> ---------------------------------------------------------
>
> Key: FLINK-19557
> URL: https://issues.apache.org/jira/browse/FLINK-19557
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.12.0, 1.11.2
> Reporter: Max Mizikar
> Assignee: Till Rohrmann
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.12.0, 1.10.3, 1.11.3
>
>
> We have noticed an issue with leader retrieval after reconnecting to
> ZooKeeper. To reproduce the issue, break the connection between a job manager
> that is not the leader and ZooKeeper, then wait for the session between the two
> to be lost. At this point, Flink notifies its listeners of a loss of leader.
> After the loss of leader has occurred, reconnect the job manager to ZooKeeper.
> The leader is still the same as it was before, but when trying to access the
> REST API, you will see this:
> {code}
> $ curl -s localhost:8999/jobs
> {"errors":["Service temporarily unavailable due to an ongoing leader
> election. Please refresh."]}
> {code}
> I have been using `stress -t 60 -m 2048` (which spins up 2048 workers that
> continuously allocate and free 256MB each) to swap out the job manager and
> cause the connection loss.
> I have done some digging on this. The ZooKeeperLeaderRetrievalService has this
> code block for handling state changes:
> {code}
> protected void handleStateChange(ConnectionState newState) {
>     switch (newState) {
>         case CONNECTED:
>             LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start.");
>             break;
>         case SUSPENDED:
>             LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the leader from " +
>                 "ZooKeeper.");
>             synchronized (lock) {
>                 notifyLeaderLoss();
>             }
>             break;
>         case RECONNECTED:
>             LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
>             break;
>         case LOST:
>             LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from " +
>                 "ZooKeeper.");
>             synchronized (lock) {
>                 notifyLeaderLoss();
>             }
>             break;
>     }
> }
> {code}
> It calls notifyLeaderLoss() when the connection is suspended or lost, but it
> does nothing when the connection is reconnected. Curator's NodeCache does
> retrieve the value of the leader znode after the reconnect, but it does not
> notify its listeners if the value is the same as it was before the connection
> loss. So, unless a leader election happens after a ZooKeeper connection loss,
> the job managers that are not the leader never learn that there still is a
> leader.
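> One direction that might address this (only a sketch of the idea, not
> necessarily the fix that ends up being merged for this ticket) is to handle the
> RECONNECTED case explicitly and re-announce whatever the leader znode currently
> contains, since notifyLeaderLoss() has already cleared the leader on the
> listener side. The renotifyCurrentLeader() helper below is hypothetical and does
> not exist in Flink:
> {code}
> case RECONNECTED:
>     LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
>     synchronized (lock) {
>         // Hypothetical helper: re-read the leader znode (e.g. from the NodeCache)
>         // and notify listeners even if the value is unchanged, because NodeCache
>         // will not fire nodeChanged() for an identical value after the reconnect.
>         renotifyCurrentLeader();
>     }
>     break;
> {code}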
> This is the method that NodeCache calls when a new value is retrieved:
> {code}
> private void setNewData(ChildData newData) throws InterruptedException
> {
>     ChildData previousData = data.getAndSet(newData);
>     if ( !Objects.equal(previousData, newData) )
>     {
>         listeners.forEach(listener -> {
>             try
>             {
>                 listener.nodeChanged();
>             }
>             catch ( Exception e )
>             {
>                 ThreadUtils.checkInterrupted(e);
>                 log.error("Calling listener", e);
>             }
>         });
>         if ( rebuildTestExchanger != null )
>         {
>             try
>             {
>                 rebuildTestExchanger.exchange(new Object());
>             }
>             catch ( InterruptedException e )
>             {
>                 Thread.currentThread().interrupt();
>             }
>         }
>     }
> }
> {code}
> Note that the
> {code}
> if ( !Objects.equal(previousData, newData) )
> {code}
> check seems to be what prevents the job managers from getting the leader again
> after a ZooKeeper connection loss.
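> For context, this is roughly how a retrieval service wires itself to the cache.
> The snippet is a minimal sketch (not Flink code; the znode path and the
> handleNewLeaderData() handler are just examples) showing that the listener only
> fires out of setNewData() when the data actually changed:
> {code}
> import org.apache.curator.framework.recipes.cache.ChildData;
> import org.apache.curator.framework.recipes.cache.NodeCache;
>
> // client is a started CuratorFramework; the path is an example, not Flink's actual one.
> NodeCache cache = new NodeCache(client, "/leader/rest_server");
> cache.getListenable().addListener(() -> {
>     // Only reached when !Objects.equal(previousData, newData), i.e. never after a
>     // reconnect that finds the same leader value as before the connection loss.
>     ChildData current = cache.getCurrentData();
>     handleNewLeaderData(current); // hypothetical handler
> });
> cache.start(true);
> {code}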
--
This message was sent by Atlassian Jira
(v8.3.4#803005)