http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java new file mode 100644 index 0000000..07de57b --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java @@ -0,0 +1,532 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ZKDatabase; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used by Followers to host Observers. This reduces the network load on the Leader process by pushing + * the responsibility for keeping Observers in sync off the leading peer. + * + * It is expected that Observers will continue to perform the initial vetting of clients and requests. + * Observers send the request to the follower where it is received by an ObserverMaster. + * + * The ObserverMaster forwards a copy of the request to the ensemble Leader and inserts it into its own + * request processor pipeline where it can be matched with the response when it comes back. All commits received + * from the Leader will be forwarded along to every Learner connected to the ObserverMaster. + * + * New Learners connecting to a Follower will receive a LearnerHandler object and be party to its syncing logic + * to be brought up to date. + * + * The logic is quite a bit simpler than the corresponding logic in Leader because it only hosts observers. 
+ */ +public class ObserverMaster implements LearnerMaster, Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ObserverMaster.class); + + //Follower counter + private final AtomicLong followerCounter = new AtomicLong(-1); + + private QuorumPeer self; + private FollowerZooKeeperServer zks; + private int port; + + private Set<LearnerHandler> activeObservers = Collections.newSetFromMap(new ConcurrentHashMap<LearnerHandler,Boolean>()); + + private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean> connectionBeans = new ConcurrentHashMap<>(); + + /** + * we want to keep a log of past txns so that observers can sync up with us when they connect, + * but we can't keep everything in memory, so this limits how much memory will be dedicated + * to keeping recent txns. + */ + private final static int PKTS_SIZE_LIMIT = 32 * 1024 * 1024; + private static volatile int pktsSizeLimit = Integer.getInteger("zookeeper.observerMaster.sizeLimit", PKTS_SIZE_LIMIT); + private ConcurrentLinkedQueue<QuorumPacket> proposedPkts = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue<QuorumPacket> committedPkts = new ConcurrentLinkedQueue<>(); + private int pktsSize = 0; + + private long lastProposedZxid; + + // ensure ordering of revalidations returned to this learner + private final Object revalidateSessionLock = new Object(); + + // Throttle when there are too many concurrent snapshots being sent to observers + private static final String MAX_CONCURRENT_SNAPSHOTS = "zookeeper.leader.maxConcurrentSnapshots"; + private static final int maxConcurrentSnapshots; + + private static final String MAX_CONCURRENT_DIFFS = "zookeeper.leader.maxConcurrentDiffs"; + private static final int maxConcurrentDiffs; + static { + maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 10); + LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " + maxConcurrentSnapshots); + + maxConcurrentDiffs = Integer.getInteger(MAX_CONCURRENT_DIFFS, 100); + LOG.info(MAX_CONCURRENT_DIFFS 
+ " = " + maxConcurrentDiffs); + } + + private final ConcurrentLinkedQueue<Revalidation> pendingRevalidations = new ConcurrentLinkedQueue<>(); + /** + * A session revalidation that was requested on behalf of a learner: the session id, + * the session timeout and the handler of the learner that asked for it. + */ + static class Revalidation { + public final long sessionId; + public final int timeout; + public final LearnerHandler handler; + + /* sessionId is taken as a primitive: the field is a primitive long, so a boxed parameter only added an unboxing step and a possible NullPointerException */ + Revalidation(final long sessionId, final int timeout, final LearnerHandler handler) { + this.sessionId = sessionId; + this.timeout = timeout; + this.handler = handler; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + final Revalidation that = (Revalidation) o; + return sessionId == that.sessionId && timeout == that.timeout && handler.equals(that.handler); + } + + @Override + public int hashCode() { + int result = (int) (sessionId ^ (sessionId >>> 32)); + result = 31 * result + timeout; + result = 31 * result + handler.hashCode(); + return result; + } + } + + private final LearnerSnapshotThrottler learnerSnapshotThrottler = + new LearnerSnapshotThrottler(maxConcurrentSnapshots); + + private Thread thread; + private ServerSocket ss; + /* written by start() and stop(), polled by the acceptor thread in run(); volatile so the acceptor sees the update */ + private volatile boolean listenerRunning; + private ScheduledExecutorService pinger; + + Runnable ping = new Runnable() { + @Override + public void run() { + for (LearnerHandler lh: activeObservers) { + lh.ping(); + } + } + }; + + ObserverMaster(QuorumPeer self, FollowerZooKeeperServer zks, int port) { + this.self = self; + this.zks = zks; + this.port = port; + } + + @Override + public void addLearnerHandler(LearnerHandler learnerHandler) { + if (!listenerRunning) { + throw new RuntimeException(("ObserverMaster is not running")); + } + } + + @Override + public void removeLearnerHandler(LearnerHandler learnerHandler) { + activeObservers.remove(learnerHandler); + } + + @Override + public int syncTimeout() { + return self.getSyncLimit() * self.getTickTime(); + } + + @Override + public int getTickOfNextAckDeadline() { + return self.tick.get() + self.syncLimit; + } + + @Override + 
public int getTickOfInitialAckDeadline() { + return self.tick.get() + self.initLimit + self.syncLimit; + } + + /** + * Returns the current follower counter value and then decrements it; the counter starts at -1. + */ + @Override + public long getAndDecrementFollowerCounter() { + return followerCounter.getAndDecrement(); + } + + @Override + public void waitForEpochAck(long sid, StateSummary ss) throws IOException, InterruptedException { + // since this is done by an active follower, we don't need to wait for anything + } + + @Override + public LearnerSnapshotThrottler getLearnerSnapshotThrottler() { + return learnerSnapshotThrottler; + } + + @Override + public void waitForStartup() throws InterruptedException { + // since this is done by an active follower, we don't need to wait for anything + } + + /** + * Returns the zxid recorded by the most recent sendPacket call. + */ + @Override + synchronized public long getLastProposed() { + return lastProposedZxid; + } + + /** + * Returns this peer's current epoch; both arguments are ignored. + */ + @Override + public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException { + return self.getCurrentEpoch(); + } + + @Override + public ZKDatabase getZKDatabase() { + return zks.getZKDatabase(); + } + + @Override + public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException { + // no need to wait since we are a follower + } + + @Override + public int getCurrentTick() { + return self.tick.get(); + } + + /** + * Ignores an ACK whose zxid has all-zero low 32 bits; any other ACK is rejected with a RuntimeException. + */ + @Override + public void processAck(long sid, long zxid, SocketAddress localSocketAddress) { + if ((zxid & 0xffffffffL) == 0) { + /* + * We no longer process NEWLEADER ack by this method. However, + * the learner sends ack back to the leader after it gets UPTODATE + * so we just ignore the message. 
+ */ + return; + } + + throw new RuntimeException("Observers shouldn't send ACKS ack = " + Long.toHexString(zxid)); + } + + @Override + public void touch(long sess, int to) { + zks.getSessionTracker().touchSession(sess, to); + } + + /** + * Routes a session revalidation response to the learner that asked for it. + * + * Only the head of the pending revalidation queue is examined. If its session id equals the id + * read from the packet, the entry is removed and a copy of the packet is queued to that + * learner's handler. + * + * @param qp response packet whose data holds a session id (long) followed by a validity flag (boolean) + * @return true if the packet was queued to a learner handler, false if the queue is empty or its + * head is for a different session + * @throws IOException if the packet data cannot be read + */ + boolean revalidateLearnerSession(QuorumPacket qp) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData()); + DataInputStream dis = new DataInputStream(bis); + long id = dis.readLong(); + boolean valid = dis.readBoolean(); + Iterator<Revalidation> itr = pendingRevalidations.iterator(); + if (!itr.hasNext()) { + // not a learner session, handle locally + return false; + } + Revalidation revalidation = itr.next(); + if (revalidation.sessionId != id) { + // not a learner session, handle locally + return false; + } + itr.remove(); + LearnerHandler learnerHandler = revalidation.handler; + // create a copy here as the qp object is reused by the Follower and may be mutated + QuorumPacket deepCopy = new QuorumPacket(qp.getType(), qp.getZxid(), + Arrays.copyOf(qp.getData(), qp.getData().length), + qp.getAuthinfo() == null ? null : new ArrayList<>(qp.getAuthinfo())); + learnerHandler.queuePacket(deepCopy); + // To keep consistent as leader, touch the session when it's + // revalidating the session, only update if it's a valid session. 
+ if (valid) { + touch(revalidation.sessionId, revalidation.timeout); + } + return true; + } + + /** + * Records a pending revalidation for the given learner and, if this server has a Learner, + * writes the packet through it. Both steps happen under revalidateSessionLock so the pending + * queue is filled in the same order in which the packets are written. + * + * @param qp request packet whose data holds a session id (long) followed by a timeout (int) + * @param learnerHandler handler of the learner that asked for the revalidation + * @throws IOException if the packet data cannot be read or the packet cannot be written + */ + @Override + public void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData()); + DataInputStream dis = new DataInputStream(bis); + long id = dis.readLong(); + int to = dis.readInt(); + synchronized (revalidateSessionLock) { + pendingRevalidations.add(new Revalidation(id, to, learnerHandler)); + Learner learner = zks.getLearner(); + if (learner != null) { + learner.writePacket(qp, true); + } + } + } + + @Override + public void submitLearnerRequest(Request si) { + zks.processObserverRequest(si); + } + + /** + * Replays the retained committed packets to a newly connected learner and marks it active. + * + * If the oldest retained packet is more than one zxid ahead of lastSeenZxid the handler is shut + * down and -1 is returned. Otherwise every retained packet with a zxid greater than lastSeenZxid + * is queued to the handler. + * + * @param learnerHandler handler of the connecting learner + * @param lastSeenZxid last zxid the learner has seen + * @return -1 if the learner was disconnected, otherwise the zxid of the last packet sent to observers + */ + @Override + synchronized public long startForwarding(LearnerHandler learnerHandler, long lastSeenZxid) { + Iterator<QuorumPacket> itr = committedPkts.iterator(); + if (itr.hasNext()) { + QuorumPacket packet = itr.next(); + if (packet.getZxid() > lastSeenZxid + 1) { + LOG.error("LearnerHandler is too far behind ({} < {}), disconnecting {} at {}", Long.toHexString(lastSeenZxid + 1), + Long.toHexString(packet.getZxid()), learnerHandler.getSid(), learnerHandler.getRemoteAddress()); + learnerHandler.shutdown(); + return -1; + } else if (packet.getZxid() == lastSeenZxid + 1) { + learnerHandler.queuePacket(packet); + } + long queueHeadZxid = packet.getZxid(); + long queueBytesUsed = LearnerHandler.packetSize(packet); + while (itr.hasNext()) { + packet = itr.next(); + if (packet.getZxid() <= lastSeenZxid) { + continue; + } + learnerHandler.queuePacket(packet); + queueBytesUsed += LearnerHandler.packetSize(packet); + } + /* NOTE(review): "num packets used" is logged as a zxid difference and the byte count always includes the head packet, so both figures look approximate - confirm */ + LOG.info("finished syncing observer from retained commit queue: sid {}, " + + "queue head 0x{}, queue tail 0x{}, sync position 0x{}, num packets used {}, " + + "num bytes used {}", + learnerHandler.getSid(), + Long.toHexString(queueHeadZxid), + Long.toHexString(packet.getZxid()), + Long.toHexString(lastSeenZxid), + packet.getZxid() - lastSeenZxid, + queueBytesUsed); 
+ } + activeObservers.add(learnerHandler); + return lastProposedZxid; + } + + @Override + public long getQuorumVerifierVersion() { + return self.getQuorumVerifier().getVersion(); + } + + @Override + public String getPeerInfo(long sid) { + QuorumPeer.QuorumServer server = self.getView().get(sid); + return server == null ? "" : server.toString(); + } + + @Override + public byte[] getQuorumVerifierBytes() { + return self.getLastSeenQuorumVerifier().toString().getBytes(); + } + + @Override + public QuorumAuthServer getQuorumAuthServer() { + return (self == null) ? null : self.authServer; + } + + void proposalReceived(QuorumPacket qp) { + proposedPkts.add(new QuorumPacket(Leader.INFORM, qp.getZxid(), qp.getData(), null)); + } + + /** + * Removes and returns the proposal at the head of the proposed queue if it carries the given zxid. + * + * @param zxid zxid of the proposal being committed + * @return the removed packet, or null if the queue is empty or its head has a larger zxid + * @throws RuntimeException if the head of the queue has a smaller zxid than the one requested + */ + private synchronized QuorumPacket removeProposedPacket(long zxid) { + QuorumPacket pkt = proposedPkts.peek(); + if (pkt == null || pkt.getZxid() > zxid) { + LOG.debug("ignore missing proposal packet for {}", Long.toHexString(zxid)); + return null; + } + if (pkt.getZxid() != zxid) { + /* %x rather than %d: the values are printed behind a 0x prefix and must therefore be hexadecimal */ + final String m = String.format("Unexpected proposal packet on commit ack, expected zxid 0x%x got zxid 0x%x", + zxid, pkt.getZxid()); + LOG.error(m); + throw new RuntimeException(m); + } + proposedPkts.remove(); + return pkt; + } + + private synchronized void cacheCommittedPacket(final QuorumPacket pkt) { + committedPkts.add(pkt); + pktsSize += LearnerHandler.packetSize(pkt); + // remove 5 packets for every one added as we near the size limit + for (int i = 0; pktsSize > pktsSizeLimit * 0.8 && i < 5; i++) { + QuorumPacket oldPkt = committedPkts.poll(); + if (oldPkt == null) { + pktsSize = 0; + break; + } + pktsSize -= LearnerHandler.packetSize(oldPkt); + } + // enforce the size limit as a hard cap + while (pktsSize > pktsSizeLimit) { + QuorumPacket oldPkt = committedPkts.poll(); + if (oldPkt == null) { + pktsSize = 0; + break; + } + pktsSize -= LearnerHandler.packetSize(oldPkt); + } + } + + private synchronized void sendPacket(final QuorumPacket pkt) { + for 
(LearnerHandler lh: activeObservers) { + lh.queuePacket(pkt); + } + lastProposedZxid = pkt.getZxid(); + } + + /** + * Handles a commit: takes the matching proposal off the proposed queue, adds it to the retained + * committed packets and queues it to every active observer. Does nothing if no matching + * proposal is at the head of the proposed queue. + */ + synchronized void proposalCommitted(long zxid) { + QuorumPacket pkt = removeProposedPacket(zxid); + if (pkt == null) { + return; + } + cacheCommittedPacket(pkt); + sendPacket(pkt); + } + + /** + * Same as proposalCommitted, except that the data of the matching proposal is wrapped in a + * packet built by Leader.buildInformAndActivePacket before it is cached and sent. + */ + synchronized void informAndActivate(long zxid, long suggestedLeaderId) { + QuorumPacket pkt = removeProposedPacket(zxid); + if (pkt == null) { + return; + } + + // Build the INFORMANDACTIVATE packet + QuorumPacket informAndActivateQP = Leader.buildInformAndActivePacket( + zxid, suggestedLeaderId, pkt.getData()); + cacheCommittedPacket(informAndActivateQP); + sendPacket(informAndActivateQP); + } + + /** + * Opens the listening socket, starts the acceptor thread and schedules a ping of all active + * observers every half tick. Does nothing if the acceptor thread is already alive. + * + * @throws IOException if the server socket cannot be created + */ + synchronized public void start() throws IOException { + if (thread != null && thread.isAlive()) { + return; + } + listenerRunning = true; + int backlog = 10; // dog science + if (self.shouldUsePortUnification() || self.isSslQuorum()) { + boolean allowInsecureConnection = self.shouldUsePortUnification(); + if (self.getQuorumListenOnAllIPs()) { + ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, port, backlog); + } else { + ss = new UnifiedServerSocket( + self.getX509Util(), + allowInsecureConnection, + port, + backlog, + self.getQuorumAddress().getAddress()); + } + } else { + if (self.getQuorumListenOnAllIPs()) { + ss = new ServerSocket(port, backlog); + } else { + ss = new ServerSocket(port, backlog, self.getQuorumAddress().getAddress()); + } + } + thread = new Thread(this, "ObserverMaster"); + thread.start(); + pinger = Executors.newSingleThreadScheduledExecutor(); + pinger.scheduleAtFixedRate(ping, self.tickTime /2, self.tickTime/2, TimeUnit.MILLISECONDS); + } + + public void run() { + while (listenerRunning) { + try { + Socket s = ss.accept(); + // start with the initLimit, once the ack is processed + // in LearnerHandler switch to the syncLimit + s.setSoTimeout(self.tickTime * self.initLimit); + BufferedInputStream is = new 
BufferedInputStream(s.getInputStream()); + LearnerHandler lh = new LearnerHandler(s, is, this); + lh.start(); + } catch (Exception e) { + /* stop() clears listenerRunning before it closes ss, so a failure seen while the flag is still set did not come from shutdown */ + if (listenerRunning) { + LOG.debug("Ignoring accept exception (maybe client closed)", e); + } else { + LOG.debug("Ignoring accept exception (maybe shutting down)", e); + } + } + } + /* + * we don't need to close ss because we only got here because listenerRunning is + * false and that is set and then ss is closed() in stop() + */ + } + + /** + * Stops accepting learners: clears the running flag, cancels the pinger, closes the listening + * socket and shuts down the handler of every active observer. A failure to close the socket is + * logged and otherwise ignored so that the handlers are still shut down. + */ + synchronized public void stop() { + listenerRunning = false; + if (pinger != null) { + pinger.shutdownNow(); + } + if (ss != null) { + try { + ss.close(); + } catch (IOException e) { + LOG.warn("Ignoring exception while closing ObserverMaster server socket", e); + } + } + for (LearnerHandler lh: activeObservers) { + lh.shutdown(); + } + } + + int getNumActiveObservers() { + return activeObservers.size(); + } + + int getPktsSizeLimit() { + return pktsSizeLimit; + } + + static void setPktsSizeLimit(final int sizeLimit) { + pktsSizeLimit = sizeLimit; + } + + + @Override + public void registerLearnerHandlerBean(final LearnerHandler learnerHandler, Socket socket) { + LearnerHandlerBean bean = new LearnerHandlerBean(learnerHandler, socket); + if (zks.registerJMX(bean)) { + connectionBeans.put(learnerHandler, bean); + } + } + + @Override + public void unregisterLearnerHandlerBean(final LearnerHandler learnerHandler) { + LearnerHandlerBean bean = connectionBeans.remove(learnerHandler); + if (bean != null){ + MBeanRegistry.getInstance().unregister(bean); + } + } +}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 035db0c..9d36fe2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -144,6 +144,16 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider } } + private int observerMasterPort; + + public int getObserverMasterPort() { + return observerMasterPort; + } + + public void setObserverMasterPort(int observerMasterPort) { + this.observerMasterPort = observerMasterPort; + } + public static class QuorumServer { public InetSocketAddress addr = null; @@ -1231,8 +1241,14 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider LOG.warn("Unexpected exception",e ); } finally { observer.shutdown(); - setObserver(null); - updateServerState(); + setObserver(null); + updateServerState(); + + // Add delay jitter before we switch to LOOKING + // state to reduce the load of ObserverMaster + if (isRunning()) { + Observer.waitForReconnectDelay(); + } } break; case FOLLOWING: @@ -1651,6 +1667,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider if (qs != null) { setAddrs(qs.addr, qs.electionAddr, qs.clientAddr); } + updateObserverMasterList(); return prevQV; } } @@ -1989,7 +2006,74 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider } } - private boolean updateLearnerType(QuorumVerifier newQV) { + private ArrayList<QuorumServer> observerMasters = new ArrayList<>(); + private void updateObserverMasterList() { + if (observerMasterPort <= 0) { + return; 
// observer masters not enabled + } + observerMasters.clear(); + StringBuilder sb = new StringBuilder(); + for (QuorumServer server : quorumVerifier.getVotingMembers().values()) { + InetSocketAddress addr = new InetSocketAddress(server.addr.getAddress(), observerMasterPort); + observerMasters.add(new QuorumServer(server.id, addr)); + sb.append(addr).append(","); + } + LOG.info("Updated learner master list to be {}", sb.toString()); + Collections.shuffle(observerMasters); + } + + /** + * @return true if this peer is an observer and the observer master list is not empty + */ + private boolean useObserverMasters() { + return getLearnerType() == LearnerType.OBSERVER && observerMasters.size() > 0; + } + + private int nextObserverMaster = 0; + /** + * Returns the next entry of the observer master list, wrapping around to the first entry + * once the end of the list has been reached. + */ + private QuorumServer nextObserverMaster() { + if (nextObserverMaster >= observerMasters.size()) { + nextObserverMaster = 0; + } + return observerMasters.get(nextObserverMaster++); + } + + /** + * Picks the server to learn from: the next observer master when observer masters are in use, + * otherwise the given leader. + */ + QuorumServer findLearnerMaster(QuorumServer leader) { + return useObserverMasters() ? nextObserverMaster() : leader; + } + + /** + * Vet a given learner master's information. 
+ * Allows specification by server id, ip only, or ip and port + * + * An address has to equal either the ip or the ip:port of an observer master. A prefix match + * would let "10.0.0.1" select "10.0.0.10" and would let an empty string select the first entry. + * + * @param desiredMaster server id, ip, or ip:port of the wanted observer master + * @return the matching observer master, or null if there is none or observer masters are not in use + */ + QuorumServer validateLearnerMaster(String desiredMaster) { + if (useObserverMasters()) { + Long sid; + try { + sid = Long.parseLong(desiredMaster); + } catch (NumberFormatException e) { + sid = null; + } + for (QuorumServer server : observerMasters) { + if (sid == null) { + String hostAddr = server.addr.getAddress().getHostAddress(); + String serverAddr = hostAddr + ':' + server.addr.getPort(); + /* equals() is called on the known-non-null side so that a null desiredMaster is reported as not found */ + if (hostAddr.equals(desiredMaster) || serverAddr.equals(desiredMaster)) { + return server; + } + } else { + if (sid.equals(server.id)) { + return server; + } + } + } + if (sid == null) { + LOG.info("could not find learner master address={}", desiredMaster); + } else { + LOG.warn("could not find learner master sid={}", sid); + } + } else { + LOG.info("cannot validate request, observer masters not enabled"); + } + return null; + } + + private boolean updateLearnerType(QuorumVerifier newQV) { //check if I'm an observer in new config if (newQV.getObservingMembers().containsKey(getId())) { if (getLearnerType()!=LearnerType.OBSERVER){ http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java index aee5efc..2c4dd0d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java @@ -71,6 +71,7 @@ public class QuorumPeerConfig { protected InetSocketAddress secureClientPortAddress; protected boolean sslQuorum = false; protected boolean shouldUsePortUnification = false; + protected int observerMasterPort; protected File dataDir; protected File dataLogDir; protected String dynamicConfigFileStr = 
null; @@ -239,6 +240,7 @@ public class QuorumPeerConfig { throws IOException, ConfigException { int clientPort = 0; int secureClientPort = 0; + int observerMasterPort = 0; String clientPortAddress = null; String secureClientPortAddress = null; VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build(); @@ -261,6 +263,8 @@ public class QuorumPeerConfig { secureClientPort = Integer.parseInt(value); } else if (key.equals("secureClientPortAddress")){ secureClientPortAddress = value.trim(); + } else if (key.equals("observerMasterPort")) { + observerMasterPort = Integer.parseInt(value); } else if (key.equals("tickTime")) { tickTime = Integer.parseInt(value); } else if (key.equals("maxClientCnxns")) { @@ -412,6 +416,13 @@ public class QuorumPeerConfig { configureSSLAuth(); } + if (observerMasterPort <= 0) { + LOG.info("observerMasterPort is not set"); + } else { + this.observerMasterPort = observerMasterPort; + LOG.info("observerMasterPort is {}", observerMasterPort); + } + if (tickTime == 0) { throw new IllegalArgumentException("tickTime is not set"); } @@ -754,6 +765,7 @@ public class QuorumPeerConfig { public InetSocketAddress getClientPortAddress() { return clientPortAddress; } public InetSocketAddress getSecureClientPortAddress() { return secureClientPortAddress; } + public int getObserverMasterPort() { return observerMasterPort; } public File getDataDir() { return dataDir; } public File getDataLogDir() { return dataLogDir; } public int getTickTime() { return tickTime; } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java index d2a02b2..a2e2bfc 100644 --- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java @@ -188,6 +188,7 @@ public class QuorumPeerMain { quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); quorumPeer.setInitLimit(config.getInitLimit()); quorumPeer.setSyncLimit(config.getSyncLimit()); + quorumPeer.setObserverMasterPort(config.getObserverMasterPort()); quorumPeer.setConfigFileName(config.getConfigFilename()); quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java index 54a6be5..f657893 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java @@ -18,6 +18,7 @@ package org.apache.zookeeper; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.junit.Rule; @@ -75,4 +76,29 @@ public class ZKTestCase { }; + public interface WaitForCondition { + /** + * @return true when success + */ + boolean evaluate(); + } + + /** + * Wait for the condition to become true, polling it every 100 ms; fail the + * test if the timeout is exceeded + * @param msg error message to print on failure + * @param condition condition to evaluate + * @param timeout timeout in seconds + * @throws InterruptedException if the thread is interrupted while sleeping between polls + */ + public void waitFor(String msg, WaitForCondition condition, int timeout) + throws InterruptedException { + /* Poll against a real deadline: counting 100 ms sleeps gave up after only timeout / 10 seconds */ + final long deadline = System.nanoTime() + java.util.concurrent.TimeUnit.SECONDS.toNanos(timeout); + while (System.nanoTime() - deadline < 0) { + if (condition.evaluate()) { + return; + } + Thread.sleep(100); + } + Assert.fail(msg); + } } 
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DelayRequestProcessor.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DelayRequestProcessor.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DelayRequestProcessor.java new file mode 100644 index 0000000..aeb4e0f --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DelayRequestProcessor.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestProcessor; + +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Allows the blocking of the request processor queue on a ZooKeeperServer. + * + * This is used to simulate arbitrary length delays or to produce delays + * in request processing that are maximally inconvenient for a given feature + * for the purposes of testing it. 
+ */ +public class DelayRequestProcessor implements RequestProcessor { + + /* guarded by this: read in processRequest, cleared in unblockQueue */ + private boolean blocking; + RequestProcessor next; + + private LinkedBlockingQueue<Request> incomingRequests = new LinkedBlockingQueue<>(); + + private DelayRequestProcessor(RequestProcessor next) { + this.blocking = true; + this.next = next; + } + + /** + * Holds the request back while blocking, otherwise passes it straight to the next processor. + * Synchronized with unblockQueue so that a request cannot be enqueued after the queue has + * been drained and then never be delivered. + */ + @Override + public synchronized void processRequest(Request request) throws RequestProcessorException { + if (blocking) { + incomingRequests.add(request); + } else { + next.processRequest(request); + } + } + + /** + * Passes the request to the next processor immediately, bypassing the blocked queue. + */ + public void submitRequest(Request request) throws RequestProcessorException { + next.processRequest(request); + } + + @Override + public void shutdown() { + } + + /** + * Delivers every held request to the next processor in arrival order and stops blocking. + * Requests are removed from the queue as they are delivered, so a failure part way through + * leaves only the undelivered requests queued. + */ + public synchronized void unblockQueue() throws RequestProcessorException { + if (blocking) { + Request request; + while ((request = incomingRequests.poll()) != null) { + next.processRequest(request); + } + blocking = false; + } + } + + /** + * Creates a blocking DelayRequestProcessor and installs it as the commit processor's next + * processor, in front of the processor that was installed there before. + */ + public static DelayRequestProcessor injectDelayRequestProcessor(FollowerZooKeeperServer zooKeeperServer) { + RequestProcessor finalRequestProcessor = zooKeeperServer.commitProcessor.nextProcessor; + DelayRequestProcessor delayRequestProcessor = new DelayRequestProcessor(finalRequestProcessor); + zooKeeperServer.commitProcessor.nextProcessor = delayRequestProcessor; + return delayRequestProcessor; + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java index f238971..1cd33ec 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java @@ -39,7 +39,6 @@ import org.apache.zookeeper.server.ZKDatabase; import 
org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.Leader.Proposal; import org.apache.zookeeper.server.util.ZxidUtils; -import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; @@ -127,6 +126,9 @@ public class LearnerHandlerTest extends ZKTestCase { @Before public void setUp() throws Exception { + db = new MockZKDatabase(null); + sock = mock(Socket.class); + // Intercept when startForwarding is called leader = mock(Leader.class); when( @@ -137,10 +139,8 @@ public class LearnerHandlerTest extends ZKTestCase { return 0; } }); + when(leader.getZKDatabase()).thenReturn(db); - sock = mock(Socket.class); - - db = new MockZKDatabase(null); learnerHandler = new MockLearnerHandler(sock, leader); } @@ -204,7 +204,7 @@ public class LearnerHandlerTest extends ZKTestCase { peerZxid = 3; db.lastProcessedZxid = 1; db.committedLog.clear(); - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send TRUNC and forward any packet starting lastProcessedZxid assertOpType(Leader.TRUNC, db.lastProcessedZxid, db.lastProcessedZxid); reset(); @@ -213,7 +213,7 @@ public class LearnerHandlerTest extends ZKTestCase { peerZxid = 1; db.lastProcessedZxid = 1; db.committedLog.clear(); - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send DIFF and forward any packet starting lastProcessedZxid assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid); assertEquals(1, learnerHandler.getQueuedPackets().size()); @@ -226,7 +226,7 @@ public class LearnerHandlerTest extends ZKTestCase { db.lastProcessedZxid = 1; db.committedLog.clear(); // We send SNAP - assertTrue(learnerHandler.syncFollower(peerZxid, db, leader)); + assertTrue(learnerHandler.syncFollower(peerZxid, leader)); assertEquals(0, learnerHandler.getQueuedPackets().size()); reset(); 
@@ -248,7 +248,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer has zxid that we have never seen peerZxid = 4; - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send TRUNC to 3 and forward any packet starting 5 assertOpType(Leader.TRUNC, 3, 5); // DIFF + 1 proposals + 1 commit @@ -258,7 +258,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer is within committedLog range peerZxid = 2; - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send DIFF and forward any packet starting lastProcessedZxid assertOpType(Leader.DIFF, db.getmaxCommittedLog(), db.getmaxCommittedLog()); @@ -271,7 +271,7 @@ public class LearnerHandlerTest extends ZKTestCase { peerZxid = 1; db.setSnapshotSizeFactor(-1); // We send SNAP - assertTrue(learnerHandler.syncFollower(peerZxid, db, leader)); + assertTrue(learnerHandler.syncFollower(peerZxid, leader)); assertEquals(0, learnerHandler.getQueuedPackets().size()); reset(); } @@ -297,7 +297,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer has zxid that we have never seen peerZxid = 4; - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send TRUNC to 3 and forward any packet starting at maxCommittedLog assertOpType(Leader.TRUNC, 3, db.getmaxCommittedLog()); // DIFF + 4 proposals + 4 commit @@ -307,7 +307,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer zxid is in txnlog range peerZxid = 3; - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send DIFF and forward any packet starting at maxCommittedLog assertOpType(Leader.DIFF, db.getmaxCommittedLog(), db.getmaxCommittedLog()); @@ -336,11 +336,12 @@ public class LearnerHandlerTest extends ZKTestCase { 
db.lastProcessedZxid = 7; db.txnLog.add(createProposal(2)); db.txnLog.add(createProposal(3)); + when(leader.getZKDatabase()).thenReturn(db); // Peer zxid peerZxid = 4; assertTrue("Couldn't identify snapshot transfer!", - learnerHandler.syncFollower(peerZxid, db, leader)); + learnerHandler.syncFollower(peerZxid, leader)); reset(); } @@ -362,7 +363,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer has zxid that we have never seen peerZxid = 4; - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send TRUNC to 3 and forward any packet starting at // lastProcessedZxid assertOpType(Leader.TRUNC, 3, db.lastProcessedZxid); @@ -373,7 +374,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer has zxid in txnlog range peerZxid = 2; - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send DIFF and forward any packet starting at lastProcessedZxid assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid); // DIFF + 4 proposals + 4 commit @@ -383,7 +384,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer miss the txnlog peerZxid = 1; - assertTrue(learnerHandler.syncFollower(peerZxid, db, leader)); + assertTrue(learnerHandler.syncFollower(peerZxid, leader)); // We send snap assertEquals(0, learnerHandler.getQueuedPackets().size()); reset(); @@ -414,7 +415,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer has zxid that we have never seen peerZxid = getZxid(0xf, 4); - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send TRUNC to 3 and forward any packet starting at maxCommittedLog assertOpType(Leader.TRUNC, getZxid(0xf, 3), db.getmaxCommittedLog()); // DIFF + 4 proposals + 4 commit @@ -425,7 +426,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer zxid is 
in txnlog range peerZxid = getZxid(0xf, 3); - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send DIFF and forward any packet starting at maxCommittedLog assertOpType(Leader.DIFF, db.getmaxCommittedLog(), db.getmaxCommittedLog()); @@ -456,13 +457,13 @@ public class LearnerHandlerTest extends ZKTestCase { // We should get snap, we can do better here, but the main logic is // that we should never send diff if we have never seen any txn older // than peer zxid - assertTrue(learnerHandler.syncFollower(peerZxid, db, leader)); + assertTrue(learnerHandler.syncFollower(peerZxid, leader)); assertEquals(0, learnerHandler.getQueuedPackets().size()); reset(); // Peer has zxid of epoch 1 peerZxid = getZxid(1, 0); - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send DIFF to (1, 2) and forward any packet starting at (1, 2) assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2)); // DIFF + 2 proposals + 2 commit @@ -472,7 +473,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer has zxid of epoch 2, so it is already sync peerZxid = getZxid(2, 0); - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send DIFF to (2, 0) and forward any packet starting at (2, 0) assertOpType(Leader.DIFF, getZxid(2, 0), getZxid(2, 0)); // DIFF only @@ -498,7 +499,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer has zxid of epoch 3 peerZxid = getZxid(3, 0); - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send DIFF to (6,0) and forward any packet starting at (4,1) assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1)); // DIFF + 1 proposals + 1 commit @@ -508,7 +509,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer has zxid 
of epoch 4 peerZxid = getZxid(4, 0); - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send DIFF to (6,0) and forward any packet starting at (4,1) assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1)); // DIFF + 1 proposals + 1 commit @@ -518,7 +519,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer has zxid of epoch 5 peerZxid = getZxid(5, 0); - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send DIFF to (6,0) and forward any packet starting at (5,0) assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(5, 0)); // DIFF only @@ -527,7 +528,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer has zxid of epoch 6 peerZxid = getZxid(6, 0); - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send DIFF to (6,0) and forward any packet starting at (6, 0) assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(6, 0)); // DIFF only @@ -558,7 +559,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer has zxid of epoch 1 peerZxid = getZxid(1, 0); - assertFalse(learnerHandler.syncFollower(peerZxid, db, leader)); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); // We send DIFF to (1, 2) and forward any packet starting at (1, 2) assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2)); // DIFF + 2 proposals + 2 commit @@ -585,7 +586,7 @@ public class LearnerHandlerTest extends ZKTestCase { // Peer has zxid (3, 1) peerZxid = getZxid(3, 1); - assertTrue(learnerHandler.syncFollower(peerZxid, db, leader)); + assertTrue(learnerHandler.syncFollower(peerZxid, leader)); assertEquals(0, learnerHandler.getQueuedPackets().size()); reset(); } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/PortForwarder.java 
---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/PortForwarder.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/PortForwarder.java index acbad80..f222121 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/PortForwarder.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/PortForwarder.java @@ -16,9 +16,6 @@ * limitations under the License. */ -/** - * - */ package org.apache.zookeeper.server.util; import java.io.IOException; @@ -29,6 +26,8 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -95,9 +94,10 @@ public class PortForwarder extends Thread { private final OutputStream out; private final Socket toClose; private final Socket toClose2; + private boolean isFinished = false; PortForwardWorker(Socket toClose, Socket toClose2, InputStream in, - OutputStream out) throws IOException { + OutputStream out) { this.toClose = toClose; this.toClose2 = toClose2; this.in = in; @@ -118,50 +118,57 @@ public class PortForwarder extends Thread { this.out.write(buf, 0, read); } catch (IOException e) { LOG.warn("exception during write", e); - try { - toClose.close(); - } catch (IOException ex) { - // ignore - } - try { - toClose2.close(); - } catch (IOException ex) { - // ignore - } break; } + } else if (read < 0) { + throw new IOException("read " + read); } } catch (SocketTimeoutException e) { LOG.error("socket timeout", e); } - Thread.sleep(1); } + Thread.sleep(1); } catch (InterruptedException e) { LOG.warn("Interrupted", e); - try { - toClose.close(); - } catch (IOException ex) { - // ignore - } - try { - toClose2.close(); - } catch (IOException ex) { - // 
ignore silently - } } catch (SocketException e) { if (!"Socket closed".equals(e.getMessage())) { LOG.error("Unexpected exception", e); } } catch (IOException e) { LOG.error("Unexpected exception", e); + } finally { + shutdown(); } LOG.info("Shutting down forward for " + toClose); + isFinished = true; } + boolean waitForShutdown(long timeoutMs) throws InterruptedException { + synchronized (this) { + if (!isFinished) { + this.wait(timeoutMs); + } + } + return isFinished; + } + + public void shutdown() { + try { + toClose.close(); + } catch (IOException ex) { + // ignore + } + try { + toClose2.close(); + } catch (IOException ex) { + // ignore silently + } + } } private volatile boolean stopped = false; - private ExecutorService workers = Executors.newCachedThreadPool(); + private ExecutorService workerExecutor = Executors.newCachedThreadPool(); + private List<PortForwardWorker> workers = new ArrayList<>(); private ServerSocket serverSocket; private final int to; @@ -207,30 +214,31 @@ public class PortForwarder extends Thread { + " to:" + to); sock.setSoTimeout(30000); target.setSoTimeout(30000); - this.workers.execute(new PortForwardWorker(sock, target, + + + workers.add(new PortForwardWorker(sock, target, sock.getInputStream(), target.getOutputStream())); - this.workers.execute(new PortForwardWorker(target, sock, + workers.add(new PortForwardWorker(target, sock, target.getInputStream(), sock.getOutputStream())); - } catch (SocketTimeoutException e) { - LOG.warn("socket timed out local:" - + (sock != null ? sock.getLocalPort(): "") - + " from:" + (sock != null ? sock.getPort(): "") - + " to:" + to, e); + for (PortForwardWorker worker: workers) { + workerExecutor.submit(worker); + } + } catch (SocketTimeoutException e) { + LOG.warn("socket timed out", e); } catch (ConnectException e) { - LOG.warn("connection exception local:" - + (sock != null ? sock.getLocalPort(): "") - + " from:" + (sock != null ? 
sock.getPort(): "") + LOG.warn("connection exception local:" + sock.getLocalPort() + + " from:" + sock.getPort() + " to:" + to, e); sock.close(); } catch (IOException e) { if (!"Socket closed".equals(e.getMessage())) { - LOG.warn("unexpected exception local:" - + (sock != null ? sock.getLocalPort(): "") - + " from:" + (sock != null ? sock.getPort(): "") - + " to:" + to, e); + LOG.warn("unexpected exception local:" + sock.getLocalPort() + + " from:" + sock.getPort() + + " to:" + to, e); throw e; } } + } } catch (IOException e) { LOG.error("Unexpected exception to:" + to, e); @@ -242,15 +250,16 @@ public class PortForwarder extends Thread { public void shutdown() throws Exception { this.stopped = true; this.serverSocket.close(); - this.workers.shutdownNow(); - try { - if (!this.workers.awaitTermination(5, TimeUnit.SECONDS)) { - throw new Exception( - "Failed to stop forwarding within 5 seconds"); + this.join(); + this.workerExecutor.shutdownNow(); + for (PortForwardWorker worker: workers) { + worker.shutdown(); + } + + for (PortForwardWorker worker: workers) { + if (!worker.waitForShutdown(5000)) { + throw new Exception("Failed to stop forwarding within 5 seconds"); } - } catch (InterruptedException e) { - throw new Exception("Failed to stop forwarding"); } - this.join(); } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java new file mode 100644 index 0000000..4cec0ef --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java @@ -0,0 +1,780 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.jmx.CommonNames; +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.jmx.ZKMBeanInfo; +import org.apache.zookeeper.server.admin.Commands; +import org.apache.zookeeper.server.quorum.DelayRequestProcessor; +import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeerTestBase; +import org.apache.zookeeper.server.util.PortForwarder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException.ConnectionLossException; +import 
org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.admin.ZooKeeperAdmin; +import org.junit.Assert; +import org.junit.Test; + +import javax.management.Attribute; +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; +import javax.management.InvalidAttributeValueException; +import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.ReflectionException; +import javax.management.RuntimeMBeanException; + +@RunWith(Parameterized.class) +public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{ + protected static final Logger LOG = LoggerFactory.getLogger(ObserverMasterTest.class); + + public ObserverMasterTest(Boolean testObserverMaster) { + this.testObserverMaster = testObserverMaster; + } + + @Parameterized.Parameters + public static List<Object []> data() { return Arrays.asList(new Object [][] { + {Boolean.TRUE}, + {Boolean.FALSE}}); + } + + private Boolean testObserverMaster; + + private CountDownLatch latch; + ZooKeeper zk; + private WatchedEvent lastEvent = null; + + private int CLIENT_PORT_QP1; + private int CLIENT_PORT_QP2; + private int CLIENT_PORT_OBS; + private int OM_PORT; + private MainThread q1; + private MainThread q2; + private MainThread q3; + + private PortForwarder setUp(final int omProxyPort) throws IOException { + ClientBase.setupTestEnv(); + + final int PORT_QP1 = PortAssignment.unique(); + final int PORT_QP2 = PortAssignment.unique(); + final int PORT_OBS = PortAssignment.unique(); + final int PORT_QP_LE1 = PortAssignment.unique(); + final int PORT_QP_LE2 = PortAssignment.unique(); + final int PORT_OBS_LE 
= PortAssignment.unique(); + + CLIENT_PORT_QP1 = PortAssignment.unique(); + CLIENT_PORT_QP2 = PortAssignment.unique(); + CLIENT_PORT_OBS = PortAssignment.unique(); + + OM_PORT = PortAssignment.unique(); + + String quorumCfgSection = + "server.1=127.0.0.1:" + (PORT_QP1) + + ":" + (PORT_QP_LE1) + ";" + CLIENT_PORT_QP1 + + "\nserver.2=127.0.0.1:" + (PORT_QP2) + + ":" + (PORT_QP_LE2) + ";" + CLIENT_PORT_QP2 + + "\nserver.3=127.0.0.1:" + + (PORT_OBS)+ ":" + (PORT_OBS_LE) + ":observer" + ";" + CLIENT_PORT_OBS; + String extraCfgs = testObserverMaster ? String.format("observerMasterPort=%d%n", OM_PORT) : ""; + String extraCfgsObs = testObserverMaster ? String.format("observerMasterPort=%d%n", omProxyPort <= 0 ? OM_PORT : omProxyPort) : ""; + + PortForwarder forwarder = null; + if (testObserverMaster && omProxyPort >= 0) { + forwarder = new PortForwarder(omProxyPort, OM_PORT); + } + + q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection, extraCfgs); + q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection, extraCfgs); + q3 = new MainThread(3, CLIENT_PORT_OBS, quorumCfgSection, extraCfgsObs); + q1.start(); + q2.start(); + Assert.assertTrue("waiting for server 1 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, + CONNECTION_TIMEOUT)); + Assert.assertTrue("waiting for server 2 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, + CONNECTION_TIMEOUT)); + return forwarder; + } + + private void shutdown() throws InterruptedException { + LOG.info("Shutting down all servers"); + zk.close(); + + q1.shutdown(); + q2.shutdown(); + q3.shutdown(); + + Assert.assertTrue("Waiting for server 1 to shut down", + ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_QP1, + ClientBase.CONNECTION_TIMEOUT)); + Assert.assertTrue("Waiting for server 2 to shut down", + ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_QP2, + ClientBase.CONNECTION_TIMEOUT)); + Assert.assertTrue("Waiting for server 3 to shut down", + 
ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_OBS, + ClientBase.CONNECTION_TIMEOUT)); + } + + @Test + public void testLaggingObserverMaster() throws Exception { + final int OM_PROXY_PORT = PortAssignment.unique(); + PortForwarder forwarder = setUp(OM_PROXY_PORT); + + // find the leader and observer master + int leaderPort; + MainThread leader; + MainThread follower; + if (q1.getQuorumPeer().leader != null) { + leaderPort = CLIENT_PORT_QP1; + leader = q1; + follower = q2; + } else if (q2.getQuorumPeer().leader != null) { + leaderPort = CLIENT_PORT_QP2; + leader = q2; + follower = q1; + } else { + throw new RuntimeException("No leader"); + } + + // ensure the observer master has commits in the queue before observer sync + zk = new ZooKeeper("127.0.0.1:" + leaderPort, + ClientBase.CONNECTION_TIMEOUT, this); + for (int i = 0; i < 10; i++) { + zk.create("/bulk" + i, ("initial data of some size").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + zk.close(); + + q3.start(); + Assert.assertTrue("waiting for server 3 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, + CONNECTION_TIMEOUT)); + + latch = new CountDownLatch(1); + zk = new ZooKeeper("127.0.0.1:" + leaderPort, + ClientBase.CONNECTION_TIMEOUT, this); + latch.await(); + Assert.assertEquals(zk.getState(), States.CONNECTED); + + zk.create("/init", "first".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + final long lastLoggedZxid = leader.getQuorumPeer().getLastLoggedZxid(); + + // wait for change to propagate + waitFor("Timeout waiting for observer sync", new WaitForCondition() { + public boolean evaluate() { + return lastLoggedZxid == q3.getQuorumPeer().getLastLoggedZxid(); + } + }, 30); + + // simulate network fault + if (forwarder != null) { + forwarder.shutdown(); + } + + for (int i = 0; i < 10; i++) { + zk.create("/basic" + i, "second".getBytes(),Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + + DelayRequestProcessor delayRequestProcessor = null; + if 
(testObserverMaster) { + FollowerZooKeeperServer followerZooKeeperServer = (FollowerZooKeeperServer) follower.getQuorumPeer().getActiveServer(); + delayRequestProcessor = DelayRequestProcessor.injectDelayRequestProcessor(followerZooKeeperServer); + } + + zk.create("/target1", "third".getBytes(),Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + zk.create("/target2", "third".getBytes(),Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + LOG.info("observer zxid " + Long.toHexString(q3.getQuorumPeer().getLastLoggedZxid()) + + (testObserverMaster ? "" : " observer master zxid " + + Long.toHexString(follower.getQuorumPeer().getLastLoggedZxid())) + + " leader zxid " + Long.toHexString(leader.getQuorumPeer().getLastLoggedZxid())); + + // restore network + forwarder = testObserverMaster ? new PortForwarder(OM_PROXY_PORT, OM_PORT) : null; + + Assert.assertTrue("waiting for server 3 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, + CONNECTION_TIMEOUT)); + Assert.assertNotNull("Leader switched", leader.getQuorumPeer().leader); + + if (delayRequestProcessor != null) { + delayRequestProcessor.unblockQueue(); + } + + latch = new CountDownLatch(1); + ZooKeeper obsZk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, + ClientBase.CONNECTION_TIMEOUT, this); + latch.await(); + zk.create("/finalop", "fourth".getBytes(),Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + Assert.assertEquals("first", new String(obsZk.getData("/init", null, null))); + Assert.assertEquals("third", new String(obsZk.getData("/target1", null, null))); + + obsZk.close(); + shutdown(); + + try { + if (forwarder != null) { + forwarder.shutdown(); + } + } catch (Exception e) { + // ignore + } + } + + /** + * This test ensures two things: + * 1. That Observers can successfully proxy requests to the ensemble. + * 2. That Observers don't participate in leader elections. + * The second is tested by constructing an ensemble where a leader would + * be elected if and only if an Observer voted. 
+ */ + @Test + public void testObserver() throws Exception { + // We expect two notifications before we want to continue + latch = new CountDownLatch(2); + setUp(-1); + q3.start(); + Assert.assertTrue("waiting for server 3 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, + CONNECTION_TIMEOUT)); + + if (testObserverMaster) { + int masterPort = q3.getQuorumPeer().observer.getSocket().getPort(); + LOG.info("port " + masterPort + " " + OM_PORT); + Assert.assertEquals("observer failed to connect to observer master", masterPort, OM_PORT); + } + + zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, + ClientBase.CONNECTION_TIMEOUT, this); + zk.create("/obstest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + // Assert that commands are getting forwarded correctly + Assert.assertEquals(new String(zk.getData("/obstest", null, null)), "test"); + + // Now check that other commands don't blow everything up + zk.sync("/", null, null); + zk.setData("/obstest", "test2".getBytes(), -1); + zk.getChildren("/", false); + + Assert.assertEquals(zk.getState(), States.CONNECTED); + + LOG.info("Shutting down server 2"); + // Now kill one of the other real servers + q2.shutdown(); + + Assert.assertTrue("Waiting for server 2 to shut down", + ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_QP2, + ClientBase.CONNECTION_TIMEOUT)); + + LOG.info("Server 2 down"); + + // Now the resulting ensemble shouldn't be quorate + latch.await(); + Assert.assertNotSame("Client is still connected to non-quorate cluster", + KeeperState.SyncConnected,lastEvent.getState()); + + LOG.info("Latch returned"); + + try { + Assert.assertNotEquals("Shouldn't get a response when cluster not quorate!", + "test", new String(zk.getData("/obstest", null, null))); + } + catch (ConnectionLossException c) { + LOG.info("Connection loss exception caught - ensemble not quorate (this is expected)"); + } + + latch = new CountDownLatch(1); + + LOG.info("Restarting server 2"); + + // 
Bring it back + //q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection, extraCfgs); + q2.start(); + + LOG.info("Waiting for server 2 to come up"); + Assert.assertTrue("waiting for server 2 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, + CONNECTION_TIMEOUT)); + + LOG.info("Server 2 started, waiting for latch"); + + latch.await(); + // It's possible our session expired - but this is ok, shows we + // were able to talk to the ensemble + Assert.assertTrue("Client didn't reconnect to quorate ensemble (state was" + + lastEvent.getState() + ")", + (KeeperState.SyncConnected==lastEvent.getState() || + KeeperState.Expired==lastEvent.getState())); + + LOG.info("perform a revalidation test"); + int leaderProxyPort = PortAssignment.unique(); + int obsProxyPort = PortAssignment.unique(); + int leaderPort = q1.getQuorumPeer().leader == null ? CLIENT_PORT_QP2 : CLIENT_PORT_QP1; + PortForwarder leaderPF = new PortForwarder(leaderProxyPort, leaderPort); + + latch = new CountDownLatch(1); + ZooKeeper client = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, obsProxyPort), + ClientBase.CONNECTION_TIMEOUT, this); + latch.await(); + client.create("/revalidtest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL); + Assert.assertNotNull("Read-after write failed", client.exists("/revalidtest", null)); + + latch = new CountDownLatch(2); + PortForwarder obsPF = new PortForwarder(obsProxyPort, CLIENT_PORT_OBS); + try { + leaderPF.shutdown(); + } catch (Exception e) { + // ignore? 
+ } + latch.await(); + Assert.assertEquals(new String(client.getData("/revalidtest", null, null)), "test"); + client.close(); + obsPF.shutdown(); + + shutdown(); + } + + @Test + public void testRevalidation() throws Exception { + setUp(-1); + q3.start(); + Assert.assertTrue("waiting for server 3 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, + CONNECTION_TIMEOUT)); + final int leaderProxyPort = PortAssignment.unique(); + final int obsProxyPort = PortAssignment.unique(); + + int leaderPort = q1.getQuorumPeer().leader == null ? CLIENT_PORT_QP2 : CLIENT_PORT_QP1; + PortForwarder leaderPF = new PortForwarder(leaderProxyPort, leaderPort); + + latch = new CountDownLatch(1); + zk = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, obsProxyPort), + ClientBase.CONNECTION_TIMEOUT, this); + latch.await(); + zk.create("/revalidtest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL); + Assert.assertNotNull("Read-after write failed", zk.exists("/revalidtest", null)); + + latch = new CountDownLatch(2); + PortForwarder obsPF = new PortForwarder(obsProxyPort, CLIENT_PORT_OBS); + try { + leaderPF.shutdown(); + } catch (Exception e) { + // ignore? 
+ } + latch.await(); + Assert.assertEquals(new String(zk.getData("/revalidtest", null, null)), "test"); + obsPF.shutdown(); + + shutdown(); + } + + @Test + public void testInOrderCommits() throws Exception { + setUp(-1); + + zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1, + ClientBase.CONNECTION_TIMEOUT, null); + for (int i = 0; i < 10; i++) { + zk.create("/bulk" + i, ("Initial data of some size").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + zk.close(); + + q3.start(); + Assert.assertTrue("waiting for observer to be up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, + CONNECTION_TIMEOUT)); + + latch = new CountDownLatch(1); + zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1, + ClientBase.CONNECTION_TIMEOUT, this); + latch.await(); + Assert.assertEquals(zk.getState(), States.CONNECTED); + + zk.create("/init", "first".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + final long zxid = q1.getQuorumPeer().getLastLoggedZxid(); + + // wait for change to propagate + waitFor("Timeout waiting for observer sync", new WaitForCondition() { + public boolean evaluate() { + return zxid == q3.getQuorumPeer().getLastLoggedZxid(); + } + }, 30); + + ZooKeeper obsZk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, + ClientBase.CONNECTION_TIMEOUT, this); + int followerPort = q1.getQuorumPeer().leader == null ? 
CLIENT_PORT_QP1 : CLIENT_PORT_QP2; + ZooKeeper fZk = new ZooKeeper("127.0.0.1:" + followerPort, + ClientBase.CONNECTION_TIMEOUT, this); + final int numTransactions = 10001; + CountDownLatch gate = new CountDownLatch(1); + CountDownLatch oAsyncLatch = new CountDownLatch(numTransactions); + Thread oAsyncWriteThread = new Thread(new AsyncWriter(obsZk, numTransactions, true, oAsyncLatch, "/obs", gate)); + CountDownLatch fAsyncLatch = new CountDownLatch(numTransactions); + Thread fAsyncWriteThread = new Thread(new AsyncWriter(fZk, numTransactions, true, fAsyncLatch, "/follower", gate)); + + LOG.info("ASYNC WRITES"); + oAsyncWriteThread.start(); + fAsyncWriteThread.start(); + gate.countDown(); + + oAsyncLatch.await(); + fAsyncLatch.await(); + + oAsyncWriteThread.join(ClientBase.CONNECTION_TIMEOUT); + if (oAsyncWriteThread.isAlive()) { + LOG.error("asyncWriteThread is still alive"); + } + fAsyncWriteThread.join(ClientBase.CONNECTION_TIMEOUT); + if (fAsyncWriteThread.isAlive()) { + LOG.error("asyncWriteThread is still alive"); + } + + obsZk.close(); + fZk.close(); + + shutdown(); + } + + @Test + public void testAdminCommands() throws IOException, MBeanException, + InstanceNotFoundException, ReflectionException, InterruptedException, MalformedObjectNameException, + AttributeNotFoundException, InvalidAttributeValueException, KeeperException { + // flush all beans, then start + for (ZKMBeanInfo beanInfo : MBeanRegistry.getInstance().getRegisteredBeans()) { + MBeanRegistry.getInstance().unregister(beanInfo); + } + + JMXEnv.setUp(); + setUp(-1); + q3.start(); + Assert.assertTrue("waiting for observer to be up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, + CONNECTION_TIMEOUT)); + + // Assert that commands are getting forwarded correctly + zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, + ClientBase.CONNECTION_TIMEOUT, this); + zk.create("/obstest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + Assert.assertEquals(new 
String(zk.getData("/obstest", null, null)), "test"); + + // test stats collection + final Map<String, String> emptyMap = Collections.emptyMap(); + Map<String, Object> stats = Commands.runCommand("mntr", q3.getQuorumPeer().getActiveServer(), emptyMap).toMap(); + Assert.assertTrue("observer not emitting observer_master_id", stats.containsKey("observer_master_id")); + + // check the stats for the first peer + stats = Commands.runCommand("mntr", q1.getQuorumPeer().getActiveServer(), emptyMap).toMap(); + if (testObserverMaster) { + if (q1.getQuorumPeer().leader == null) { + Assert.assertEquals(1, stats.get("synced_observers")); + } else { + Assert.assertEquals(0, stats.get("synced_observers")); + } + } else { + if (q1.getQuorumPeer().leader == null) { + Assert.assertNull(stats.get("synced_observers")); + } else { + Assert.assertEquals(1, stats.get("synced_observers")); + } + } + + // check the stats for the second peer + stats = Commands.runCommand("mntr", q2.getQuorumPeer().getActiveServer(), emptyMap).toMap(); + if (testObserverMaster) { + if (q2.getQuorumPeer().leader == null) { + Assert.assertEquals(1, stats.get("synced_observers")); + } else { + Assert.assertEquals(0, stats.get("synced_observers")); + } + } else { + if (q2.getQuorumPeer().leader == null) { + Assert.assertNull(stats.get("synced_observers")); + } else { + Assert.assertEquals(1, stats.get("synced_observers")); + } + } + + // test admin commands for disconnection + ObjectName connBean = null; + for (ObjectName bean : JMXEnv.conn().queryNames(new ObjectName(CommonNames.DOMAIN + ":*"), null)) { + if (bean.getCanonicalName().contains("Learner_Connections") && + bean.getCanonicalName().contains("id:" + q3.getQuorumPeer().getId())) { + connBean = bean; + break; + } + } + Assert.assertNotNull("could not find connection bean", connBean); + + latch = new CountDownLatch(1); + JMXEnv.conn().invoke(connBean, "terminateConnection", new Object[0], null); + Assert.assertTrue("server failed to disconnect on 
terminate", + latch.await(CONNECTION_TIMEOUT/2, TimeUnit.MILLISECONDS)); + Assert.assertTrue("waiting for server 3 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, + CONNECTION_TIMEOUT)); + + final String obsBeanName = + String.format("org.apache.ZooKeeperService:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Observer", + q3.getQuorumPeer().getId(), q3.getQuorumPeer().getId()); + Set<ObjectName> names = JMXEnv.conn().queryNames(new ObjectName(obsBeanName), null); + Assert.assertEquals("expecting singular observer bean", 1, names.size()); + ObjectName obsBean = names.iterator().next(); + + if (testObserverMaster) { + // show we can move the observer using the id + long observerMasterId = q3.getQuorumPeer().observer.getLearnerMasterId(); + latch = new CountDownLatch(1); + JMXEnv.conn().setAttribute(obsBean, new Attribute("LearnerMaster", Long.toString(3 - observerMasterId))); + Assert.assertTrue("server failed to disconnect on terminate", + latch.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)); + Assert.assertTrue("waiting for server 3 being up", + ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, + CONNECTION_TIMEOUT)); + } else { + // show we get an error + final long leaderId = q1.getQuorumPeer().leader == null ? 2 : 1; + try { + JMXEnv.conn().setAttribute(obsBean, new Attribute("LearnerMaster", Long.toString(3 - leaderId))); + Assert.fail("should have seen an exception on previous command"); + } catch (RuntimeMBeanException e) { + Assert.assertEquals("mbean failed for the wrong reason", + IllegalArgumentException.class, e.getCause().getClass()); + } + } + + shutdown(); + JMXEnv.tearDown(); + } + + private String createServerString(String type, long serverId, int clientPort) { + return "server." 
+ serverId + "=127.0.0.1:" + + PortAssignment.unique() + ":" + + PortAssignment.unique() + ":" + + type + ";" + clientPort; + } + + private void waitServerUp(int clientPort) { + Assert.assertTrue("waiting for server being up", + ClientBase.waitForServerUp("127.0.0.1:" + clientPort, + CONNECTION_TIMEOUT)); + } + + private ZooKeeperAdmin createAdmin(int clientPort) throws IOException { + System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", + "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/); + QuorumPeerConfig.setReconfigEnabled(true); + ZooKeeperAdmin admin = new ZooKeeperAdmin("127.0.0.1:" + clientPort, + ClientBase.CONNECTION_TIMEOUT, new Watcher() { + public void process(WatchedEvent event) {} + }); + admin.addAuthInfo("digest", "super:test".getBytes()); + return admin; + } + + // This test is known to be flaky and fail due to "reconfig already in progress". + // TODO: Investigate intermittent testDynamicReconfig failures. + // @Test + public void testDynamicReconfig() throws InterruptedException, IOException, + KeeperException { + if (!testObserverMaster) { + return; + } + + ClientBase.setupTestEnv(); + + // create a quorum running with different observer master port + // to make it easier to choose which server the observer is + // following with + // + // we have setObserverMaster function but it's broken, use this + // solution before we fixed that + int clientPort1 = PortAssignment.unique(); + int clientPort2 = PortAssignment.unique(); + int omPort1 = PortAssignment.unique(); + int omPort2 = PortAssignment.unique(); + String quorumCfgSection = + createServerString("participant", 1, clientPort1) + "\n" + + createServerString("participant", 2, clientPort2); + + MainThread s1 = new MainThread(1, clientPort1, quorumCfgSection, + String.format("observerMasterPort=%d%n",omPort1)); + MainThread s2 = new MainThread(2, clientPort2, quorumCfgSection, + String.format("observerMasterPort=%d%n", omPort2)); + s1.start(); + s2.start(); + 
waitServerUp(clientPort1); + waitServerUp(clientPort2); + + // create observer to follow non-leader observer master + long nonLeaderOMPort = s1.getQuorumPeer().leader == null ? omPort1 + : omPort2; + int observerClientPort = PortAssignment.unique(); + int observerId = 10; + MainThread observer = new MainThread( + observerId, + observerClientPort, quorumCfgSection + "\n" + + createServerString("observer", observerId, + observerClientPort), + String.format("observerMasterPort=%d%n", nonLeaderOMPort)); + LOG.info("starting observer"); + observer.start(); + waitServerUp(observerClientPort); + + // create a client to the observer + final LinkedBlockingQueue<KeeperState> states = + new LinkedBlockingQueue<KeeperState>(); + ZooKeeper observerClient = new ZooKeeper( + "127.0.0.1:" + observerClientPort, + ClientBase.CONNECTION_TIMEOUT, new Watcher() { + @Override + public void process(WatchedEvent event) { + try { + states.put(event.getState()); + } catch (InterruptedException e) {} + } + }); + + // wait for connected + KeeperState state = states.poll(1000, TimeUnit.MILLISECONDS); + Assert.assertEquals(KeeperState.SyncConnected, state); + + // issue reconfig command + ArrayList<String> newServers = new ArrayList<String>(); + String server = "server.3=127.0.0.1:" + PortAssignment.unique() + + ":" + PortAssignment.unique() + ":participant;localhost:" + + PortAssignment.unique(); + newServers.add(server); + ZooKeeperAdmin admin = createAdmin(clientPort1); + ReconfigTest.reconfig(admin, newServers, null, null, -1); + + // make sure the observer has the new config + ReconfigTest.testServerHasConfig(observerClient, newServers, null); + + // shouldn't be disconnected during reconfig, so expect to not + // receive any new event + state = states.poll(1000, TimeUnit.MILLISECONDS); + Assert.assertNull(state); + + admin.close(); + observerClient.close(); + observer.shutdown(); + s2.shutdown(); + s1.shutdown(); + } + + /** + * Implementation of watcher interface. 
+ */ + public void process(WatchedEvent event) { + lastEvent = event; + if (latch != null) { + latch.countDown(); + } + LOG.info("Latch got event :: " + event); + } + + class AsyncWriter implements Runnable { + private final ZooKeeper client; + private final int numTransactions; + private final boolean issueSync; + private final CountDownLatch writerLatch; + private final String root; + private final CountDownLatch gate; + + AsyncWriter(ZooKeeper client, int numTransactions, boolean issueSync, CountDownLatch writerLatch, + String root, CountDownLatch gate) { + this.client = client; + this.numTransactions = numTransactions; + this.issueSync = issueSync; + this.writerLatch = writerLatch; + this.root = root; + this.gate = gate; + } + + @Override + public void run() { + if (gate != null) { + try { + gate.await(); + } catch (InterruptedException e) { + LOG.error("Gate interrupted"); + return; + } + } + for (int i = 0; i < numTransactions; i++) { + final boolean pleaseLog = i % 100 == 0; + client.create(root + i, "inner thread".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { + @Override + public void processResult(int rc, String path, + Object ctx, String name) { + writerLatch.countDown(); + if (pleaseLog) { + LOG.info("wrote {}", path); + } + } + }, null); + if (pleaseLog) { + LOG.info("async wrote {}{}", root, i); + if (issueSync) { + client.sync(root + "0", null, null); + } + } + } + } + } +}
