Repository: zookeeper Updated Branches: refs/heads/branch-3.5 c6bd49c4c -> 914c971a7
ZOOKEEPER-2778: QuorumPeer: encapsulate addresses [ZOOKEEPER-2778] QuorumPeer: encapsulate quorum/election/client addresses in an AddressTuple held through an AtomicReference Author: Michael Edwards <Michael Edwards> Author: Michael Edwards <[email protected]> Reviewers: [email protected] Closes #707 from mkedwards/branch-3.5 and squashes the following commits: 78df674c4 [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer: halt old QCM when clobbering existing election algorithm 5038179e2 [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer: warn when clobbering existing election algorithm bbeeebf87 [Michael Edwards] [ZOOKEEPER-2778] LeaderBeanTest: set up mock QuorumVerifier so that addresses get set 9701f0576 [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer: fix access to newly private data members from ReconfigTest 0531d9c8e [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer: fixes from code review 03d259bae [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer: add fast path for already-non-null quorum/election address 4cd10c865 [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer/QuorumCnxManager: address deadlock and visibility issues 3694a4e31 [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer: encapsulate quorum/election/client addresses in an AddressTuple held through an AtomicReference Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/914c971a Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/914c971a Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/914c971a Branch: refs/heads/branch-3.5 Commit: 914c971a749fae624fd7610f94d761ca200440e8 Parents: c6bd49c Author: Michael Edwards <Michael Edwards> Authored: Fri Dec 7 13:11:39 2018 +0100 Committer: Andor Molnar <[email protected]> Committed: Fri Dec 7 13:11:39 2018 +0100 ---------------------------------------------------------------------- .../server/quorum/QuorumCnxManager.java | 3 +- .../zookeeper/server/quorum/QuorumPeer.java | 177 +++++++++++-------- .../zookeeper/server/quorum/LeaderBeanTest.java | 22 +++ .../org/apache/zookeeper/test/ReconfigTest.java | 4 +- 4 files changed, 133 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/914c971a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java index 209cbcd..519b019 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java @@ -403,7 +403,8 @@ public class QuorumCnxManager { // represents protocol version (in other words - message type) dout.writeLong(PROTOCOL_VERSION); dout.writeLong(self.getId()); - String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort(); + final InetSocketAddress electionAddr = self.getElectionAddress(); + String addr = electionAddr.getHostString() + ":" + electionAddr.getPort(); byte[] addr_bytes = addr.getBytes(); dout.writeInt(addr_bytes.length); dout.write(addr_bytes); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/914c971a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index a2253a2..260ccd9 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -42,6 +42,7 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import javax.security.sasl.SaslException; @@ -109,7 +110,11 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider LocalPeerBean jmxLocalPeerBean; private Map<Long, RemotePeerBean> jmxRemotePeerBean; LeaderElectionBean jmxLeaderElectionBean; - private QuorumCnxManager qcm; + + // The QuorumCnxManager is held through an AtomicReference to ensure cross-thread visibility + // of updates; see the implementation comment at setLastSeenQuorumVerifier(). + private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>(); + QuorumAuthServer authServer; QuorumAuthLearner authLearner; @@ -122,6 +127,18 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider */ private ZKDatabase zkDb; + public static final class AddressTuple { + public final InetSocketAddress quorumAddr; + public final InetSocketAddress electionAddr; + public final InetSocketAddress clientAddr; + + public AddressTuple(InetSocketAddress quorumAddr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) { + this.quorumAddr = quorumAddr; + this.electionAddr = electionAddr; + this.clientAddr = clientAddr; + } + } + public static class QuorumServer { public InetSocketAddress addr = null; @@ -456,10 +473,10 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider */ //last committed quorum verifier - public QuorumVerifier quorumVerifier; + private QuorumVerifier quorumVerifier; //last proposed quorum verifier - public QuorumVerifier lastSeenQuorumVerifier = null; + private QuorumVerifier lastSeenQuorumVerifier = null; // Lock object that guard access to quorumVerifier and lastSeenQuorumVerifier. final Object QV_LOCK = new Object(); @@ -730,16 +747,14 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider DatagramSocket udpSocket; - private InetSocketAddress myQuorumAddr; - private InetSocketAddress myElectionAddr = null; - private InetSocketAddress myClientAddr = null; + private final AtomicReference<AddressTuple> myAddrs = new AtomicReference<>(); /** * Resolves hostname for a given server ID. * * This method resolves hostname for a given server ID in both quorumVerifer * and lastSeenQuorumVerifier. If the server ID matches the local server ID, - * it also updates myQuorumAddr and myElectionAddr. + * it also updates myAddrs. */ public void recreateSocketAddresses(long id) { QuorumVerifier qv = getQuorumVerifier(); @@ -748,8 +763,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider if (qs != null) { qs.recreateSocketAddresses(); if (id == getId()) { - setQuorumAddress(qs.addr); - setElectionAddress(qs.electionAddr); + setAddrs(qs.addr, qs.electionAddr, qs.clientAddr); } } } @@ -762,39 +776,43 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider } } - public InetSocketAddress getQuorumAddress(){ - synchronized (QV_LOCK) { - return myQuorumAddr; + private AddressTuple getAddrs(){ + AddressTuple addrs = myAddrs.get(); + if (addrs != null) { + return addrs; } - } - - public void setQuorumAddress(InetSocketAddress addr){ - synchronized (QV_LOCK) { - myQuorumAddr = addr; + try { + synchronized (QV_LOCK) { + addrs = myAddrs.get(); + while (addrs == null) { + QV_LOCK.wait(); + addrs = myAddrs.get(); + } + return addrs; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } } + public InetSocketAddress getQuorumAddress(){ + return getAddrs().quorumAddr; + } + public InetSocketAddress getElectionAddress(){ - synchronized (QV_LOCK) { - return myElectionAddr; - } + return getAddrs().electionAddr; } - public void setElectionAddress(InetSocketAddress addr){ - synchronized (QV_LOCK) { - myElectionAddr = addr; - } - } - public InetSocketAddress getClientAddress(){ - synchronized (QV_LOCK) { - return myClientAddr; - } + final AddressTuple addrs = myAddrs.get(); + return (addrs == null) ? null : addrs.clientAddr; } - public void setClientAddress(InetSocketAddress addr){ + private void setAddrs(InetSocketAddress quorumAddr, InetSocketAddress electionAddr, InetSocketAddress clientAddr){ synchronized (QV_LOCK) { - myClientAddr = addr; + myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr)); + QV_LOCK.notifyAll(); } } @@ -961,7 +979,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider //} if (electionType == 0) { try { - udpSocket = new DatagramSocket(myQuorumAddr.getPort()); + udpSocket = new DatagramSocket(getQuorumAddress().getPort()); responder = new ResponderThread(); responder.start(); } catch (SocketException e) { @@ -1077,7 +1095,12 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider le = new AuthFastLeaderElection(this, true); break; case 3: - qcm = createCnxnManager(); + QuorumCnxManager qcm = createCnxnManager(); + QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm); + if (oldQcm != null) { + LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)"); + oldQcm.halt(); + } QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ listener.start(); @@ -1544,18 +1567,6 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider } } - private void connectNewPeers(){ - synchronized (QV_LOCK) { - if (qcm != null && quorumVerifier != null && lastSeenQuorumVerifier != null) { - Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers(); - for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) { - if (e.getKey() != getId() && !committedView.containsKey(e.getKey())) - qcm.connectOne(e.getKey()); - } - } - } - } - public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW){ if (qvOLD == null || !qvOLD.equals(qvNEW)) { LOG.warn("Restarting Leader Election"); @@ -1573,33 +1584,61 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix; } + // On entry to this method, qcm must be non-null and the locks on both qcm and QV_LOCK + // must be held. We don't want quorumVerifier/lastSeenQuorumVerifier to change out from + // under us, so we have to hold QV_LOCK; and since the call to qcm.connectOne() will take + // the lock on qcm (and take QV_LOCK again inside that), the caller needs to have taken + // qcm outside QV_LOCK to avoid a deadlock against other callers of qcm.connectOne(). + private void connectNewPeers(QuorumCnxManager qcm){ + if (quorumVerifier != null && lastSeenQuorumVerifier != null) { + Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers(); + for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) { + if (e.getKey() != getId() && !committedView.containsKey(e.getKey())) + qcm.connectOne(e.getKey()); + } + } + } + public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){ - synchronized (QV_LOCK) { - if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) { - LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() + - ". Current version: " + quorumVerifier.getVersion()); + // If qcm is non-null, we may call qcm.connectOne(), which will take the lock on qcm + // and then take QV_LOCK. Take the locks in the same order to ensure that we don't + // deadlock against other callers of connectOne(). If qcmRef gets set in another + // thread while we're inside the synchronized block, that does no harm; if we didn't + // take a lock on qcm (because it was null when we sampled it), we won't call + // connectOne() on it. (Use of an AtomicReference is enough to guarantee visibility + // of updates that provably happen in another thread before entering this method.) + QuorumCnxManager qcm = qcmRef.get(); + Object outerLockObject = (qcm != null) ? qcm : QV_LOCK; + synchronized (outerLockObject) { + synchronized (QV_LOCK) { + if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) { + LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() + + ". Current version: " + quorumVerifier.getVersion()); + } + // assuming that a version uniquely identifies a configuration, so if + // version is the same, nothing to do here. + if (lastSeenQuorumVerifier != null && + lastSeenQuorumVerifier.getVersion() == qv.getVersion()) { + return; + } + lastSeenQuorumVerifier = qv; + if (qcm != null) { + connectNewPeers(qcm); + } - } - // assuming that a version uniquely identifies a configuration, so if - // version is the same, nothing to do here. - if (lastSeenQuorumVerifier != null && - lastSeenQuorumVerifier.getVersion() == qv.getVersion()) { - return; - } - lastSeenQuorumVerifier = qv; - connectNewPeers(); - if (writeToDisk) { - try { - String fileName = getNextDynamicConfigFilename(); - if (fileName != null) { - QuorumPeerConfig.writeDynamicConfig(fileName, qv, true); + if (writeToDisk) { + try { + String fileName = getNextDynamicConfigFilename(); + if (fileName != null) { + QuorumPeerConfig.writeDynamicConfig(fileName, qv, true); + } + } catch (IOException e) { + LOG.error("Error writing next dynamic config file to disk: ", e.getMessage()); } - } catch (IOException e) { - LOG.error("Error writing next dynamic config file to disk: ", e.getMessage()); } } } - } + } public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){ synchronized (QV_LOCK) { @@ -1639,9 +1678,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider } QuorumServer qs = qv.getAllMembers().get(getId()); if (qs != null) { - setQuorumAddress(qs.addr); - setElectionAddress(qs.electionAddr); - setClientAddress(qs.clientAddr); + setAddrs(qs.addr, qs.electionAddr, qs.clientAddr); } return prevQV; } @@ -1820,7 +1857,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider * get reference to QuorumCnxManager */ public QuorumCnxManager getQuorumCnxManager() { - return qcm; + return qcmRef.get(); } private long readLongFromFile(String name) throws IOException { File file = new File(logFactory.getSnapDir(), name); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/914c971a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java index 31cef79..99d5b5d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java @@ -21,10 +21,13 @@ package org.apache.zookeeper.server.quorum; import org.apache.jute.OutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.txn.TxnHeader; @@ -36,6 +39,10 @@ import org.mockito.stubbing.Answer; import java.io.File; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -43,6 +50,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class LeaderBeanTest { private Leader leader; @@ -54,8 +62,22 @@ public class LeaderBeanTest { @Before public void setUp() throws IOException, X509Exception { qp = new QuorumPeer(); + long myId = qp.getId(); + + int clientPort = PortAssignment.unique(); + Map<Long, QuorumServer> peersView = new HashMap<Long, QuorumServer>(); + InetAddress clientIP = InetAddress.getLoopbackAddress(); + + peersView.put(Long.valueOf(myId), + new QuorumServer(myId, new InetSocketAddress(clientIP, PortAssignment.unique()), + new InetSocketAddress(clientIP, PortAssignment.unique()), + new InetSocketAddress(clientIP, clientPort), LearnerType.PARTICIPANT)); + QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class); + when(quorumVerifierMock.getAllMembers()).thenReturn(peersView); + qp.setQuorumVerifier(quorumVerifierMock, false); + File tmpDir = ClientBase.createTmpDir(); fileTxnSnapLog = new FileTxnSnapLog(new File(tmpDir, "data"), new File(tmpDir, "data_txnlog")); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/914c971a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java index a050f7a..fb0e5f0 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java @@ -844,7 +844,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ testNormalOperation(zkArr[4], zkArr[5]); for (int i = 1; i <= 5; i++) { - if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumHierarchical)) + if (!(qu.getPeer(i).peer.getQuorumVerifier() instanceof QuorumHierarchical)) Assert.fail("peer " + i + " doesn't think the quorum system is Hieararchical!"); } @@ -881,7 +881,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ testNormalOperation(zkArr[1], zkArr[2]); for (int i = 1; i <= 2; i++) { - if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumMaj)) + if (!(qu.getPeer(i).peer.getQuorumVerifier() instanceof QuorumMaj)) Assert.fail("peer " + i + " doesn't think the quorum system is a majority quorum system!");
