Repository: zookeeper
Updated Branches:
  refs/heads/branch-3.5 11e576870 -> ac864f53b
ZOOKEEPER-2080: Fix deadlock in dynamic reconfiguration

Use explicit fine-grained locks for synchronizing access to QuorumVerifier states in QuorumPeer.

Author: Michael Han <[email protected]>

Reviewers: Alexander Shraer <[email protected]>, Edward Ribeiro <[email protected]>, Flavio Junqueira <[email protected]>

Closes #92 from hanm/ZOOKEEPER-2080 and squashes the following commits:

25f0caf [Michael Han] Further simplify code - suggested by Alex.
08301f4 [Michael Han] Remove QuorumVerifier from connectOne signatures. Get last seen quorum verifier inside connectOne, when needed, instead.
de367bf [Michael Han] Address review comments from Alex and Edward. 1. Use Object instead of byte[0] for lock. Naming and modifier updates as well. 2. Synchronize on the lock object in connectOne, also simplified code a little bit.
8995dd4 [Michael Han] ZOOKEEPER-2080: Fix deadlock in dynamic reconfiguration. Use explicit fine grained locks for synchronizing access to QuorumVerifier states in QuorumPeer.
(cherry picked from commit 434abbbb7eef271fab02306bcc9c8ad29ec2fe2e) Signed-off-by: Rakesh Radhakrishnan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/ac864f53 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/ac864f53 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/ac864f53 Branch: refs/heads/branch-3.5 Commit: ac864f53bf7d1f3c8630348ed4cdeb2539ec3932 Parents: 11e5768 Author: Michael Han <[email protected]> Authored: Thu Feb 9 16:10:23 2017 +0530 Committer: Rakesh Radhakrishnan <[email protected]> Committed: Thu Feb 9 16:10:50 2017 +0530 ---------------------------------------------------------------------- .../server/quorum/QuorumCnxManager.java | 40 ++-- .../zookeeper/server/quorum/QuorumPeer.java | 186 +++++++++++-------- 2 files changed, 128 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/ac864f53/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java index 58c159b..57cf8d9 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java @@ -30,6 +30,7 @@ import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.channels.UnresolvedAddressException; import java.util.Enumeration; +import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -37,6 +38,7 @@ import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicInteger; import 
org.apache.zookeeper.server.ZooKeeperThread; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -306,7 +308,7 @@ public class QuorumCnxManager { * connection if it wins. Notice that it checks whether it has a connection * to this server already or not. If it does, then it sends the smallest * possible long value to lose the challenge. - * + * */ public void receiveConnection(Socket sock) { Long sid = null, protocolVersion = null; @@ -466,29 +468,31 @@ public class QuorumCnxManager { * * @param sid server id */ - synchronized void connectOne(long sid){ if (senderWorkerMap.get(sid) != null) { - LOG.debug("There is a connection already for server " + sid); - return; + LOG.debug("There is a connection already for server " + sid); + return; } - synchronized(self) { - boolean knownId = false; + synchronized (self.QV_LOCK) { + boolean knownId = false; // Resolve hostname for the remote server before attempting to // connect in case the underlying ip address has changed. 
self.recreateSocketAddresses(sid); - if (self.getView().containsKey(sid)) { - knownId = true; - if (connectOne(sid, self.getView().get(sid).electionAddr)) - return; - } - if (self.getLastSeenQuorumVerifier()!=null && self.getLastSeenQuorumVerifier().getAllMembers().containsKey(sid) - && (!knownId || (self.getLastSeenQuorumVerifier().getAllMembers().get(sid).electionAddr != - self.getView().get(sid).electionAddr))) { - knownId = true; - if (connectOne(sid, self.getLastSeenQuorumVerifier().getAllMembers().get(sid).electionAddr)) - return; - } + Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView(); + QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier(); + Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers(); + if (lastCommittedView.containsKey(sid)) { + knownId = true; + if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) + return; + } + if (lastSeenQV != null && lastProposedView.containsKey(sid) + && (!knownId || (lastProposedView.get(sid).electionAddr != + lastCommittedView.get(sid).electionAddr))) { + knownId = true; + if (connectOne(sid, lastProposedView.get(sid).electionAddr)) + return; + } if (!knownId) { LOG.warn("Invalid server id: " + sid); return; http://git-wip-us.apache.org/repos/asf/zookeeper/blob/ac864f53/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index 0274c9e..38b0299 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -434,6 +434,10 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider //last proposed quorum verifier public QuorumVerifier lastSeenQuorumVerifier = null; + // Lock object that guard access to quorumVerifier and 
lastSeenQuorumVerifier. + final Object QV_LOCK = new Object(); + + /** * My id */ @@ -665,28 +669,40 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider } } - public synchronized InetSocketAddress getQuorumAddress(){ - return myQuorumAddr; + public InetSocketAddress getQuorumAddress(){ + synchronized (QV_LOCK) { + return myQuorumAddr; + } } - public synchronized void setQuorumAddress(InetSocketAddress addr){ - myQuorumAddr = addr; + public void setQuorumAddress(InetSocketAddress addr){ + synchronized (QV_LOCK) { + myQuorumAddr = addr; + } } - + public InetSocketAddress getElectionAddress(){ - return myElectionAddr; + synchronized (QV_LOCK) { + return myElectionAddr; + } } public void setElectionAddress(InetSocketAddress addr){ - myElectionAddr = addr; + synchronized (QV_LOCK) { + myElectionAddr = addr; + } } public InetSocketAddress getClientAddress(){ - return myClientAddr; + synchronized (QV_LOCK) { + return myClientAddr; + } } public void setClientAddress(InetSocketAddress addr){ - myClientAddr = addr; + synchronized (QV_LOCK) { + myClientAddr = addr; + } } private int electionType; @@ -1396,25 +1412,32 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider } /** - * Return QuorumVerifier object for the last committed configuration + * Return QuorumVerifier object for the last committed configuration. */ - - public synchronized QuorumVerifier getQuorumVerifier(){ - return quorumVerifier; - + public QuorumVerifier getQuorumVerifier(){ + synchronized (QV_LOCK) { + return quorumVerifier; + } } - public synchronized QuorumVerifier getLastSeenQuorumVerifier(){ - return lastSeenQuorumVerifier; + /** + * Return QuorumVerifier object for the last proposed configuration. 
+ */ + public QuorumVerifier getLastSeenQuorumVerifier(){ + synchronized (QV_LOCK) { + return lastSeenQuorumVerifier; + } } - public synchronized void connectNewPeers(){ - if (qcm!=null && getQuorumVerifier()!=null && getLastSeenQuorumVerifier()!=null) { - Map<Long, QuorumServer> committedView = getQuorumVerifier().getAllMembers(); - for (Entry<Long, QuorumServer> e: getLastSeenQuorumVerifier().getAllMembers().entrySet()){ - if (e.getKey() != getId() && !committedView.containsKey(e.getKey())) - qcm.connectOne(e.getKey()); - } + private void connectNewPeers(){ + synchronized (QV_LOCK) { + if (qcm != null && quorumVerifier != null && lastSeenQuorumVerifier != null) { + Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers(); + for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) { + if (e.getKey() != getId() && !committedView.containsKey(e.getKey())) + qcm.connectOne(e.getKey()); + } + } } } @@ -1431,73 +1454,76 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix; } - public synchronized void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){ - if (lastSeenQuorumVerifier!=null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) { - LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() + - ". Current version: " + quorumVerifier.getVersion()); - - } - // assuming that a version uniquely identifies a configuration, so if - // version is the same, nothing to do here. 
- if (lastSeenQuorumVerifier != null && - lastSeenQuorumVerifier.getVersion() == qv.getVersion()) { - return; - } - lastSeenQuorumVerifier = qv; - connectNewPeers(); - if (writeToDisk) { - try { - QuorumPeerConfig.writeDynamicConfig( - getNextDynamicConfigFilename(), qv, true); - } catch(IOException e){ - LOG.error("Error writing next dynamic config file to disk: ", e.getMessage()); - } - } + public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){ + synchronized (QV_LOCK) { + if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) { + LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() + + ". Current version: " + quorumVerifier.getVersion()); - } - - public synchronized QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){ - if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) { - // this is normal. For example - server found out about new config through FastLeaderElection gossiping - // and then got the same config in UPTODATE message so its already known - LOG.debug(getId() + " setQuorumVerifier called with known or old config " + qv.getVersion() + - ". Current version: " + quorumVerifier.getVersion()); - return quorumVerifier; - } - QuorumVerifier prevQV = quorumVerifier; - quorumVerifier = qv; - if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion())) + } + // assuming that a version uniquely identifies a configuration, so if + // version is the same, nothing to do here. 
+ if (lastSeenQuorumVerifier != null && + lastSeenQuorumVerifier.getVersion() == qv.getVersion()) { + return; + } lastSeenQuorumVerifier = qv; - - if (writeToDisk) { - // some tests initialize QuorumPeer without a static config file - if (configFilename != null) { + connectNewPeers(); + if (writeToDisk) { try { - String dynamicConfigFilename = makeDynamicConfigFilename( - qv.getVersion()); QuorumPeerConfig.writeDynamicConfig( - dynamicConfigFilename, qv, false); - QuorumPeerConfig.editStaticConfig(configFilename, - dynamicConfigFilename, - needEraseClientInfoFromStaticConfig()); + getNextDynamicConfigFilename(), qv, true); } catch (IOException e) { - LOG.error("Error closing file: ", e.getMessage()); + LOG.error("Error writing next dynamic config file to disk: ", e.getMessage()); } - } else { - LOG.info("writeToDisk == true but configFilename == null"); } } + } + + public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){ + synchronized (QV_LOCK) { + if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) { + // this is normal. For example - server found out about new config through FastLeaderElection gossiping + // and then got the same config in UPTODATE message so its already known + LOG.debug(getId() + " setQuorumVerifier called with known or old config " + qv.getVersion() + + ". 
Current version: " + quorumVerifier.getVersion()); + return quorumVerifier; + } + QuorumVerifier prevQV = quorumVerifier; + quorumVerifier = qv; + if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion())) + lastSeenQuorumVerifier = qv; + + if (writeToDisk) { + // some tests initialize QuorumPeer without a static config file + if (configFilename != null) { + try { + String dynamicConfigFilename = makeDynamicConfigFilename( + qv.getVersion()); + QuorumPeerConfig.writeDynamicConfig( + dynamicConfigFilename, qv, false); + QuorumPeerConfig.editStaticConfig(configFilename, + dynamicConfigFilename, + needEraseClientInfoFromStaticConfig()); + } catch (IOException e) { + LOG.error("Error closing file: ", e.getMessage()); + } + } else { + LOG.info("writeToDisk == true but configFilename == null"); + } + } - if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()){ - QuorumPeerConfig.deleteFile( getNextDynamicConfigFilename() ); - } - QuorumServer qs = qv.getAllMembers().get(getId()); - if (qs!=null){ - setQuorumAddress(qs.addr); - setElectionAddress(qs.electionAddr); - setClientAddress(qs.clientAddr); + if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()) { + QuorumPeerConfig.deleteFile(getNextDynamicConfigFilename()); + } + QuorumServer qs = qv.getAllMembers().get(getId()); + if (qs != null) { + setQuorumAddress(qs.addr); + setElectionAddress(qs.electionAddr); + setClientAddress(qs.clientAddr); + } + return prevQV; } - return prevQV; } private String makeDynamicConfigFilename(long version) {
