ZOOKEEPER-3140: Allow Followers to host Observers

Creates a new abstraction, LearnerMaster, to represent the portions of the Leader logic that are used in LearnerHandler. Leader implements LearnerMaster and a new class ObserverMaster implements LearnerMaster. Followers have the option of instantiating an ObserverMaster thread when they assume their role and so support Learner traffic.
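
The abstraction described above can be pictured with a small, self-contained sketch. It is only an illustration of the pattern (one interface, two implementations, a handler that depends on the interface alone) and is not the code from this commit. All `Sketch*` type names, the constructor arguments, and `isWithinSyncTimeout` are hypothetical; only the method name `syncTimeout()` and its `tickTime * syncLimit` formula are taken from the diff.

```java
// Entry point for the sketch; every type in this file is a hypothetical stand-in.
class LearnerMasterSketch {
    public static void main(String[] args) {
        SketchLearnerMaster[] masters = {
            new SketchLeader(2000, 5),
            new SketchObserverMaster(2000, 5)
        };
        for (SketchLearnerMaster master : masters) {
            // The handler only sees the interface, so the same handler code
            // serves learners attached to a Leader or to a Follower.
            SketchLearnerHandler handler = new SketchLearnerHandler(master);
            System.out.println(master.role() + " accepts a 9000 ms ack delay: "
                    + handler.isWithinSyncTimeout(9000));
        }
    }
}

// Trimmed-down stand-in for the LearnerMaster abstraction.
interface SketchLearnerMaster {
    // Mirrors syncTimeout() in the diff: tickTime * syncLimit, in milliseconds.
    int syncTimeout();

    // Hypothetical helper used only for printing in this sketch.
    String role();
}

// Stand-in for Leader, which implements the abstraction directly.
class SketchLeader implements SketchLearnerMaster {
    private final int tickTime;
    private final int syncLimit;

    SketchLeader(int tickTime, int syncLimit) {
        this.tickTime = tickTime;
        this.syncLimit = syncLimit;
    }

    @Override
    public int syncTimeout() {
        return tickTime * syncLimit;
    }

    @Override
    public String role() {
        return "Leader";
    }
}

// Stand-in for ObserverMaster, the component a Follower may start to host Observers.
class SketchObserverMaster implements SketchLearnerMaster {
    private final int tickTime;
    private final int syncLimit;

    SketchObserverMaster(int tickTime, int syncLimit) {
        this.tickTime = tickTime;
        this.syncLimit = syncLimit;
    }

    @Override
    public int syncTimeout() {
        return tickTime * syncLimit;
    }

    @Override
    public String role() {
        return "ObserverMaster";
    }
}

// Stand-in for LearnerHandler: it holds a SketchLearnerMaster, not a concrete Leader.
class SketchLearnerHandler {
    private final SketchLearnerMaster learnerMaster;

    SketchLearnerHandler(SketchLearnerMaster learnerMaster) {
        this.learnerMaster = learnerMaster;
    }

    // True when the observed delay is strictly below the master's sync timeout.
    boolean isWithinSyncTimeout(long msDelay) {
        return msDelay < learnerMaster.syncTimeout();
    }
}
```

The point of the design is that the handler's constructor takes the interface type, which is what lets a Follower reuse the Leader's handler logic without the handler knowing which role it is running under.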

A new parameter 'observerMasterPort' is used to control which Follower instances host Observers.

Author: Brian Nixon <[email protected]>

Reviewers: [email protected], [email protected], [email protected]

Closes #628 from enixon/learner-master

Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/b2513c11
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/b2513c11
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/b2513c11

Branch: refs/heads/master
Commit: b2513c114931dc377bac5e1d39e2f81c6e8cf17e
Parents: 6ea3c0b
Author: Brian Nixon <[email protected]>
Authored: Sat Dec 8 18:16:58 2018 -0800
Committer: Fangmin Lyu <[email protected]>
Committed: Sat Dec 8 18:16:58 2018 -0800

----------------------------------------------------------------------
 .../main/resources/markdown/zookeeperAdmin.md | 7 +
 .../resources/markdown/zookeeperObservers.md | 32 +
 .../apache/zookeeper/server/ObserverBean.java | 14 +
 .../apache/zookeeper/server/admin/Commands.java | 15 +
 .../zookeeper/server/quorum/Follower.java | 46 +-
 .../zookeeper/server/quorum/FollowerBean.java | 10 +
 .../zookeeper/server/quorum/FollowerMXBean.java | 10 +
 .../server/quorum/FollowerRequestProcessor.java | 41 +-
 .../server/quorum/FollowerZooKeeperServer.java | 32 +-
 .../apache/zookeeper/server/quorum/Leader.java | 166 +++-
 .../server/quorum/LeaderZooKeeperServer.java | 11 +
 .../apache/zookeeper/server/quorum/Learner.java | 28 +-
 .../zookeeper/server/quorum/LearnerHandler.java | 166 ++--
 .../server/quorum/LearnerHandlerBean.java | 66 ++
 .../server/quorum/LearnerHandlerMXBean.java | 29 +
 .../zookeeper/server/quorum/LearnerMaster.java | 196 +++++
 .../zookeeper/server/quorum/Observer.java | 98 ++-
 .../zookeeper/server/quorum/ObserverMXBean.java | 12 +
 .../zookeeper/server/quorum/ObserverMaster.java | 532 +++++++++++++
 .../zookeeper/server/quorum/QuorumPeer.java | 90 ++-
 .../server/quorum/QuorumPeerConfig.java | 12 +
 .../zookeeper/server/quorum/QuorumPeerMain.java | 1 +
 .../java/org/apache/zookeeper/ZKTestCase.java | 26 +
 .../server/quorum/DelayRequestProcessor.java | 77 ++
 .../server/quorum/LearnerHandlerTest.java | 55 +-
 .../zookeeper/server/util/PortForwarder.java | 105 +--
 .../zookeeper/test/ObserverMasterTest.java | 780 +++++++++++++++++++
 .../org/apache/zookeeper/test/ObserverTest.java | 163 +---
 .../org/apache/zookeeper/test/ReconfigTest.java | 16 +-
 .../src/test/resources/findbugsExcludeFile.xml | 8 +
 30 files changed, 2459 insertions(+), 385 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
----------------------------------------------------------------------
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index bb7d792..0e92cf9 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -550,6 +550,13 @@ in the configuration file:
 Note that SSL feature will be enabled when user plugs-in zookeeper.serverCnxnFactory, zookeeper.clientCnxnSocket as Netty.
+* *observerMasterPort* :
+ the port to listen for observer connections; that is, the
+ port that observers attempt to connect to.
+ if the property is set then the server will host observer connections
+ when in follower mode in addition to when in leader mode and correspondingly
+ attempt to connect to any voting peer when in observer mode.
+ * *dataDir* : the location where ZooKeeper will store the in-memory database snapshots and, unless specified otherwise, the http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-docs/src/main/resources/markdown/zookeeperObservers.md ---------------------------------------------------------------------- diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperObservers.md b/zookeeper-docs/src/main/resources/markdown/zookeeperObservers.md index 4642b13..7865723 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperObservers.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperObservers.md @@ -81,6 +81,38 @@ specified in every config file. You should see a command line prompt through which you can issue commands like _ls_ to query the ZooKeeper service. +<a name="ch_ObserverMasters"></a> + +## How to use Observer Masters + +Observers function simple as non-voting members of the ensemble, sharing +the Learner interface with Followers and holding only a slightly difference +internal pipeline. Both maintain connections along the quorum port with the +Leader by which they learn of all new proposals on the ensemble. + +By default, Observers connect to the Leader of the quorum along its +quorum port and this is how they learn of all new proposals on the +ensemble. There are benefits to allowing Observers to connect to the +Followers instead as a means of plugging in to the commit stream in place +of connecting to the Leader. It shifts the burden of supporting Observers +off the Leader and allow it to focus on coordinating the commit of writes. +This means better performance when the Leader is under high load, +particularly high network load such as can happen after a leader election +when many Learners need to sync. It reduces the total network connections +maintained on the Leader when there are a high number of observers. 
+Activating Followers to support Observers allow the overall number of +Observers to scale into the hundreds. One the other end, Observer +availability is improved since it will take shorter time for a high +number of Observers to finish syncing and start serving client traffic. + +This feature can be activated by letting all members of the ensemble know +which port will be used by the Followers to listen for Observer +connections. The following entry, when added to the server config file, +will instruct Observers to connect to peers (Leaders and Followers) on +port 2191 and instruct Followers to create an ObserverMaster thread to +listen and serve on that port. + + observerMasterPort=2191 <a name="ch_UseCases"></a> ## Example use cases http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java index 72d724e..167c96d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java @@ -20,6 +20,7 @@ package org.apache.zookeeper.server; import org.apache.zookeeper.server.quorum.Observer; import org.apache.zookeeper.server.quorum.ObserverMXBean; +import org.apache.zookeeper.server.quorum.QuorumPeer; /** * ObserverBean @@ -46,4 +47,17 @@ public class ObserverBean extends ZooKeeperServerBean implements ObserverMXBean{ return observer.getSocket().toString(); } + public String getLearnerMaster() { + QuorumPeer.QuorumServer learnerMaster = observer.getCurrentLearnerMaster(); + if (learnerMaster == null || learnerMaster.addr == null) { + return "Unknown"; + } + return learnerMaster.addr.getAddress().getHostAddress() + ":" + learnerMaster.addr.getPort(); 
+ } + + public void setLearnerMaster(String learnerMaster) { + if (!observer.setLearnerMaster(learnerMaster)) { + throw new IllegalArgumentException("Not a valid learner master"); + } + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java index 4cfa772..29e1845 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java @@ -37,10 +37,13 @@ import org.apache.zookeeper.server.ServerStats; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooTrace; +import org.apache.zookeeper.server.quorum.Follower; +import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer; import org.apache.zookeeper.server.quorum.Leader; import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer; import org.apache.zookeeper.server.quorum.QuorumPeer; import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer; +import org.apache.zookeeper.server.quorum.ObserverZooKeeperServer; import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer; import org.apache.zookeeper.server.util.OSMXBean; import org.slf4j.Logger; @@ -375,6 +378,18 @@ public class Commands { response.put("min_proposal_size", leader.getProposalStats().getMinBufferSize()); } + if (zkServer instanceof FollowerZooKeeperServer) { + Follower follower = ((FollowerZooKeeperServer) zkServer).getFollower(); + Integer syncedObservers = follower.getSyncedObserverSize(); + if (syncedObservers != null) { + response.put("synced_observers", syncedObservers); + } + } + + if (zkServer instanceof 
ObserverZooKeeperServer) { + response.put("observer_master_id", ((ObserverZooKeeperServer)zkServer).getObserver().getLearnerMasterId()); + } + response.putAll(ServerMetrics.getAllValues()); return response; http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java index 78ae7aa..49280d3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java @@ -42,7 +42,9 @@ public class Follower extends Learner{ private long lastQueued; // This is the same object as this.zk, but we cache the downcast op final FollowerZooKeeperServer fzk; - + + ObserverMaster om; + Follower(QuorumPeer self,FollowerZooKeeperServer zk) { this.self = self; this.zk=zk; @@ -96,6 +98,15 @@ public class Follower extends Learner{ long syncTime = Time.currentElapsedTime() - startTime; ServerMetrics.FOLLOWER_SYNC_TIME.add(syncTime); } + if (self.getObserverMasterPort() > 0) { + LOG.info("Starting ObserverMaster"); + + om = new ObserverMaster(self, fzk, self.getObserverMasterPort()); + om.start(); + } else { + om = null; + } + // create a reusable packet to reduce gc impact QuorumPacket qp = new QuorumPacket(); while (this.isRunning()) { readPacket(qp); @@ -113,6 +124,9 @@ public class Follower extends Learner{ pendingRevalidations.clear(); } } finally { + if (om != null) { + om.stop(); + } zk.unregisterJMX((Learner)this); } } @@ -145,9 +159,16 @@ public class Follower extends Learner{ } fzk.logRequest(hdr, txn); + + if (om != null) { + om.proposalReceived(qp); + } break; case Leader.COMMIT: fzk.commit(qp.getZxid()); + if (om != null) { + 
om.proposalCommitted(qp.getZxid()); + } break; case Leader.COMMITANDACTIVATE: @@ -159,11 +180,16 @@ public class Follower extends Learner{ // get new designated leader from (current) leader's message ByteBuffer buffer = ByteBuffer.wrap(qp.getData()); long suggestedLeaderId = buffer.getLong(); - boolean majorChange = - self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true); - // commit (writes the new config to ZK tree (/zookeeper/config) - fzk.commit(qp.getZxid()); - if (majorChange) { + final long zxid = qp.getZxid(); + boolean majorChange = + self.processReconfig(qv, suggestedLeaderId, zxid, true); + // commit (writes the new config to ZK tree (/zookeeper/config) + fzk.commit(zxid); + + if (om != null) { + om.informAndActivate(zxid, suggestedLeaderId); + } + if (majorChange) { throw new Exception("changes proposed in reconfig"); } break; @@ -171,7 +197,9 @@ public class Follower extends Learner{ LOG.error("Received an UPTODATE message after Follower started"); break; case Leader.REVALIDATE: - revalidate(qp); + if (om == null || !om.revalidateLearnerSession(qp)) { + revalidate(qp); + } break; case Leader.SYNC: fzk.sync(); @@ -205,6 +233,10 @@ public class Follower extends Learner{ return lastQueued; } + public Integer getSyncedObserverSize() { + return om == null ? 
null : om.getNumActiveObservers(); + } + @Override public void shutdown() { LOG.info("shutdown called", new Exception("shutdown Follower")); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java index 8773ab8..edfc9c7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerBean.java @@ -52,4 +52,14 @@ public class FollowerBean extends ZooKeeperServerBean implements FollowerMXBean public long getElectionTimeTaken() { return follower.self.getElectionTimeTaken(); } + + @Override + public int getObserverMasterPacketSizeLimit() { + return follower.om == null ? -1 : follower.om.getPktsSizeLimit(); + } + + @Override + public void setObserverMasterPacketSizeLimit(int sizeLimit) { + ObserverMaster.setPktsSizeLimit(sizeLimit); + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java index 45c7fd8..6b4edd0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerMXBean.java @@ -43,4 +43,14 @@ public interface FollowerMXBean extends ZooKeeperServerMXBean { * @return time taken for leader election in milliseconds. 
*/ public long getElectionTimeTaken(); + + /** + * @return the size limit in bytes for the observer master commit packet queue + */ + public int getObserverMasterPacketSizeLimit(); + + /** + * set the size limit in bytes for the observer master commit packet queue + */ + public void setObserverMasterPacketSizeLimit(int sizeLimit); } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java index c623eba..2f345a8 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java @@ -111,26 +111,33 @@ public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements } public void processRequest(Request request) { + processRequest(request, true); + } + + void processRequest(Request request, boolean checkForUpgrade) { if (!finished) { - // Before sending the request, check if the request requires a - // global session and what we have is a local session. If so do - // an upgrade. - Request upgradeRequest = null; - try { - upgradeRequest = zks.checkUpgradeSession(request); - } catch (KeeperException ke) { - if (request.getHdr() != null) { - request.getHdr().setType(OpCode.error); - request.setTxn(new ErrorTxn(ke.code().intValue())); + if (checkForUpgrade) { + // Before sending the request, check if the request requires a + // global session and what we have is a local session. If so do + // an upgrade. 
+ Request upgradeRequest = null; + try { + upgradeRequest = zks.checkUpgradeSession(request); + } catch (KeeperException ke) { + if (request.getHdr() != null) { + request.getHdr().setType(OpCode.error); + request.setTxn(new ErrorTxn(ke.code().intValue())); + } + request.setException(ke); + LOG.info("Error creating upgrade request", ke); + } catch (IOException ie) { + LOG.error("Unexpected error in upgrade", ie); + } + if (upgradeRequest != null) { + queuedRequests.add(upgradeRequest); } - request.setException(ke); - LOG.info("Error creating upgrade request", ke); - } catch (IOException ie) { - LOG.error("Unexpected error in upgrade", ie); - } - if (upgradeRequest != null) { - queuedRequests.add(upgradeRequest); } + queuedRequests.add(request); } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index 78c12db..8f88bd3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.jute.Record; +import org.apache.zookeeper.jmx.MBeanRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.zookeeper.server.ExitCode; @@ -34,6 +35,8 @@ import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.txn.TxnHeader; +import javax.management.JMException; + /** * Just like the standard ZooKeeperServer. 
We just replace the request * processors: FollowerRequestProcessor -> CommitProcessor -> @@ -113,13 +116,17 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer { } synchronized public void sync(){ - if(pendingSyncs.size() ==0){ + if(pendingSyncs.size() == 0) { LOG.warn("Not expecting a sync."); return; } Request r = pendingSyncs.remove(); - commitProcessor.commit(r); + if (r instanceof LearnerSyncRequest) { + LearnerSyncRequest lsr = (LearnerSyncRequest)r; + lsr.fh.queuePacket(new QuorumPacket(Leader.SYNC, 0, null, null)); + } + commitProcessor.commit(r); } @Override @@ -139,4 +146,25 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer { public Learner getLearner() { return getFollower(); } + + /** + * Process a request received from external Learner through the LearnerMaster + * These requests have already passed through validation and checks for + * session upgrade and can be injected into the middle of the pipeline. + * + * @param request received from external Learner + */ + void processObserverRequest(Request request) { + ((FollowerRequestProcessor)firstProcessor).processRequest(request, false); + } + + boolean registerJMX(LearnerHandlerBean handlerBean) { + try { + MBeanRegistry.getInstance().register(handlerBean, jmxServerBean); + return true; + } catch (JMException e) { + LOG.warn("Could not register connection", e); + } + return false; + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index 0a892b1..721a1b4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ 
-19,6 +19,10 @@ package org.apache.zookeeper.server.quorum; import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.net.BindException; import java.net.ServerSocket; @@ -41,15 +45,20 @@ import java.util.concurrent.ConcurrentMap; import javax.security.sasl.SaslException; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.FinalRequestProcessor; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.ZooKeeperCriticalThread; import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.ZooTrace; +import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.server.util.ZxidUtils; @@ -60,7 +69,7 @@ import org.slf4j.LoggerFactory; /** * This class has the control logic for the Leader. 
*/ -public class Leader { +public class Leader implements LearnerMaster { private static final Logger LOG = LoggerFactory.getLogger(Leader.class); static final private boolean nodelay = System.getProperty("leader.nodelay", "true").equals("true"); @@ -118,6 +127,9 @@ public class Leader { maxConcurrentSnapshots, maxConcurrentSnapshotTimeout); } + // beans for all learners + private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean> connectionBeans = new ConcurrentHashMap<>(); + /** * Returns a copy of the current learner snapshot */ @@ -181,7 +193,8 @@ public class Leader { * @param learner * instance of learner handle */ - void addLearnerHandler(LearnerHandler learner) { + @Override + public void addLearnerHandler(LearnerHandler learner) { synchronized (learners) { learners.add(learner); } @@ -192,7 +205,8 @@ public class Leader { * * @param peer */ - void removeLearnerHandler(LearnerHandler peer) { + @Override + public void removeLearnerHandler(LearnerHandler peer) { synchronized (forwardingFollowers) { forwardingFollowers.remove(peer); } @@ -866,6 +880,7 @@ public class Leader { * @param sid, the id of the server that sent the ack * @param followerAddr */ + @Override synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) { if (!allowedToCommit) return; // last op committed was a leader change - from now on // the new leader should commit @@ -1064,23 +1079,30 @@ public class Leader { sendObserverPacket(qp); } + public static QuorumPacket buildInformAndActivePacket(long zxid, + long designatedLeader, byte[] proposalData) { + byte[] data = new byte[proposalData.length + 8]; + ByteBuffer buffer = ByteBuffer.wrap(data); + buffer.putLong(designatedLeader); + buffer.put(proposalData); + + return new QuorumPacket(Leader.INFORMANDACTIVATE, zxid, data, null); + } /** * Create an inform&activate packet and send it to all observers. 
*/ public void informAndActivate(Proposal proposal, long designatedLeader) { - byte[] proposalData = proposal.packet.getData(); - byte[] data = new byte[proposalData.length + 8]; - ByteBuffer buffer = ByteBuffer.wrap(data); - buffer.putLong(designatedLeader); - buffer.put(proposalData); - - QuorumPacket qp = new QuorumPacket(Leader.INFORMANDACTIVATE, proposal.request.zxid, data, null); - sendObserverPacket(qp); + sendObserverPacket(buildInformAndActivePacket(proposal.request.zxid, + designatedLeader, proposal.packet.getData())); } long lastProposed; + @Override + synchronized public long getLastProposed() { + return lastProposed; + } /** * Returns the current epoch of the leader. @@ -1146,6 +1168,7 @@ public class Leader { return p; } + @Override public LearnerSnapshotThrottler getLearnerSnapshotThrottler() { return learnerSnapshotThrottler; } @@ -1185,6 +1208,7 @@ public class Leader { * @return last proposed zxid * @throws InterruptedException */ + @Override synchronized public long startForwarding(LearnerHandler handler, long lastSeenZxid) { // Queue up any outstanding requests enabling the receipt of @@ -1221,6 +1245,16 @@ public class Leader { return lastProposed; } + + @Override + public void waitForStartup() throws InterruptedException { + synchronized(zk) { + while(!zk.isRunning() && !Thread.currentThread().isInterrupted()) { + zk.wait(20); + } + } + } + // VisibleForTesting protected final Set<Long> connectingFollowers = new HashSet<Long>(); @@ -1277,6 +1311,7 @@ public class Leader { } } + @Override public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException { synchronized(connectingFollowers) { if (!waitingForNewEpoch) { @@ -1313,10 +1348,17 @@ public class Leader { } } + @Override + public ZKDatabase getZKDatabase() { + return zk.getZKDatabase(); + } + // VisibleForTesting protected final Set<Long> electingFollowers = new HashSet<Long>(); // VisibleForTesting protected boolean electionFinished = false; + + 
@Override public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException { synchronized(electingFollowers) { if (electionFinished) { @@ -1417,6 +1459,7 @@ public class Leader { * @param sid * @throws InterruptedException */ + @Override public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException { @@ -1517,4 +1560,105 @@ public class Leader { private boolean isParticipant(long sid) { return self.getQuorumVerifier().getVotingMembers().containsKey(sid); } + + @Override + public int getCurrentTick() { + return self.tick.get(); + } + + @Override + public int syncTimeout() { + return self.tickTime * self.syncLimit; + } + + @Override + public int getTickOfNextAckDeadline() { + return self.tick.get() + self.syncLimit; + } + + @Override + public int getTickOfInitialAckDeadline() { + return self.tick.get() + self.initLimit + self.syncLimit; + } + + @Override + public long getAndDecrementFollowerCounter() { + return followerCounter.getAndDecrement(); + } + + @Override + public void touch(long sess, int to) { + zk.touch(sess, to); + } + + @Override + public void submitLearnerRequest(Request si) { + zk.submitLearnerRequest(si); + } + + @Override + public long getQuorumVerifierVersion() { + return self.getQuorumVerifier().getVersion(); + } + + @Override + public String getPeerInfo(long sid) { + QuorumPeer.QuorumServer server = self.getView().get(sid); + return server == null ? "" : server.toString(); + } + + @Override + public byte[] getQuorumVerifierBytes() { + return self.getLastSeenQuorumVerifier().toString().getBytes(); + } + + @Override + public QuorumAuthServer getQuorumAuthServer() { + return (self == null) ? 
null : self.authServer; + } + + @Override + public void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData()); + DataInputStream dis = new DataInputStream(bis); + long id = dis.readLong(); + int to = dis.readInt(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + dos.writeLong(id); + boolean valid = zk.checkIfValidGlobalSession(id, to); + if (valid) { + try { + // set the session owner as the follower that owns the session + zk.setOwner(id, learnerHandler); + } catch (KeeperException.SessionExpiredException e) { + LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e); + } + } + if (LOG.isTraceEnabled()) { + ZooTrace.logTraceMessage(LOG, + ZooTrace.SESSION_TRACE_MASK, + "Session 0x" + Long.toHexString(id) + + " is valid: "+ valid); + } + dos.writeBoolean(valid); + qp.setData(bos.toByteArray()); + learnerHandler.queuePacket(qp); + } + + @Override + public void registerLearnerHandlerBean(final LearnerHandler learnerHandler, Socket socket) { + LearnerHandlerBean bean = new LearnerHandlerBean(learnerHandler, socket); + if (zk.registerJMX(bean)) { + connectionBeans.put(learnerHandler, bean); + } + } + + @Override + public void unregisterLearnerHandlerBean(final LearnerHandler learnerHandler) { + LearnerHandlerBean bean = connectionBeans.remove(learnerHandler); + if (bean != null){ + MBeanRegistry.getInstance().unregister(bean); + } + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java index e1d1bb6..6484e30 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -30,6 +30,7 @@ import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import javax.management.JMException; import java.io.IOException; import java.util.concurrent.TimeUnit; @@ -186,6 +187,16 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { } } + boolean registerJMX(LearnerHandlerBean handlerBean) { + try { + MBeanRegistry.getInstance().register(handlerBean, jmxServerBean); + return true; + } catch (JMException e) { + LOG.warn("Could not register connection", e); + } + return false; + } + @Override protected void unregisterJMX() { // unregister from JMX http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index faaa844..307b644 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -87,9 +87,17 @@ public class Learner { protected static final Logger LOG = LoggerFactory.getLogger(Learner.class); + /** + * Time to wait after connection attempt with the Leader or LearnerMaster before this + * Learner tries to connect again. 
+ */ + private static final int leaderConnectDelayDuringRetryMs = + Integer.getInteger("zookeeper.leaderConnectDelayDuringRetryMs", 100); + static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); static { - LOG.info("TCP NoDelay set to: " + nodelay); + LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs); + LOG.info("TCP NoDelay set to: {}", nodelay); } final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = @@ -235,9 +243,10 @@ public class Learner { } /** - * Establish a connection with the Leader found by findLeader. Retries - * until either initLimit time has elapsed or 5 tries have happened. - * @param addr - the address of the Leader to connect to. + * Establish a connection with the LearnerMaster found by findLearnerMaster. + * Followers only connect to Leaders, Observers can connect to any active LearnerMaster. + * Retries until either initLimit time has elapsed or 5 tries have happened. + * @param addr - the address of the Peer to connect to. * @throws IOException - if the socket connection fails on the 5th attempt * <li>if there is an authentication failure while connecting to leader</li> * @throws ConnectException @@ -248,7 +257,7 @@ public class Learner { this.sock = createSocket(); int initLimitTime = self.tickTime * self.initLimit; - int remainingInitLimitTime = initLimitTime; + int remainingInitLimitTime; long startNanoTime = nanoTime(); for (int tries = 0; tries < 5; tries++) { @@ -286,7 +295,7 @@ public class Learner { this.sock = createSocket(); } } - Thread.sleep(1000); + Thread.sleep(leaderConnectDelayDuringRetryMs); } self.authLearner.authenticate(sock, hostname); @@ -309,8 +318,8 @@ public class Learner { } /** - * Once connected to the leader, perform the handshake protocol to - * establish a following / observing connection. + * Once connected to the leader or learner master, perform the handshake + * protocol to establish a following / observing connection. 
* @param pktType * @return the zxid the Leader sends for synchronization purposes. * @throws IOException @@ -369,7 +378,8 @@ public class Learner { } /** - * Finally, synchronize our history with the Leader. + * Finally, synchronize our history with the Leader (if Follower) + * or the LearnerMaster (if Observer). * @param newLeaderZxid * @throws IOException * @throws InterruptedException http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java index bc84916..78429e0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -21,9 +21,7 @@ package org.apache.zookeeper.server.quorum; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; import java.nio.ByteBuffer; @@ -37,8 +35,6 @@ import javax.security.sasl.SaslException; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; -import org.apache.jute.Record; -import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ServerMetrics; @@ -48,6 +44,7 @@ import org.apache.zookeeper.server.ZooKeeperThread; import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.quorum.Leader.Proposal; import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; 
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer; import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.TxnHeader; @@ -68,7 +65,7 @@ public class LearnerHandler extends ZooKeeperThread { return sock; } - final Leader leader; + final LearnerMaster learnerMaster; /** Deadline for receiving the next ack. If we are bootstrapping then * it's based on the initLimit, if we are done bootstrapping it's based @@ -85,6 +82,10 @@ public class LearnerHandler extends ZooKeeperThread { return sid; } + String getRemoteAddress() { + return sock == null ? "<null>" : sock.getRemoteSocketAddress().toString(); + } + protected int version = 0x1; int getVersion() { @@ -147,7 +148,7 @@ public class LearnerHandler extends ZooKeeperThread { return true; } else { long msDelay = (time - currentTime) / 1000000; - return (msDelay < (leader.self.tickTime * leader.self.syncLimit)); + return (msDelay < learnerMaster.syncTimeout()); } } }; @@ -167,7 +168,7 @@ public class LearnerHandler extends ZooKeeperThread { private volatile boolean sendingThreadStarted = false; /** - * For testing purpose, force leader to use snapshot to sync with followers + * For testing purpose, force learnerMaster to use snapshot to sync with followers */ public static final String FORCE_SNAP_SYNC = "zookeeper.forceSnapshotSync"; private boolean forceSnapSync = false; @@ -183,10 +184,10 @@ public class LearnerHandler extends ZooKeeperThread { */ private long leaderLastZxid; - LearnerHandler(Socket sock, BufferedInputStream bufferedInput,Leader leader) throws IOException { + LearnerHandler(Socket sock, BufferedInputStream bufferedInput, LearnerMaster learnerMaster) throws IOException { super("LearnerHandler-" + sock.getRemoteSocketAddress()); this.sock = sock; - this.leader = leader; + this.learnerMaster = learnerMaster; this.bufferedInput = bufferedInput; if (Boolean.getBoolean(FORCE_SNAP_SYNC)) { @@ -195,9 +196,9 @@ public 
class LearnerHandler extends ZooKeeperThread { } try { - if (leader.self != null) { - leader.self.authServer.authenticate(sock, - new DataInputStream(bufferedInput)); + QuorumAuthServer authServer = learnerMaster.getQuorumAuthServer(); + if (authServer != null) { + authServer.authenticate(sock, new DataInputStream(bufferedInput)); } } catch (IOException e) { LOG.error("Server failed to authenticate quorum learner, addr: {}, closing connection", @@ -368,9 +369,8 @@ public class LearnerHandler extends ZooKeeperThread { @Override public void run() { try { - leader.addLearnerHandler(this); - tickOfNextAckDeadline = leader.self.tick.get() - + leader.self.initLimit + leader.self.syncLimit; + learnerMaster.addLearnerHandler(this); + tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline(); ia = BinaryInputArchive.getArchive(bufferedInput); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); @@ -384,6 +384,9 @@ public class LearnerHandler extends ZooKeeperThread { return; } + if (learnerMaster instanceof ObserverMaster && qp.getType() != Leader.OBSERVERINFO) { + throw new IOException("Non observer attempting to connect to ObserverMaster. 
type = " + qp.getType()); + } byte learnerInfoData[] = qp.getData(); if (learnerInfoData != null) { ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData); @@ -395,31 +398,34 @@ public class LearnerHandler extends ZooKeeperThread { } if (learnerInfoData.length >= 20) { long configVersion = bbsid.getLong(); - if (configVersion > leader.self.getQuorumVerifier().getVersion()) { + if (configVersion > learnerMaster.getQuorumVerifierVersion()) { throw new IOException("Follower is ahead of the leader (has a later activated configuration)"); } } } else { - this.sid = leader.followerCounter.getAndDecrement(); + this.sid = learnerMaster.getAndDecrementFollowerCounter(); } - if (leader.self.getView().containsKey(this.sid)) { - LOG.info("Follower sid: " + this.sid + " : info : " - + leader.self.getView().get(this.sid).toString()); + String followerInfo = learnerMaster.getPeerInfo(this.sid); + if (followerInfo.isEmpty()) { + LOG.info("Follower sid: " + this.sid + " not in the current config " + + Long.toHexString(learnerMaster.getQuorumVerifierVersion())); } else { - LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion())); + LOG.info("Follower sid: " + this.sid + " : info : " + followerInfo); } if (qp.getType() == Leader.OBSERVERINFO) { learnerType = LearnerType.OBSERVER; } + learnerMaster.registerLearnerHandlerBean(this, sock); + long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); long peerLastZxid; StateSummary ss = null; long zxid = qp.getZxid(); - long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); + long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch); long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); if (this.getVersion() < 0x10000) { @@ -427,7 +433,7 @@ public class LearnerHandler extends ZooKeeperThread { long epoch = ZxidUtils.getEpochFromZxid(zxid); ss = new StateSummary(epoch, zxid); // fake the message - 
leader.waitForEpochAck(this.getSid(), ss); + learnerMaster.waitForEpochAck(this.getSid(), ss); } else { byte ver[] = new byte[4]; ByteBuffer.wrap(ver).putInt(0x10000); @@ -443,21 +449,21 @@ public class LearnerHandler extends ZooKeeperThread { } ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData()); ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid()); - leader.waitForEpochAck(this.getSid(), ss); + learnerMaster.waitForEpochAck(this.getSid(), ss); } peerLastZxid = ss.getLastZxid(); // Take any necessary action if we need to send TRUNC or DIFF // startForwarding() will be called in all cases - boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader); + boolean needSnap = syncFollower(peerLastZxid, learnerMaster); /* if we are not truncating or sending a diff just send a snapshot */ if (needSnap) { boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER; LearnerSnapshot snapshot = - leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle); + learnerMaster.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle); try { - long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); + long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid(); oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet"); bufferedOutput.flush(); @@ -470,7 +476,7 @@ public class LearnerHandler extends ZooKeeperThread { snapshot.getConcurrentSnapshotNumber(), snapshot.isEssential() ? 
"exempt" : "not exempt"); // Dump data to peer - leader.zk.getZKDatabase().serializeSnapshot(oa); + learnerMaster.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); bufferedOutput.flush(); } finally { @@ -492,8 +498,7 @@ public class LearnerHandler extends ZooKeeperThread { oa.writeRecord(newLeaderQP, "packet"); } else { QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, - newLeaderZxid, leader.self.getLastSeenQuorumVerifier() - .toString().getBytes(), null); + newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null); queuedPackets.add(newLeaderQP); } bufferedOutput.flush(); @@ -503,7 +508,7 @@ public class LearnerHandler extends ZooKeeperThread { /* * Have to wait for the first ACK, wait until - * the leader is ready, and only then we can + * the learnerMaster is ready, and only then we can * start processing messages. */ qp = new QuorumPacket(); @@ -517,21 +522,18 @@ public class LearnerHandler extends ZooKeeperThread { if(LOG.isDebugEnabled()){ LOG.debug("Received NEWLEADER-ACK message from " + sid); } - leader.waitForNewLeaderAck(getSid(), qp.getZxid()); + learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid()); syncLimitCheck.start(); // now that the ack has been processed expect the syncLimit - sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit); + sock.setSoTimeout(learnerMaster.syncTimeout()); /* - * Wait until leader starts up + * Wait until learnerMaster starts up */ - synchronized(leader.zk){ - while(!leader.zk.isRunning() && !this.isInterrupted()){ - leader.zk.wait(20); - } - } + learnerMaster.waitForStartup(); + // Mutation packets will be queued during the serialize, // so we need to mark when the peer can actually start // using the data @@ -550,7 +552,7 @@ public class LearnerHandler extends ZooKeeperThread { if (LOG.isTraceEnabled()) { ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp); } - tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit; + tickOfNextAckDeadline = 
learnerMaster.getTickOfNextAckDeadline(); ByteBuffer bb; @@ -566,7 +568,7 @@ public class LearnerHandler extends ZooKeeperThread { } } syncLimitCheck.updateAck(qp.getZxid()); - leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); + learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); break; case Leader.PING: // Process the touches @@ -576,38 +578,11 @@ public class LearnerHandler extends ZooKeeperThread { while (dis.available() > 0) { long sess = dis.readLong(); int to = dis.readInt(); - leader.zk.touch(sess, to); + learnerMaster.touch(sess, to); } break; case Leader.REVALIDATE: - bis = new ByteArrayInputStream(qp.getData()); - dis = new DataInputStream(bis); - long id = dis.readLong(); - int to = dis.readInt(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - dos.writeLong(id); - boolean valid = leader.zk.checkIfValidGlobalSession(id, to); - if (valid) { - try { - //set the session owner - // as the follower that - // owns the session - leader.zk.setOwner(id, this); - } catch (SessionExpiredException e) { - LOG.error("Somehow session " + Long.toHexString(id) + - " expired right after being renewed! 
(impossible)", e); - } - } - if (LOG.isTraceEnabled()) { - ZooTrace.logTraceMessage(LOG, - ZooTrace.SESSION_TRACE_MASK, - "Session 0x" + Long.toHexString(id) - + " is valid: "+ valid); - } - dos.writeBoolean(valid); - qp.setData(bos.toByteArray()); - queuedPackets.add(qp); + learnerMaster.revalidateSession(qp, this); break; case Leader.REQUEST: bb = ByteBuffer.wrap(qp.getData()); @@ -622,7 +597,7 @@ public class LearnerHandler extends ZooKeeperThread { si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo()); } si.setOwner(this); - leader.zk.submitLearnerRequest(si); + learnerMaster.submitLearnerRequest(si); break; default: LOG.warn("unexpected quorum packet, type: {}", packetToString(qp)); @@ -646,9 +621,7 @@ public class LearnerHandler extends ZooKeeperThread { } catch (SnapshotThrottleException e) { LOG.error("too many concurrent snapshots: " + e); } finally { - LOG.warn("******* GOODBYE " - + (sock != null ? sock.getRemoteSocketAddress() : "<null>") - + " ********"); + LOG.warn("******* GOODBYE {} ********", getRemoteAddress()); shutdown(); } } @@ -681,19 +654,18 @@ public class LearnerHandler extends ZooKeeperThread { * and setup follower to receive packets from commit processor * * @param peerLastZxid - * @param db - * @param leader + * @param learnerMaster * @return true if snapshot transfer is needed. */ - public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) { + boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) { /* * When leader election is completed, the leader will set its * lastProcessedZxid to be (epoch < 32). There will be no txn associated * with this zxid. * * The learner will set its lastProcessedZxid to the same value if - * it get DIFF or SNAP from the leader. If the same learner come - * back to sync with leader using this zxid, we will never find this + * it get DIFF or SNAP from the learnerMaster. 
If the same learner come + * back to sync with learnerMaster using this zxid, we will never find this * zxid in our history. In this case, we will ignore TRUNC logic and * always send DIFF if we have old enough history */ @@ -701,6 +673,7 @@ public class LearnerHandler extends ZooKeeperThread { // Keep track of the latest zxid which already queued long currentZxid = peerLastZxid; boolean needSnap = true; + ZKDatabase db = learnerMaster.getZKDatabase(); boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled(); ReentrantReadWriteLock lock = db.getLogLock(); ReadLock rl = lock.readLock(); @@ -721,7 +694,7 @@ public class LearnerHandler extends ZooKeeperThread { if (db.getCommittedLog().isEmpty()) { /* * It is possible that committedLog is empty. In that case - * setting these value to the latest txn in leader db + * setting these value to the latest txn in learnerMaster db * will reduce the case that we need to handle * * Here is how each case handle by the if block below @@ -737,7 +710,7 @@ public class LearnerHandler extends ZooKeeperThread { * Here are the cases that we want to handle * * 1. Force sending snapshot (for testing purpose) - * 2. Peer and leader is already sync, send empty diff + * 2. Peer and learnerMaster is already sync, send empty diff * 3. Follower has txn that we haven't seen. This may be old leader * so we need to send TRUNC. 
However, if peer has newEpochZxid, * we cannot send TRUNC since the follower has no txnlog @@ -750,7 +723,7 @@ public class LearnerHandler extends ZooKeeperThread { */ if (forceSnapSync) { - // Force leader to use snapshot to sync with follower + // Force learnerMaster to use snapshot to sync with follower LOG.warn("Forcing snapshot sync - should not see this in production"); } else if (lastProcessedZxid == peerLastZxid) { // Follower is already sync with us, send empty diff @@ -811,9 +784,12 @@ public class LearnerHandler extends ZooKeeperThread { Long.toHexString(peerLastZxid), txnLogSyncEnabled); } + if (needSnap) { + currentZxid = db.getDataTreeLastProcessedZxid(); + } LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) + " for peer sid: " + getSid()); - leaderLastZxid = leader.startForwarding(this, currentZxid); + leaderLastZxid = learnerMaster.startForwarding(this, currentZxid); } finally { rl.unlock(); } @@ -884,7 +860,7 @@ public class LearnerHandler extends ZooKeeperThread { queueOpPacket(Leader.DIFF, lastCommittedZxid); needOpPacket = false; } else if (packetZxid > peerLastZxid ) { - // Peer have some proposals that the leader hasn't seen yet + // Peer have some proposals that the learnerMaster hasn't seen yet // it may used to be a leader if (ZxidUtils.getEpochFromZxid(packetZxid) != ZxidUtils.getEpochFromZxid(peerLastZxid)) { @@ -947,7 +923,8 @@ public class LearnerHandler extends ZooKeeperThread { LOG.warn("Ignoring unexpected exception during socket close", e); } this.interrupt(); - leader.removeLearnerHandler(this); + learnerMaster.removeLearnerHandler(this); + learnerMaster.unregisterLearnerHandlerBean(this); } public long tickOfNextAckDeadline() { @@ -955,7 +932,7 @@ public class LearnerHandler extends ZooKeeperThread { } /** - * ping calls from the leader to the peers + * ping calls from the learnerMaster to the peers */ public void ping() { // If learner hasn't sync properly yet, don't send ping packet @@ -965,9 +942,7 @@ public class 
LearnerHandler extends ZooKeeperThread { } long id; if (syncLimitCheck.check(System.nanoTime())) { - synchronized(leader) { - id = leader.lastProposed; - } + id = learnerMaster.getLastProposed(); QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null); queuePacket(ping); } else { @@ -990,9 +965,18 @@ public class LearnerHandler extends ZooKeeperThread { queuedPackets.add(p); } + static long packetSize(QuorumPacket p) { + /* Approximate base size of QuorumPacket: int + long + byte[] + List */ + long size = 4 + 8 + 8 + 8; + byte[] data = p.getData(); + if (data != null) { + size += data.length; + } + return size; + } + public boolean synced() { - return isAlive() - && leader.self.tick.get() <= tickOfNextAckDeadline; + return isAlive() && learnerMaster.getCurrentTick() <= tickOfNextAckDeadline; } /** http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerBean.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerBean.java new file mode 100644 index 0000000..a97a880 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerBean.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.jmx.ZKMBeanInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.ObjectName; +import java.net.InetSocketAddress; +import java.net.Socket; + +public class LearnerHandlerBean implements LearnerHandlerMXBean, ZKMBeanInfo{ + private static final Logger LOG = LoggerFactory.getLogger(LearnerHandlerBean.class); + + private final LearnerHandler learnerHandler; + private final String remoteAddr; + + public LearnerHandlerBean(final LearnerHandler learnerHandler, final Socket socket) { + this.learnerHandler = learnerHandler; + InetSocketAddress sockAddr = (InetSocketAddress) socket.getRemoteSocketAddress(); + if (sockAddr == null) { + this.remoteAddr = "Unknown"; + } else { + this.remoteAddr = sockAddr.getAddress().getHostAddress() + ":" + sockAddr.getPort(); + } + } + + @Override + public String getName() { + return MBeanRegistry.getInstance().makeFullPath("Learner_Connections", ObjectName.quote(remoteAddr), + String.format("\"id:%d\"", learnerHandler.getSid())); + } + + @Override + public boolean isHidden() { + return false; + } + + @Override + public void terminateConnection() { + LOG.info("terminating learner handler connection on demand " + toString()); + learnerHandler.shutdown(); + } + + @Override + public String toString() { + return "LearnerHandlerBean{remoteIP=" + remoteAddr + ",ServerId=" + learnerHandler.getSid() + "}"; + } +} 
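A minimal sketch, not part of this commit, of how the LearnerHandlerBean above derives its JMX name components. It assumes a plain "/" join as a stand-in for MBeanRegistry.makeFullPath; the class name LearnerBeanNameSketch and the method beanPath are hypothetical. The point it illustrates is that ObjectName.quote wraps the "ip:port" string in double quotes so the colon is safe inside a JMX key value.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import javax.management.ObjectName;

// Hypothetical stand-alone sketch; not part of the ZooKeeper code base.
final class LearnerBeanNameSketch {

    // Mirrors the constructor logic in the diff: "ip:port", or "Unknown"
    // when the socket reports no remote address.
    static String remoteAddr(InetSocketAddress sockAddr) {
        if (sockAddr == null) {
            return "Unknown";
        }
        return sockAddr.getAddress().getHostAddress() + ":" + sockAddr.getPort();
    }

    // Assumption: a plain "/" join stands in for MBeanRegistry.makeFullPath.
    static String beanPath(String remoteAddr, long sid) {
        return "Learner_Connections/" + ObjectName.quote(remoteAddr)
                + "/" + String.format("\"id:%d\"", sid);
    }

    public static void main(String[] args) throws Exception {
        // Build the address from raw bytes so no DNS lookup is needed.
        InetAddress ip = InetAddress.getByAddress(new byte[] {10, 0, 0, 1});
        String addr = remoteAddr(new InetSocketAddress(ip, 2888));
        System.out.println(beanPath(addr, 3L));
    }
}
```

Like the bean in the diff, remoteAddr assumes a resolved address; an unresolved InetSocketAddress would return null from getAddress().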
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerMXBean.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerMXBean.java new file mode 100644 index 0000000..3d85a53 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandlerMXBean.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.quorum; + +/** + * This MBean represents a server connection for a learner. + */ +public interface LearnerHandlerMXBean { + /** + * Terminate the connection. 
The learner will attempt to reconnect to + * the leader or to the next ObserverMaster if that feature is enabled + */ + public void terminateConnection(); +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java new file mode 100644 index 0000000..3bffacf --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerMaster.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketAddress; + +/** + * interface for keeping Observers in sync + */ +public interface LearnerMaster { + /** + * start tracking a learner handler + * @param learnerHandler to track + */ + void addLearnerHandler(LearnerHandler learnerHandler); + + /** + * stop tracking a learner handler + * @param learnerHandler to drop + */ + void removeLearnerHandler(LearnerHandler learnerHandler); + + /** + * wait for the leader of the new epoch to be confirmed by followers + * @param sid learner id + * @param ss + * @throws IOException + * @throws InterruptedException + */ + void waitForEpochAck(long sid, StateSummary ss) throws IOException, InterruptedException; + + /** + * snapshot throttler + * @return snapshot throttler + */ + LearnerSnapshotThrottler getLearnerSnapshotThrottler(); + + /** + * wait for server to start + * @throws InterruptedException + */ + void waitForStartup() throws InterruptedException; + + /** + * get the first zxid of the next epoch + * @param sid learner id + * @param lastAcceptedEpoch + * @return + * @throws InterruptedException + * @throws IOException + */ + long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException; + + /** + * ZKDatabase + * @return ZKDatabase + */ + ZKDatabase getZKDatabase(); + + /** + * wait for new leader to settle + * @param sid id of learner + * @param zxid zxid at learner + * @throws InterruptedException + */ + void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException; + + /** + * last proposed zxid + * @return last proposed zxid + */ + long getLastProposed(); + + /** + * the current tick + * @return the current tick + */ + int getCurrentTick(); + + /** + * time allowed for sync response 
+ * @return time allowed for sync response + */ + int syncTimeout(); + + /** + * next deadline tick marking observer sync (steady state) + * @return next deadline tick marking observer sync (steady state) + */ + int getTickOfNextAckDeadline(); + + /** + * deadline tick marking observer sync (initial) + * @return deadline tick marking observer sync (initial) + */ + int getTickOfInitialAckDeadline(); + + /** + * decrement follower count + * @return previous follower count + */ + long getAndDecrementFollowerCounter(); + + /** + * handle ack packet + * @param sid learner id + * @param zxid packet zxid + * @param localSocketAddress forwarder's address + */ + void processAck(long sid, long zxid, SocketAddress localSocketAddress); + + /** + * mark session as alive + * @param sess session id + * @param to timeout + */ + void touch(long sess, int to); + + /** + * handle revalidate packet + * @param qp session packet + * @param learnerHandler learner + * @throws IOException + */ + void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws IOException; + + /** + * proxy request from learner to server + * @param si request + */ + void submitLearnerRequest(Request si); + + /** + * begin forwarding packets to learner handler + * @param learnerHandler learner + * @param lastSeenZxid zxid of learner + * @return last zxid forwarded + */ + long startForwarding(LearnerHandler learnerHandler, long lastSeenZxid); + + /** + * version of current quorum verifier + * @return version of current quorum verifier + */ + long getQuorumVerifierVersion(); + + /** + * + * @param sid server id + * @return server information in the view + */ + String getPeerInfo(long sid); + + /** + * identifier of current quorum verifier for new leader + * @return identifier of current quorum verifier for new leader + */ + byte[] getQuorumVerifierBytes(); + + QuorumAuthServer getQuorumAuthServer(); + + /** + * registers the handler's bean + * @param learnerHandler handler + * @param socket 
connection to learner + */ + void registerLearnerHandlerBean(final LearnerHandler learnerHandler, Socket socket); + + /** + * unregisters the handler's bean + * @param learnerHandler handler + */ + void unregisterLearnerHandlerBean(final LearnerHandler learnerHandler); +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java index f0f724e..b688e03 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java @@ -20,6 +20,7 @@ package org.apache.zookeeper.server.quorum; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicReference; import org.apache.jute.Record; import org.apache.zookeeper.server.ObserverBean; @@ -29,6 +30,9 @@ import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.txn.SetDataTxn; import org.apache.zookeeper.txn.TxnHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Observers are peers that do not take part in the atomic broadcast protocol. @@ -41,6 +45,31 @@ import org.apache.zookeeper.txn.TxnHeader; */ public class Observer extends Learner{ + private static final Logger LOG = LoggerFactory.getLogger(Observer.class); + + /** + * When observer lost its connection with the leader, it waits for 0 to the + * specified value before trying to reconnect with the leader. So that + * the entire observer fleet won't try to run leader election and reconnect + * to the leader at once. Default value is zero. 
+ */ + public static final String OBSERVER_RECONNECT_DELAY_MS = + "zookeeper.observer.reconnectDelayMs"; + + private static final long reconnectDelayMs; + + static { + reconnectDelayMs = Long.getLong(OBSERVER_RECONNECT_DELAY_MS, 0); + LOG.info(OBSERVER_RECONNECT_DELAY_MS + " = " + reconnectDelayMs); + } + + /** + * next learner master to try, when specified + */ + private final static AtomicReference<QuorumPeer.QuorumServer> nextLearnerMaster = new AtomicReference<>(); + + private QuorumPeer.QuorumServer currentLearnerMaster = null; + Observer(QuorumPeer self,ObserverZooKeeperServer observerZooKeeperServer) { this.self = self; this.zk=observerZooKeeperServer; @@ -63,17 +92,16 @@ public class Observer extends Learner{ zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean); try { - QuorumServer leaderServer = findLeader(); - LOG.info("Observing " + leaderServer.addr); + QuorumServer master = findLearnerMaster(); try { - connectToLeader(leaderServer.addr, leaderServer.hostname); + connectToLeader(master.addr, master.hostname); long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO); if (self.isReconfigStateChange()) throw new Exception("learned about role change"); syncWithLeader(newLeaderZxid); QuorumPacket qp = new QuorumPacket(); - while (this.isRunning()) { + while (this.isRunning() && nextLearnerMaster.get() == null) { readPacket(qp); processPacket(qp); } @@ -89,10 +117,29 @@ public class Observer extends Learner{ pendingRevalidations.clear(); } } finally { + currentLearnerMaster = null; zk.unregisterJMX(this); } } + private QuorumServer findLearnerMaster() { + QuorumPeer.QuorumServer prescribedLearnerMaster = nextLearnerMaster.getAndSet(null); + if (prescribedLearnerMaster != null && self.validateLearnerMaster(Long.toString(prescribedLearnerMaster.id)) == null) { + LOG.warn("requested next learner master {} is no longer valid", prescribedLearnerMaster); + prescribedLearnerMaster = null; + } + final QuorumPeer.QuorumServer master = 
(prescribedLearnerMaster == null) ? + self.findLearnerMaster(findLeader()) : + prescribedLearnerMaster; + currentLearnerMaster = master; + if (master == null) { + LOG.warn("No learner master found"); + } else { + LOG.info("Observing new leader sid={} addr={}", master.id, master.addr); + } + return master; + } + /** * Controls the response of an observer to the receipt of a quorumpacket * @param qp @@ -162,5 +209,48 @@ public class Observer extends Learner{ LOG.info("shutdown called", new Exception("shutdown Observer")); super.shutdown(); } + + static void waitForReconnectDelay() { + if (reconnectDelayMs > 0) { + long randomDelay = (long) (reconnectDelayMs * Math.random()); + LOG.info("Waiting for " + randomDelay + + " ms before reconnecting with the leader"); + try { + Thread.sleep(randomDelay); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting" + e.getMessage()); + } + } + } + + public long getLearnerMasterId() { + QuorumPeer.QuorumServer current = currentLearnerMaster; + return current == null ? -1 : current.id; + } + + /** + * Prompts the Observer to disconnect from its current learner master and reconnect + * to the specified server. If that connection attempt fails, the Observer will + * fail over to the next available learner master. 
+ */ + public boolean setLearnerMaster(String learnerMaster) { + final QuorumPeer.QuorumServer server = self.validateLearnerMaster(learnerMaster); + if (server == null) { + return false; + } else if (server.equals(currentLearnerMaster)) { + LOG.info("Already connected to requested learner master sid={} addr={}", + server.id, server.addr); + return true; + } else { + LOG.info("Requesting disconnect and reconnect to new learner master sid={} addr={}", + server.id, server.addr); + nextLearnerMaster.set(server); + return true; + } + } + + public QuorumPeer.QuorumServer getCurrentLearnerMaster() { + return currentLearnerMaster; + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMXBean.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMXBean.java index 2c1799a..5145e72 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMXBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMXBean.java @@ -34,4 +34,16 @@ public interface ObserverMXBean extends ZooKeeperServerMXBean { * @return socket address */ public String getQuorumAddress(); + + /** + * @return address of the current learner master + */ + public String getLearnerMaster(); + + /** + * requests the Observer switch to a new learner master + * + * @param learnerMaster address of the desired learner master + */ + public void setLearnerMaster(String learnerMaster); }
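A minimal sketch, not part of this commit, of the handoff that setLearnerMaster relies on in the Observer diff above: the setter parks the requested server in an AtomicReference, the packet loop exits once that reference is non-null, and the next lookup consumes it with getAndSet(null). Server identities are reduced to plain strings, the reference is an instance field rather than the static field used in the commit, and validation via validateLearnerMaster is omitted; the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-alone sketch; not part of the ZooKeeper code base.
final class LearnerMasterHandoffSketch {

    // Requested next master; null means "no switch requested".
    private final AtomicReference<String> next = new AtomicReference<>();

    // Master chosen by the most recent pick() call.
    private volatile String current;

    // Analogue of setLearnerMaster: false for an invalid target, true otherwise.
    boolean request(String target) {
        if (target == null) {
            return false;
        }
        if (target.equals(current)) {
            return true; // already connected, nothing to hand off
        }
        next.set(target);
        return true;
    }

    // Analogue of the loop guard "nextLearnerMaster.get() == null".
    boolean shouldKeepReading() {
        return next.get() == null;
    }

    // Analogue of findLearnerMaster: consume the request once, else use the fallback.
    String pick(String fallback) {
        String prescribed = next.getAndSet(null);
        current = (prescribed != null) ? prescribed : fallback;
        return current;
    }

    public static void main(String[] args) {
        LearnerMasterHandoffSketch s = new LearnerMasterHandoffSketch();
        System.out.println(s.pick("leader"));
        s.request("follower-2");
        System.out.println(s.shouldKeepReading());
        System.out.println(s.pick("leader"));
    }
}
```

Because the request is cleared by getAndSet(null), a failed connection to the prescribed server does not pin the observer to it; the following lookup falls back to the default choice.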

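A minimal sketch, not part of this commit, of the randomized reconnect delay computed by Observer.waitForReconnectDelay in the diff above: each observer waits a uniformly random time in [0, reconnectDelayMs) so that a fleet of observers does not reconnect at the same instant. The random draw is passed in as a parameter here so the arithmetic can be checked; the class and method names are hypothetical.

```java
// Hypothetical stand-alone sketch; not part of the ZooKeeper code base.
final class ReconnectJitterSketch {

    // Scales a draw u in [0, 1) to a delay in [0, maxDelayMs); 0 when disabled.
    static long jitteredDelayMs(long maxDelayMs, double u) {
        if (maxDelayMs <= 0) {
            return 0L;
        }
        return (long) (maxDelayMs * u);
    }

    // Same computation with a fresh random draw, as the commit does with Math.random().
    static long nextDelayMs(long maxDelayMs) {
        return jitteredDelayMs(maxDelayMs, Math.random());
    }

    public static void main(String[] args) throws InterruptedException {
        long delay = nextDelayMs(200L);
        System.out.println("sleeping " + delay + " ms before reconnecting");
        Thread.sleep(delay);
    }
}
```

With the default of zero for zookeeper.observer.reconnectDelayMs, the delay is skipped entirely, which matches the guard in the commit.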