http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java
new file mode 100644
index 0000000..07de57b
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java
@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZKDatabase;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used by Followers to host Observers. This reduces the network load on the Leader process by pushing
+ * the responsibility for keeping Observers in sync off the leading peer.
+ *
+ * It is expected that Observers will continue to perform the initial vetting of clients and requests.
+ * Observers send the request to the follower where it is received by an ObserverMaster.
+ *
+ * The ObserverMaster forwards a copy of the request to the ensemble Leader and inserts it into its own
+ * request processor pipeline where it can be matched with the response when it comes back. All commits received
+ * from the Leader will be forwarded along to every Learner connected to the ObserverMaster.
+ *
+ * New Learners connecting to a Follower will receive a LearnerHandler object and be party to its syncing logic
+ * to be brought up to date.
+ *
+ * The logic is quite a bit simpler than the corresponding logic in Leader because it only hosts observers.
+ */
+public class ObserverMaster implements LearnerMaster, Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(ObserverMaster.class);
+
+    //Follower counter
+    private final AtomicLong followerCounter = new AtomicLong(-1);
+
+    private QuorumPeer self;
+    private FollowerZooKeeperServer zks;
+    private int port;
+
+    private Set<LearnerHandler> activeObservers = Collections.newSetFromMap(new ConcurrentHashMap<LearnerHandler,Boolean>());
+
+    private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean> connectionBeans = new ConcurrentHashMap<>();
+
+    /**
+     * we want to keep a log of past txns so that observers can sync up with us when they connect,
+     * but we can't keep everything in memory, so this limits how much memory will be dedicated
+     * to keeping recent txns.
+     */
+    private final static int PKTS_SIZE_LIMIT = 32 * 1024 * 1024;
+    private static volatile int pktsSizeLimit = Integer.getInteger("zookeeper.observerMaster.sizeLimit", PKTS_SIZE_LIMIT);
+    private ConcurrentLinkedQueue<QuorumPacket> proposedPkts = new ConcurrentLinkedQueue<>();
+    private ConcurrentLinkedQueue<QuorumPacket> committedPkts = new ConcurrentLinkedQueue<>();
+    private int pktsSize = 0;
+
+    private long lastProposedZxid;
+
+    // ensure ordering of revalidations returned to this learner
+    private final Object revalidateSessionLock = new Object();
+
+    // Throttle when there are too many concurrent snapshots being sent to observers
+    private static final String MAX_CONCURRENT_SNAPSHOTS = "zookeeper.leader.maxConcurrentSnapshots";
+    private static final int maxConcurrentSnapshots;
+
+    private static final String MAX_CONCURRENT_DIFFS = "zookeeper.leader.maxConcurrentDiffs";
+    private static final int maxConcurrentDiffs;
+    static {
+        maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 10);
+        LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " + maxConcurrentSnapshots);
+
+        maxConcurrentDiffs = Integer.getInteger(MAX_CONCURRENT_DIFFS, 100);
+        LOG.info(MAX_CONCURRENT_DIFFS + " = " + maxConcurrentDiffs);
+    }
+
+    private final ConcurrentLinkedQueue<Revalidation> pendingRevalidations = new ConcurrentLinkedQueue<>();
+    static class Revalidation {
+        public final long sessionId;
+        public final int timeout;
+        public final LearnerHandler handler;
+
+        Revalidation(final Long sessionId, final int timeout, final LearnerHandler handler) {
+            this.sessionId = sessionId;
+            this.timeout = timeout;
+            this.handler = handler;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            final Revalidation that = (Revalidation) o;
+            return sessionId == that.sessionId && timeout == that.timeout && handler.equals(that.handler);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = (int) (sessionId ^ (sessionId >>> 32));
+            result = 31 * result + timeout;
+            result = 31 * result + handler.hashCode();
+            return result;
+        }
+    }
+
+    private final LearnerSnapshotThrottler learnerSnapshotThrottler =
+            new LearnerSnapshotThrottler(maxConcurrentSnapshots);
+
+    private Thread thread;
+    private ServerSocket ss;
+    private boolean listenerRunning;
+    private ScheduledExecutorService pinger;
+
+    Runnable ping = new Runnable() {
+        @Override
+        public void run() {
+            for (LearnerHandler lh: activeObservers) {
+                lh.ping();
+            }
+        }
+    };
+
+    ObserverMaster(QuorumPeer self, FollowerZooKeeperServer zks, int port) {
+        this.self = self;
+        this.zks = zks;
+        this.port = port;
+    }
+
+    @Override
+    public void addLearnerHandler(LearnerHandler learnerHandler) {
+        if (!listenerRunning) {
+            throw new RuntimeException("ObserverMaster is not running");
+        }
+    }
+
+    @Override
+    public void removeLearnerHandler(LearnerHandler learnerHandler) {
+        activeObservers.remove(learnerHandler);
+    }
+
+    @Override
+    public int syncTimeout() {
+        return self.getSyncLimit() * self.getTickTime();
+    }
+
+    @Override
+    public int getTickOfNextAckDeadline() {
+        return self.tick.get() + self.syncLimit;
+    }
+
+    @Override
+    public int getTickOfInitialAckDeadline() {
+        return self.tick.get() + self.initLimit + self.syncLimit;
+    }
+
+    @Override
+    public long getAndDecrementFollowerCounter() {
+        return followerCounter.getAndDecrement();
+    }
+
+    @Override
+    public void waitForEpochAck(long sid, StateSummary ss) throws IOException, InterruptedException {
+        // since this is done by an active follower, we don't need to wait for anything
+    }
+
+    @Override
+    public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
+        return learnerSnapshotThrottler;
+    }
+
+    @Override
+    public void waitForStartup() throws InterruptedException {
+        // since this is done by an active follower, we don't need to wait for anything
+    }
+
+    @Override
+    synchronized public long getLastProposed() {
+        return lastProposedZxid;
+    }
+
+    @Override
+    public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
+        return self.getCurrentEpoch();
+    }
+
+    @Override
+    public ZKDatabase getZKDatabase() {
+        return zks.getZKDatabase();
+    }
+
+    @Override
+    public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException {
+        // no need to wait since we are a follower
+    }
+
+    @Override
+    public int getCurrentTick() {
+        return self.tick.get();
+    }
+
+    @Override
+    public void processAck(long sid, long zxid, SocketAddress localSocketAddress) {
+        if ((zxid & 0xffffffffL) == 0) {
+            /*
+             * We no longer process NEWLEADER ack by this method. However,
+             * the learner sends ack back to the leader after it gets UPTODATE
+             * so we just ignore the message.
+             */
+            return;
+        }
+
+        throw new RuntimeException("Observers shouldn't send ACKS ack = " + 
Long.toHexString(zxid));
+    }
+
+    @Override
+    public void touch(long sess, int to) {
+        zks.getSessionTracker().touchSession(sess, to);
+    }
+
+    boolean revalidateLearnerSession(QuorumPacket qp) throws IOException {
+        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
+        DataInputStream dis = new DataInputStream(bis);
+        long id = dis.readLong();
+        boolean valid = dis.readBoolean();
+        Iterator<Revalidation> itr = pendingRevalidations.iterator();
+        if (!itr.hasNext()) {
+            // not a learner session, handle locally
+            return false;
+        }
+        Revalidation revalidation = itr.next();
+        if (revalidation.sessionId != id) {
+            // not a learner session, handle locally
+            return false;
+        }
+        itr.remove();
+        LearnerHandler learnerHandler = revalidation.handler;
+        // create a copy here as the qp object is reused by the Follower and may be mutated
+        QuorumPacket deepCopy = new QuorumPacket(qp.getType(), qp.getZxid(),
+                Arrays.copyOf(qp.getData(), qp.getData().length),
+                qp.getAuthinfo() == null ? null : new ArrayList<>(qp.getAuthinfo()));
+        learnerHandler.queuePacket(deepCopy);
+        // To keep consistent as leader, touch the session when it's
+        // revalidating the session, only update if it's a valid session.
+        if (valid) {
+            touch(revalidation.sessionId, revalidation.timeout);
+        }
+        return true;
+    }
+
+    @Override
+    public void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws IOException {
+        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
+        DataInputStream dis = new DataInputStream(bis);
+        long id = dis.readLong();
+        int to = dis.readInt();
+        synchronized (revalidateSessionLock) {
+            pendingRevalidations.add(new Revalidation(id, to, learnerHandler));
+            Learner learner = zks.getLearner();
+            if (learner != null) {
+                learner.writePacket(qp, true);
+            }
+        }
+    }
+
+    @Override
+    public void submitLearnerRequest(Request si) {
+        zks.processObserverRequest(si);
+    }
+
+    @Override
+    synchronized public long startForwarding(LearnerHandler learnerHandler, long lastSeenZxid) {
+        Iterator<QuorumPacket> itr = committedPkts.iterator();
+        if (itr.hasNext()) {
+            QuorumPacket packet = itr.next();
+            if (packet.getZxid() > lastSeenZxid + 1) {
+                LOG.error("LearnerHandler is too far behind ({} < {}), 
disconnecting {} at {}", Long.toHexString(lastSeenZxid + 1),
+                        Long.toHexString(packet.getZxid()), 
learnerHandler.getSid(), learnerHandler.getRemoteAddress());
+                learnerHandler.shutdown();
+                return -1;
+            } else if (packet.getZxid() == lastSeenZxid + 1) {
+                learnerHandler.queuePacket(packet);
+            }
+            long queueHeadZxid = packet.getZxid();
+            long queueBytesUsed = LearnerHandler.packetSize(packet);
+            while (itr.hasNext()) {
+                packet = itr.next();
+                if (packet.getZxid() <= lastSeenZxid) {
+                    continue;
+                }
+                learnerHandler.queuePacket(packet);
+                queueBytesUsed += LearnerHandler.packetSize(packet);
+            }
+            LOG.info("finished syncing observer from retained commit queue: 
sid {}, " +
+                            "queue head 0x{}, queue tail 0x{}, sync position 
0x{}, num packets used {}, " +
+                            "num bytes used {}",
+                    learnerHandler.getSid(),
+                    Long.toHexString(queueHeadZxid),
+                    Long.toHexString(packet.getZxid()),
+                    Long.toHexString(lastSeenZxid),
+                    packet.getZxid() - lastSeenZxid,
+                    queueBytesUsed);
+        }
+        activeObservers.add(learnerHandler);
+        return lastProposedZxid;
+    }
+
+    @Override
+    public long getQuorumVerifierVersion() {
+        return self.getQuorumVerifier().getVersion();
+    }
+
+    @Override
+    public String getPeerInfo(long sid) {
+        QuorumPeer.QuorumServer server = self.getView().get(sid);
+        return server == null ? "" : server.toString();
+    }
+
+    @Override
+    public byte[] getQuorumVerifierBytes() {
+        return self.getLastSeenQuorumVerifier().toString().getBytes();
+    }
+
+    @Override
+    public QuorumAuthServer getQuorumAuthServer() {
+        return (self == null) ? null : self.authServer;
+    }
+
+    void proposalReceived(QuorumPacket qp) {
+        proposedPkts.add(new QuorumPacket(Leader.INFORM, qp.getZxid(), qp.getData(), null));
+    }
+
+    private synchronized QuorumPacket removeProposedPacket(long zxid) {
+        QuorumPacket pkt = proposedPkts.peek();
+        if (pkt == null || pkt.getZxid() > zxid) {
+            LOG.debug("ignore missing proposal packet for {}", 
Long.toHexString(zxid));
+            return null;
+        }
+        if (pkt.getZxid() != zxid) {
+            final String m = String.format("Unexpected proposal packet on 
commit ack, expected zxid 0x%d got zxid 0x%d",
+                    zxid, pkt.getZxid());
+            LOG.error(m);
+            throw new RuntimeException(m);
+        }
+        proposedPkts.remove();
+        return pkt;
+    }
+
+    private synchronized void cacheCommittedPacket(final QuorumPacket pkt) {
+        committedPkts.add(pkt);
+        pktsSize += LearnerHandler.packetSize(pkt);
+        // remove 5 packets for every one added as we near the size limit
+        for (int i = 0; pktsSize > pktsSizeLimit * 0.8  && i < 5; i++) {
+            QuorumPacket oldPkt = committedPkts.poll();
+            if (oldPkt == null) {
+                pktsSize = 0;
+                break;
+            }
+            pktsSize -= LearnerHandler.packetSize(oldPkt);
+        }
+        // enforce the size limit as a hard cap
+        while (pktsSize > pktsSizeLimit) {
+            QuorumPacket oldPkt = committedPkts.poll();
+            if (oldPkt == null) {
+                pktsSize = 0;
+                break;
+            }
+            pktsSize -= LearnerHandler.packetSize(oldPkt);
+        }
+    }
+
+    private synchronized void sendPacket(final QuorumPacket pkt) {
+        for (LearnerHandler lh: activeObservers) {
+            lh.queuePacket(pkt);
+        }
+        lastProposedZxid = pkt.getZxid();
+    }
+
+    synchronized void proposalCommitted(long zxid) {
+        QuorumPacket pkt = removeProposedPacket(zxid);
+        if (pkt == null) {
+            return;
+        }
+        cacheCommittedPacket(pkt);
+        sendPacket(pkt);
+    }
+
+    synchronized void informAndActivate(long zxid, long suggestedLeaderId) {
+        QuorumPacket pkt = removeProposedPacket(zxid);
+        if (pkt == null) {
+            return;
+        }
+
+        // Build the INFORMANDACTIVATE packet
+        QuorumPacket informAndActivateQP = Leader.buildInformAndActivePacket(
+                zxid, suggestedLeaderId, pkt.getData());
+        cacheCommittedPacket(informAndActivateQP);
+        sendPacket(informAndActivateQP);
+    }
+
+    synchronized public void start() throws IOException {
+        if (thread != null && thread.isAlive()) {
+            return;
+        }
+        listenerRunning = true;
+        int backlog = 10; // dog science
+        if (self.shouldUsePortUnification() || self.isSslQuorum()) {
+            boolean allowInsecureConnection = self.shouldUsePortUnification();
+            if (self.getQuorumListenOnAllIPs()) {
+                ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, port, backlog);
+            } else {
+                ss = new UnifiedServerSocket(
+                        self.getX509Util(),
+                        allowInsecureConnection,
+                        port,
+                        backlog,
+                        self.getQuorumAddress().getAddress());
+            }
+        } else {
+            if (self.getQuorumListenOnAllIPs()) {
+                ss = new ServerSocket(port, backlog);
+            } else {
+                ss = new ServerSocket(port, backlog, self.getQuorumAddress().getAddress());
+            }
+        }
+        thread = new Thread(this, "ObserverMaster");
+        thread.start();
+        pinger = Executors.newSingleThreadScheduledExecutor();
+        pinger.scheduleAtFixedRate(ping, self.tickTime / 2, self.tickTime / 2, TimeUnit.MILLISECONDS);
+    }
+
+    public void run() {
+        while (listenerRunning) {
+            try {
+                Socket s = ss.accept();
+                // start with the initLimit, once the ack is processed
+                // in LearnerHandler switch to the syncLimit
+                s.setSoTimeout(self.tickTime * self.initLimit);
+                BufferedInputStream is = new BufferedInputStream(s.getInputStream());
+                LearnerHandler lh = new LearnerHandler(s, is, this);
+                lh.start();
+            } catch (Exception e) {
+                if (listenerRunning) {
+                    LOG.debug("Ignoring accept exception (maybe shutting 
down)", e);
+                } else {
+                    LOG.debug("Ignoring accept exception (maybe client 
closed)", e);
+                }
+            }
+        }
+        /*
+         * we don't need to close ss because we only got here because listenerRunning is
+         * false and that is set and then ss is closed() in stop()
+         */
+    }
+
+    synchronized public void stop() {
+        listenerRunning = false;
+        if (pinger != null) {
+            pinger.shutdownNow();
+        }
+        if (ss != null) {
+            try {
+                ss.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        for (LearnerHandler lh: activeObservers) {
+            lh.shutdown();
+        }
+    }
+
+    int getNumActiveObservers() {
+        return activeObservers.size();
+    }
+
+    int getPktsSizeLimit() {
+        return pktsSizeLimit;
+    }
+
+    static void setPktsSizeLimit(final int sizeLimit) {
+        pktsSizeLimit = sizeLimit;
+    }
+
+
+    @Override
+    public void registerLearnerHandlerBean(final LearnerHandler learnerHandler, Socket socket) {
+        LearnerHandlerBean bean = new LearnerHandlerBean(learnerHandler, socket);
+        if (zks.registerJMX(bean)) {
+            connectionBeans.put(learnerHandler, bean);
+        }
+    }
+
+    @Override
+    public void unregisterLearnerHandlerBean(final LearnerHandler learnerHandler) {
+        LearnerHandlerBean bean = connectionBeans.remove(learnerHandler);
+        if (bean != null){
+            MBeanRegistry.getInstance().unregister(bean);
+        }
+    }
+}
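
A rough usage sketch of the class above, for orientation only. The wiring shown is hypothetical (the actual Follower/Learner integration lives elsewhere in this change) and the variable names are illustrative:

    // Hypothetical follower-side wiring; not part of this patch.
    ObserverMaster om = new ObserverMaster(self, followerZooKeeperServer, observerMasterPort);
    om.start();                                      // bind the listener socket and start pinging observers
    // for each PROPOSAL relayed from the leader:
    om.proposalReceived(proposalPacket);             // cached internally as an INFORM packet
    // ...and once the matching COMMIT arrives:
    om.proposalCommitted(proposalPacket.getZxid());  // forwarded to every connected observer
    om.stop();                                       // close the listener and shut down observer handlers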

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 035db0c..9d36fe2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -144,6 +144,16 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         }
     }
 
+    private int observerMasterPort;
+
+    public int getObserverMasterPort() {
+        return observerMasterPort;
+    }
+
+    public void setObserverMasterPort(int observerMasterPort) {
+        this.observerMasterPort = observerMasterPort;
+    }
+
     public static class QuorumServer {
         public InetSocketAddress addr = null;
 
@@ -1231,8 +1241,14 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
                         LOG.warn("Unexpected exception",e );
                     } finally {
                         observer.shutdown();
-                        setObserver(null);  
-                       updateServerState();
+                        setObserver(null);
+                        updateServerState();
+
+                        // Add delay jitter before we switch to LOOKING
+                        // state to reduce the load of ObserverMaster
+                        if (isRunning()) {
+                            Observer.waitForReconnectDelay();
+                        }
                     }
                     break;
                 case FOLLOWING:
@@ -1651,6 +1667,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             if (qs != null) {
                 setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
             }
+            updateObserverMasterList();
             return prevQV;
         }
     }
@@ -1989,7 +2006,74 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         }
     }
 
-   private boolean updateLearnerType(QuorumVerifier newQV) {        
+    private ArrayList<QuorumServer> observerMasters = new ArrayList<>();
+    private void updateObserverMasterList() {
+        if (observerMasterPort <= 0) {
+            return; // observer masters not enabled
+        }
+        observerMasters.clear();
+        StringBuilder sb = new StringBuilder();
+        for (QuorumServer server : quorumVerifier.getVotingMembers().values()) {
+            InetSocketAddress addr = new InetSocketAddress(server.addr.getAddress(), observerMasterPort);
+            observerMasters.add(new QuorumServer(server.id, addr));
+            sb.append(addr).append(",");
+        }
+        LOG.info("Updated learner master list to be {}", sb.toString());
+        Collections.shuffle(observerMasters);
+    }
+
+    private boolean useObserverMasters() {
+        return getLearnerType() == LearnerType.OBSERVER && observerMasters.size() > 0;
+    }
+
+    private int nextObserverMaster = 0;
+    private QuorumServer nextObserverMaster() {
+        if (nextObserverMaster >= observerMasters.size()) {
+            nextObserverMaster = 0;
+        }
+        return observerMasters.get(nextObserverMaster++);
+    }
+
+    QuorumServer findLearnerMaster(QuorumServer leader) {
+        return useObserverMasters() ? nextObserverMaster() : leader;
+    }
+
+    /**
+     * Vet a given learner master's information.
+     * Allows specification by server id, IP only, or IP and port.
+     */
+    QuorumServer validateLearnerMaster(String desiredMaster) {
+        if (useObserverMasters()) {
+            Long sid;
+            try {
+                sid = Long.parseLong(desiredMaster);
+            } catch (NumberFormatException e) {
+                sid = null;
+            }
+            for (QuorumServer server : observerMasters) {
+                if (sid == null) {
+                    String serverAddr = server.addr.getAddress().getHostAddress() + ':' + server.addr.getPort();
+                    if (serverAddr.startsWith(desiredMaster)) {
+                        return server;
+                    }
+                } else {
+                    if (sid.equals(server.id)) {
+                        return server;
+                    }
+                }
+            }
+            if (sid == null) {
+                LOG.info("could not find learner master address={}", 
desiredMaster);
+            } else {
+                LOG.warn("could not find learner master sid={}", sid);
+            }
+        } else {
+            LOG.info("cannot validate request, observer masters not enabled");
+        }
+        return null;
+    }
+
+   private boolean updateLearnerType(QuorumVerifier newQV) {
        //check if I'm an observer in new config
        if (newQV.getObservingMembers().containsKey(getId())) {
            if (getLearnerType()!=LearnerType.OBSERVER){

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
index aee5efc..2c4dd0d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
@@ -71,6 +71,7 @@ public class QuorumPeerConfig {
     protected InetSocketAddress secureClientPortAddress;
     protected boolean sslQuorum = false;
     protected boolean shouldUsePortUnification = false;
+    protected int observerMasterPort;
     protected File dataDir;
     protected File dataLogDir;
     protected String dynamicConfigFileStr = null;
@@ -239,6 +240,7 @@ public class QuorumPeerConfig {
     throws IOException, ConfigException {
         int clientPort = 0;
         int secureClientPort = 0;
+        int observerMasterPort = 0;
         String clientPortAddress = null;
         String secureClientPortAddress = null;
         VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
@@ -261,6 +263,8 @@ public class QuorumPeerConfig {
                 secureClientPort = Integer.parseInt(value);
             } else if (key.equals("secureClientPortAddress")){
                 secureClientPortAddress = value.trim();
+            } else if (key.equals("observerMasterPort")) {
+                observerMasterPort = Integer.parseInt(value);
             } else if (key.equals("tickTime")) {
                 tickTime = Integer.parseInt(value);
             } else if (key.equals("maxClientCnxns")) {
@@ -412,6 +416,13 @@ public class QuorumPeerConfig {
             configureSSLAuth();
         }
 
+        if (observerMasterPort <= 0) {
+            LOG.info("observerMasterPort is not set");
+        } else {
+            this.observerMasterPort = observerMasterPort;
+            LOG.info("observerMasterPort is {}", observerMasterPort);
+        }
+
         if (tickTime == 0) {
             throw new IllegalArgumentException("tickTime is not set");
         }
@@ -754,6 +765,7 @@ public class QuorumPeerConfig {
 
     public InetSocketAddress getClientPortAddress() { return clientPortAddress; }
     public InetSocketAddress getSecureClientPortAddress() { return secureClientPortAddress; }
+    public int getObserverMasterPort() { return observerMasterPort; }
     public File getDataDir() { return dataDir; }
     public File getDataLogDir() { return dataLogDir; }
     public int getTickTime() { return tickTime; }
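
A configuration sketch for the new key (hypothetical values; a three-member voting ensemble plus one observer). With observerMasterPort set, followers host an ObserverMaster on that port and observers pick one of the voting members' addresses at that port to sync from, rather than always syncing from the leader:

    # zoo.cfg fragment - illustrative only
    tickTime=2000
    initLimit=10
    syncLimit=5
    observerMasterPort=2191
    server.1=127.0.0.1:2888:3888;2181
    server.2=127.0.0.1:2889:3889;2182
    server.3=127.0.0.1:2890:3890;2183
    server.4=127.0.0.1:2891:3891:observer;2184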

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
index d2a02b2..a2e2bfc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
@@ -188,6 +188,7 @@ public class QuorumPeerMain {
           quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
           quorumPeer.setInitLimit(config.getInitLimit());
           quorumPeer.setSyncLimit(config.getSyncLimit());
+          quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
           quorumPeer.setConfigFileName(config.getConfigFilename());
           quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
           quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
index 54a6be5..f657893 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.zookeeper;
 
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.junit.Rule;
@@ -75,4 +76,29 @@ public class ZKTestCase {
 
     };
 
+    public interface WaitForCondition {
+        /**
+         * @return true when success
+         */
+        boolean evaluate();
+    }
+
+    /**
+     * Wait for the condition to become true; otherwise fail the test once the
+     * timeout is exceeded
+     * @param msg       error message to print on failure
+     * @param condition condition to evaluate
+     * @param timeout   timeout in seconds
+     * @throws InterruptedException
+     */
+    public void waitFor(String msg, WaitForCondition condition, int timeout)
+            throws InterruptedException {
+        for (int i = 0; i < timeout; ++i) {
+            if (condition.evaluate()) {
+                return;
+            }
+            Thread.sleep(100);
+        }
+        Assert.fail(msg);
+    }
 }
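
A sketch of how a test might use the new helper; the condition body is illustrative and assumes the ObserverMaster counter added earlier in this patch:

    // Hypothetical usage inside a test extending ZKTestCase
    waitFor("observer did not reconnect to the observer master",
            new WaitForCondition() {
                public boolean evaluate() {
                    return observerMaster.getNumActiveObservers() == 1;
                }
            }, 30);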

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DelayRequestProcessor.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DelayRequestProcessor.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DelayRequestProcessor.java
new file mode 100644
index 0000000..aeb4e0f
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DelayRequestProcessor.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Allows the blocking of the request processor queue on a ZooKeeperServer.
+ *
+ * This is used to simulate arbitrary length delays or to produce delays
+ * in request processing that are maximally inconvenient for a given feature
+ * for the purposes of testing it.
+ */
+public class DelayRequestProcessor implements RequestProcessor {
+
+    private boolean blocking;
+    RequestProcessor next;
+
+    private LinkedBlockingQueue<Request> incomingRequests = new LinkedBlockingQueue<>();
+
+    private DelayRequestProcessor(RequestProcessor next) {
+        this.blocking = true;
+        this.next = next;
+    }
+
+    @Override
+    public void processRequest(Request request) throws RequestProcessorException {
+        if (blocking) {
+            incomingRequests.add(request);
+        } else {
+            next.processRequest(request);
+        }
+    }
+
+    public void submitRequest(Request request) throws RequestProcessorException {
+        next.processRequest(request);
+    }
+
+    @Override
+    public void shutdown() {
+    }
+
+    public void unblockQueue() throws RequestProcessorException {
+        if (blocking) {
+            for (Request request : incomingRequests) {
+                next.processRequest(request);
+            }
+            blocking = false;
+        }
+    }
+
+    public static DelayRequestProcessor injectDelayRequestProcessor(FollowerZooKeeperServer zooKeeperServer) {
+        RequestProcessor finalRequestProcessor = zooKeeperServer.commitProcessor.nextProcessor;
+        DelayRequestProcessor delayRequestProcessor = new DelayRequestProcessor(finalRequestProcessor);
+        zooKeeperServer.commitProcessor.nextProcessor = delayRequestProcessor;
+        return delayRequestProcessor;
+    }
+}
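
A sketch of the intended test usage, inferred from the API above (the followerZooKeeperServer variable is illustrative):

    // Hypothetical: stall the follower's commit pipeline, then release it
    DelayRequestProcessor delay =
            DelayRequestProcessor.injectDelayRequestProcessor(followerZooKeeperServer);
    // ... issue requests whose commits should now be queued rather than applied ...
    delay.unblockQueue();   // replays the queued requests and stops blocking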

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
index f238971..1cd33ec 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
@@ -39,7 +39,6 @@ import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.util.ZxidUtils;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -127,6 +126,9 @@ public class LearnerHandlerTest extends ZKTestCase {
 
     @Before
     public void setUp() throws Exception {
+        db = new MockZKDatabase(null);
+        sock = mock(Socket.class);
+
         // Intercept when startForwarding is called
         leader = mock(Leader.class);
         when(
@@ -137,10 +139,8 @@ public class LearnerHandlerTest extends ZKTestCase {
                 return 0;
             }
         });
+        when(leader.getZKDatabase()).thenReturn(db);
 
-        sock = mock(Socket.class);
-
-        db = new MockZKDatabase(null);
         learnerHandler = new MockLearnerHandler(sock, leader);
     }
 
@@ -204,7 +204,7 @@ public class LearnerHandlerTest extends ZKTestCase {
         peerZxid = 3;
         db.lastProcessedZxid = 1;
         db.committedLog.clear();
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send TRUNC and forward any packet starting lastProcessedZxid
         assertOpType(Leader.TRUNC, db.lastProcessedZxid, db.lastProcessedZxid);
         reset();
@@ -213,7 +213,7 @@ public class LearnerHandlerTest extends ZKTestCase {
         peerZxid = 1;
         db.lastProcessedZxid = 1;
         db.committedLog.clear();
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send DIFF and forward any packet starting lastProcessedZxid
         assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid);
         assertEquals(1, learnerHandler.getQueuedPackets().size());
@@ -226,7 +226,7 @@ public class LearnerHandlerTest extends ZKTestCase {
         db.lastProcessedZxid = 1;
         db.committedLog.clear();
         // We send SNAP
-        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertTrue(learnerHandler.syncFollower(peerZxid, leader));
         assertEquals(0, learnerHandler.getQueuedPackets().size());
         reset();
 
@@ -248,7 +248,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer has zxid that we have never seen
         peerZxid = 4;
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send TRUNC to 3 and forward any packet starting 5
         assertOpType(Leader.TRUNC, 3, 5);
         // DIFF + 1 proposals + 1 commit
@@ -258,7 +258,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer is within committedLog range
         peerZxid = 2;
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send DIFF and forward any packet starting lastProcessedZxid
         assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
                 db.getmaxCommittedLog());
@@ -271,7 +271,7 @@ public class LearnerHandlerTest extends ZKTestCase {
         peerZxid = 1;
         db.setSnapshotSizeFactor(-1);
         // We send SNAP
-        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertTrue(learnerHandler.syncFollower(peerZxid, leader));
         assertEquals(0, learnerHandler.getQueuedPackets().size());
         reset();
     }
@@ -297,7 +297,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer has zxid that we have never seen
         peerZxid = 4;
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send TRUNC to 3 and forward any packet starting at maxCommittedLog
         assertOpType(Leader.TRUNC, 3, db.getmaxCommittedLog());
         // DIFF + 4 proposals + 4 commit
@@ -307,7 +307,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer zxid is in txnlog range
         peerZxid = 3;
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send DIFF and forward any packet starting at maxCommittedLog
         assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
                 db.getmaxCommittedLog());
@@ -336,11 +336,12 @@ public class LearnerHandlerTest extends ZKTestCase {
         db.lastProcessedZxid = 7;
         db.txnLog.add(createProposal(2));
         db.txnLog.add(createProposal(3));
+        when(leader.getZKDatabase()).thenReturn(db);
 
         // Peer zxid
         peerZxid = 4;
         assertTrue("Couldn't identify snapshot transfer!",
-                learnerHandler.syncFollower(peerZxid, db, leader));
+                learnerHandler.syncFollower(peerZxid, leader));
         reset();
     }
 
@@ -362,7 +363,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer has zxid that we have never seen
         peerZxid = 4;
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send TRUNC to 3 and forward any packet starting at
         // lastProcessedZxid
         assertOpType(Leader.TRUNC, 3, db.lastProcessedZxid);
@@ -373,7 +374,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer has zxid in txnlog range
         peerZxid = 2;
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send DIFF and forward any packet starting at lastProcessedZxid
         assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid);
         // DIFF + 4 proposals + 4 commit
@@ -383,7 +384,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer miss the txnlog
         peerZxid = 1;
-        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertTrue(learnerHandler.syncFollower(peerZxid, leader));
         // We send snap
         assertEquals(0, learnerHandler.getQueuedPackets().size());
         reset();
@@ -414,7 +415,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer has zxid that we have never seen
         peerZxid = getZxid(0xf, 4);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send TRUNC to 3 and forward any packet starting at maxCommittedLog
         assertOpType(Leader.TRUNC, getZxid(0xf, 3), db.getmaxCommittedLog());
         // DIFF + 4 proposals + 4 commit
@@ -425,7 +426,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer zxid is in txnlog range
         peerZxid = getZxid(0xf, 3);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send DIFF and forward any packet starting at maxCommittedLog
         assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
                 db.getmaxCommittedLog());
@@ -456,13 +457,13 @@ public class LearnerHandlerTest extends ZKTestCase {
         // We should get snap, we can do better here, but the main logic is
         // that we should never send diff if we have never seen any txn older
         // than peer zxid
-        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertTrue(learnerHandler.syncFollower(peerZxid, leader));
         assertEquals(0, learnerHandler.getQueuedPackets().size());
         reset();
 
         // Peer has zxid of epoch 1
         peerZxid = getZxid(1, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send DIFF to (1, 2) and forward any packet starting at (1, 2)
         assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2));
         // DIFF + 2 proposals + 2 commit
@@ -472,7 +473,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer has zxid of epoch 2, so it is already sync
         peerZxid = getZxid(2, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send DIFF to (2, 0) and forward any packet starting at (2, 0)
         assertOpType(Leader.DIFF, getZxid(2, 0), getZxid(2, 0));
         // DIFF only
@@ -498,7 +499,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer has zxid of epoch 3
         peerZxid = getZxid(3, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send DIFF to (6,0) and forward any packet starting at (4,1)
         assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1));
         // DIFF + 1 proposals + 1 commit
@@ -508,7 +509,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer has zxid of epoch 4
         peerZxid = getZxid(4, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send DIFF to (6,0) and forward any packet starting at (4,1)
         assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1));
         // DIFF + 1 proposals + 1 commit
@@ -518,7 +519,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer has zxid of epoch 5
         peerZxid = getZxid(5, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send DIFF to (6,0) and forward any packet starting at (5,0)
         assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(5, 0));
         // DIFF only
@@ -527,7 +528,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer has zxid of epoch 6
         peerZxid = getZxid(6, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send DIFF to (6,0) and forward any packet starting at (6, 0)
         assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(6, 0));
         // DIFF only
@@ -558,7 +559,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer has zxid of epoch 1
         peerZxid = getZxid(1, 0);
-        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertFalse(learnerHandler.syncFollower(peerZxid, leader));
         // We send DIFF to (1, 2) and forward any packet starting at (1, 2)
         assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2));
         // DIFF + 2 proposals + 2 commit
@@ -585,7 +586,7 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         // Peer has zxid (3, 1)
         peerZxid = getZxid(3, 1);
-        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertTrue(learnerHandler.syncFollower(peerZxid, leader));
         assertEquals(0, learnerHandler.getQueuedPackets().size());
         reset();
     }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/PortForwarder.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/PortForwarder.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/PortForwarder.java
index acbad80..f222121 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/PortForwarder.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/PortForwarder.java
@@ -16,9 +16,6 @@
  * limitations under the License.
  */
 
-/**
- * 
- */
 package org.apache.zookeeper.server.util;
 
 import java.io.IOException;
@@ -29,6 +26,8 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -95,9 +94,10 @@ public class PortForwarder extends Thread {
         private final OutputStream out;
         private final Socket toClose;
         private final Socket toClose2;
+        private boolean isFinished = false;
 
         PortForwardWorker(Socket toClose, Socket toClose2, InputStream in,
-                OutputStream out) throws IOException {
+                OutputStream out) {
             this.toClose = toClose;
             this.toClose2 = toClose2;
             this.in = in;
@@ -118,50 +118,57 @@ public class PortForwarder extends Thread {
                                 this.out.write(buf, 0, read);
                             } catch (IOException e) {
                                 LOG.warn("exception during write", e);
-                                try {
-                                    toClose.close();
-                                } catch (IOException ex) {
-                                    // ignore
-                                }
-                                try {
-                                    toClose2.close();
-                                } catch (IOException ex) {
-                                    // ignore
-                                }
                                 break;
                             }
+                        } else if (read < 0) {
+                            throw new IOException("read " + read);
                         }
                     } catch (SocketTimeoutException e) {
                         LOG.error("socket timeout", e);
                     }
-                    Thread.sleep(1);
                 }
+                Thread.sleep(1);
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted", e);
-                try {
-                    toClose.close();
-                } catch (IOException ex) {
-                    // ignore
-                }
-                try {
-                    toClose2.close();
-                } catch (IOException ex) {
-                    // ignore silently
-                }
             } catch (SocketException e) {
                 if (!"Socket closed".equals(e.getMessage())) {
                     LOG.error("Unexpected exception", e);
                 }
             } catch (IOException e) {
                 LOG.error("Unexpected exception", e);
+            } finally {
+                shutdown();
             }
             LOG.info("Shutting down forward for " + toClose);
+            isFinished = true;
         }
 
+        boolean waitForShutdown(long timeoutMs) throws InterruptedException {
+            synchronized (this) {
+                if (!isFinished) {
+                   this.wait(timeoutMs);
+                }
+            }
+            return isFinished;
+        }
+
+        public void shutdown() {
+            try {
+                toClose.close();
+            } catch (IOException ex) {
+                // ignore
+            }
+            try {
+                toClose2.close();
+            } catch (IOException ex) {
+                // ignore silently
+            }
+        }
     }
 
     private volatile boolean stopped = false;
-    private ExecutorService workers = Executors.newCachedThreadPool();
+    private ExecutorService workerExecutor = Executors.newCachedThreadPool();
+    private List<PortForwardWorker> workers = new ArrayList<>();
     private ServerSocket serverSocket;
     private final int to;
 
@@ -207,30 +214,31 @@ public class PortForwarder extends Thread {
                             + " to:" + to);
                     sock.setSoTimeout(30000);
                     target.setSoTimeout(30000);
-                    this.workers.execute(new PortForwardWorker(sock, target,
+
+
+                    workers.add(new PortForwardWorker(sock, target,
                             sock.getInputStream(), target.getOutputStream()));
-                    this.workers.execute(new PortForwardWorker(target, sock,
+                    workers.add(new PortForwardWorker(target, sock,
                             target.getInputStream(), sock.getOutputStream()));
-                } catch (SocketTimeoutException e) {                   
-                    LOG.warn("socket timed out local:" 
-                            + (sock != null ? sock.getLocalPort(): "")
-                            + " from:" + (sock != null ? sock.getPort(): "")
-                            + " to:" + to, e);
+                    for (PortForwardWorker worker: workers) {
+                        workerExecutor.submit(worker);
+                    }
+                } catch (SocketTimeoutException e) {
+                    LOG.warn("socket timed out", e);
                 } catch (ConnectException e) {
-                    LOG.warn("connection exception local:"
-                            + (sock != null ? sock.getLocalPort(): "")
-                            + " from:" + (sock != null ? sock.getPort(): "")
+                    LOG.warn("connection exception local:" + 
sock.getLocalPort()
+                            + " from:" + sock.getPort()
                             + " to:" + to, e);
                     sock.close();
                 } catch (IOException e) {
                     if (!"Socket closed".equals(e.getMessage())) {
-                        LOG.warn("unexpected exception local:" 
-                                       + (sock != null ? sock.getLocalPort(): 
"")
-                                + " from:" + (sock != null ? sock.getPort(): 
"")
-                                + " to:" + to, e);
+                        LOG.warn("unexpected exception local:" + 
sock.getLocalPort()
+                            + " from:" + sock.getPort()
+                            + " to:" + to, e);
                         throw e;
                     }
                 }
+
             }
         } catch (IOException e) {
             LOG.error("Unexpected exception to:" + to, e);
@@ -242,15 +250,16 @@ public class PortForwarder extends Thread {
     public void shutdown() throws Exception {
         this.stopped = true;
         this.serverSocket.close();
-        this.workers.shutdownNow();
-        try {
-            if (!this.workers.awaitTermination(5, TimeUnit.SECONDS)) {
-                throw new Exception(
-                        "Failed to stop forwarding within 5 seconds");
+        this.join();
+        this.workerExecutor.shutdownNow();
+        for (PortForwardWorker worker: workers) {
+            worker.shutdown();
+        }
+
+        for (PortForwardWorker worker: workers) {
+            if (!worker.waitForShutdown(5000)) {
+                throw new Exception("Failed to stop forwarding within 5 
seconds");
             }
-        } catch (InterruptedException e) {
-            throw new Exception("Failed to stop forwarding");
         }
-        this.join();
     }
 }
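
For reference, a sketch of how tests typically drive this utility; the two-argument constructor shown is an assumption about the existing class, not something added by this diff:

    // Hypothetical: forward a local proxy port to a real server port, then tear it down
    PortForwarder forwarder = new PortForwarder(proxyPort, serverPort);
    // ... run client traffic against proxyPort ...
    forwarder.shutdown();   // now also waits for each PortForwardWorker to finish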

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/b2513c11/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
new file mode 100644
index 0000000..4cec0ef
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
@@ -0,0 +1,780 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.jmx.CommonNames;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.jmx.ZKMBeanInfo;
+import org.apache.zookeeper.server.admin.Commands;
+import org.apache.zookeeper.server.quorum.DelayRequestProcessor;
+import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.server.util.PortForwarder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.management.Attribute;
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.InvalidAttributeValueException;
+import javax.management.MBeanException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import javax.management.RuntimeMBeanException;
+
+@RunWith(Parameterized.class)
+public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher{
+    protected static final Logger LOG = LoggerFactory.getLogger(ObserverMasterTest.class);
+
+    public ObserverMasterTest(Boolean testObserverMaster) {
+        this.testObserverMaster = testObserverMaster;
+    }
+
+    @Parameterized.Parameters
+    public static List<Object []> data() { return Arrays.asList(new Object 
[][] {
+            {Boolean.TRUE},
+            {Boolean.FALSE}});
+    }
+
+    private Boolean testObserverMaster;
+
+    private CountDownLatch latch;
+    ZooKeeper zk;
+    private WatchedEvent lastEvent = null;
+
+    private int CLIENT_PORT_QP1;
+    private int CLIENT_PORT_QP2;
+    private int CLIENT_PORT_OBS;
+    private int OM_PORT;
+    private MainThread q1;
+    private MainThread q2;
+    private MainThread q3;
+
+    private PortForwarder setUp(final int omProxyPort) throws IOException {
+        ClientBase.setupTestEnv();
+
+        final int PORT_QP1 = PortAssignment.unique();
+        final int PORT_QP2 = PortAssignment.unique();
+        final int PORT_OBS = PortAssignment.unique();
+        final int PORT_QP_LE1 = PortAssignment.unique();
+        final int PORT_QP_LE2 = PortAssignment.unique();
+        final int PORT_OBS_LE = PortAssignment.unique();
+
+        CLIENT_PORT_QP1 = PortAssignment.unique();
+        CLIENT_PORT_QP2 = PortAssignment.unique();
+        CLIENT_PORT_OBS = PortAssignment.unique();
+
+        OM_PORT = PortAssignment.unique();
+
+        String quorumCfgSection =
+                "server.1=127.0.0.1:" + (PORT_QP1)
+                        + ":" + (PORT_QP_LE1) + ";" +  CLIENT_PORT_QP1
+                        + "\nserver.2=127.0.0.1:" + (PORT_QP2)
+                        + ":" + (PORT_QP_LE2) + ";" + CLIENT_PORT_QP2
+                        + "\nserver.3=127.0.0.1:"
+                        + (PORT_OBS)+ ":" + (PORT_OBS_LE) + ":observer" + ";" + CLIENT_PORT_OBS;
+        String extraCfgs = testObserverMaster ? String.format("observerMasterPort=%d%n", OM_PORT) : "";
+        String extraCfgsObs = testObserverMaster ? String.format("observerMasterPort=%d%n", omProxyPort <= 0 ? OM_PORT : omProxyPort) : "";
+
+        PortForwarder forwarder = null;
+        if (testObserverMaster && omProxyPort >= 0) {
+            forwarder = new PortForwarder(omProxyPort, OM_PORT);
+        }
+
+        q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection, extraCfgs);
+        q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection, extraCfgs);
+        q3 = new MainThread(3, CLIENT_PORT_OBS, quorumCfgSection, extraCfgsObs);
+        q1.start();
+        q2.start();
+        Assert.assertTrue("waiting for server 1 being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1,
+                        CONNECTION_TIMEOUT));
+        Assert.assertTrue("waiting for server 2 being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
+                        CONNECTION_TIMEOUT));
+        return forwarder;
+    }
+
+    private void shutdown() throws InterruptedException {
+        LOG.info("Shutting down all servers");
+        zk.close();
+
+        q1.shutdown();
+        q2.shutdown();
+        q3.shutdown();
+
+        Assert.assertTrue("Waiting for server 1 to shut down",
+                ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_QP1,
+                        ClientBase.CONNECTION_TIMEOUT));
+        Assert.assertTrue("Waiting for server 2 to shut down",
+                ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_QP2,
+                        ClientBase.CONNECTION_TIMEOUT));
+        Assert.assertTrue("Waiting for server 3 to shut down",
+                ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_OBS,
+                        ClientBase.CONNECTION_TIMEOUT));
+    }
+
+    @Test
+    public void testLaggingObserverMaster() throws Exception {
+        final int OM_PROXY_PORT = PortAssignment.unique();
+        PortForwarder forwarder = setUp(OM_PROXY_PORT);
+
+        // find the leader and observer master
+        int leaderPort;
+        MainThread leader;
+        MainThread follower;
+        if (q1.getQuorumPeer().leader != null) {
+            leaderPort = CLIENT_PORT_QP1;
+            leader = q1;
+            follower = q2;
+        } else if (q2.getQuorumPeer().leader != null) {
+            leaderPort = CLIENT_PORT_QP2;
+            leader = q2;
+            follower = q1;
+        } else {
+            throw new RuntimeException("No leader");
+        }
+
+        // ensure the observer master has commits in the queue before observer sync
+        zk = new ZooKeeper("127.0.0.1:" + leaderPort,
+                ClientBase.CONNECTION_TIMEOUT, this);
+        for (int i = 0; i < 10; i++) {
+            zk.create("/bulk" + i, ("initial data of some size").getBytes(), 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        zk.close();
+
+        q3.start();
+        Assert.assertTrue("waiting for server 3 being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS,
+                        CONNECTION_TIMEOUT));
+
+        latch = new CountDownLatch(1);
+        zk = new ZooKeeper("127.0.0.1:" + leaderPort,
+                ClientBase.CONNECTION_TIMEOUT, this);
+        latch.await();
+        Assert.assertEquals(zk.getState(), States.CONNECTED);
+
+        zk.create("/init", "first".getBytes(), Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        final long lastLoggedZxid = leader.getQuorumPeer().getLastLoggedZxid();
+
+        // wait for change to propagate
+        waitFor("Timeout waiting for observer sync", new WaitForCondition() {
+            public boolean evaluate() {
+                return lastLoggedZxid == q3.getQuorumPeer().getLastLoggedZxid();
+            }
+        }, 30);
+
+        // simulate network fault
+        if (forwarder != null) {
+            forwarder.shutdown();
+        }
+
+        for (int i = 0; i < 10; i++) {
+            zk.create("/basic" + i, "second".getBytes(),Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        }
+
+        DelayRequestProcessor delayRequestProcessor = null;
+        if (testObserverMaster) {
+            FollowerZooKeeperServer followerZooKeeperServer = (FollowerZooKeeperServer) follower.getQuorumPeer().getActiveServer();
+            delayRequestProcessor = DelayRequestProcessor.injectDelayRequestProcessor(followerZooKeeperServer);
+        }
+
+        zk.create("/target1", "third".getBytes(),Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk.create("/target2", "third".getBytes(),Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        LOG.info("observer zxid " + 
Long.toHexString(q3.getQuorumPeer().getLastLoggedZxid()) +
+                (testObserverMaster ? "" : " observer master zxid " +
+                        
Long.toHexString(follower.getQuorumPeer().getLastLoggedZxid())) +
+                " leader zxid " + 
Long.toHexString(leader.getQuorumPeer().getLastLoggedZxid()));
+
+        // restore network
+        forwarder = testObserverMaster ? new PortForwarder(OM_PROXY_PORT, OM_PORT) : null;
+
+        Assert.assertTrue("waiting for server 3 being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS,
+                        CONNECTION_TIMEOUT));
+        Assert.assertNotNull("Leader switched", leader.getQuorumPeer().leader);
+
+        if (delayRequestProcessor != null) {
+            delayRequestProcessor.unblockQueue();
+        }
+
+        latch = new CountDownLatch(1);
+        ZooKeeper obsZk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS,
+                ClientBase.CONNECTION_TIMEOUT, this);
+        latch.await();
+        zk.create("/finalop", "fourth".getBytes(),Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        Assert.assertEquals("first", new String(obsZk.getData("/init", null, 
null)));
+        Assert.assertEquals("third", new String(obsZk.getData("/target1", 
null, null)));
+
+        obsZk.close();
+        shutdown();
+
+        try {
+            if (forwarder != null) {
+                forwarder.shutdown();
+            }
+        } catch (Exception e) {
+            // ignore
+        }
+    }
+
+    /**
+     * This test ensures two things:
+     * 1. That Observers can successfully proxy requests to the ensemble.
+     * 2. That Observers don't participate in leader elections.
+     * The second is tested by constructing an ensemble where a leader would
+     * be elected if and only if an Observer voted.
+     */
+    @Test
+    public void testObserver() throws Exception {
+        // We expect two notifications before we want to continue
+        latch = new CountDownLatch(2);
+        setUp(-1);
+        q3.start();
+        Assert.assertTrue("waiting for server 3 being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS,
+                        CONNECTION_TIMEOUT));
+
+        if (testObserverMaster) {
+            int masterPort = q3.getQuorumPeer().observer.getSocket().getPort();
+            LOG.info("port " + masterPort + " " + OM_PORT);
+            Assert.assertEquals("observer failed to connect to observer 
master", masterPort, OM_PORT);
+        }
+
+        zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS,
+                ClientBase.CONNECTION_TIMEOUT, this);
+        zk.create("/obstest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        // Assert that commands are getting forwarded correctly
+        Assert.assertEquals(new String(zk.getData("/obstest", null, null)), 
"test");
+
+        // Now check that other commands don't blow everything up
+        zk.sync("/", null, null);
+        zk.setData("/obstest", "test2".getBytes(), -1);
+        zk.getChildren("/", false);
+
+        Assert.assertEquals(zk.getState(), States.CONNECTED);
+
+        LOG.info("Shutting down server 2");
+        // Now kill one of the other real servers
+        q2.shutdown();
+
+        Assert.assertTrue("Waiting for server 2 to shut down",
+                ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_QP2,
+                        ClientBase.CONNECTION_TIMEOUT));
+
+        LOG.info("Server 2 down");
+
+        // Now the resulting ensemble shouldn't be quorate
+        latch.await();
+        Assert.assertNotSame("Client is still connected to non-quorate 
cluster",
+                KeeperState.SyncConnected,lastEvent.getState());
+
+        LOG.info("Latch returned");
+
+        try {
+            Assert.assertNotEquals("Shouldn't get a response when cluster not 
quorate!",
+                    "test", new String(zk.getData("/obstest", null, null)));
+        }
+        catch (ConnectionLossException c) {
+            LOG.info("Connection loss exception caught - ensemble not quorate 
(this is expected)");
+        }
+
+        latch = new CountDownLatch(1);
+
+        LOG.info("Restarting server 2");
+
+        // Bring it back
+        //q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection, extraCfgs);
+        q2.start();
+
+        LOG.info("Waiting for server 2 to come up");
+        Assert.assertTrue("waiting for server 2 being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
+                        CONNECTION_TIMEOUT));
+
+        LOG.info("Server 2 started, waiting for latch");
+
+        latch.await();
+        // It's possible our session expired - but this is ok, shows we
+        // were able to talk to the ensemble
+        Assert.assertTrue("Client didn't reconnect to quorate ensemble (state 
was" +
+                        lastEvent.getState() + ")",
+                (KeeperState.SyncConnected==lastEvent.getState() ||
+                        KeeperState.Expired==lastEvent.getState()));
+
+        LOG.info("perform a revalidation test");
+        int leaderProxyPort = PortAssignment.unique();
+        int obsProxyPort = PortAssignment.unique();
+        int leaderPort = q1.getQuorumPeer().leader == null ? CLIENT_PORT_QP2 : CLIENT_PORT_QP1;
+        PortForwarder leaderPF = new PortForwarder(leaderProxyPort, leaderPort);
+
+        latch = new CountDownLatch(1);
+        ZooKeeper client = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, obsProxyPort),
+                ClientBase.CONNECTION_TIMEOUT, this);
+        latch.await();
+        client.create("/revalidtest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL);
+        Assert.assertNotNull("Read-after write failed", 
client.exists("/revalidtest", null));
+
+        latch = new CountDownLatch(2);
+        PortForwarder obsPF = new PortForwarder(obsProxyPort, CLIENT_PORT_OBS);
+        try {
+            leaderPF.shutdown();
+        } catch (Exception e) {
+            // ignore?
+        }
+        latch.await();
+        Assert.assertEquals(new String(client.getData("/revalidtest", null, null)), "test");
+        client.close();
+        obsPF.shutdown();
+
+        shutdown();
+    }
+
+    @Test
+    public void testRevalidation() throws Exception {
+        setUp(-1);
+        q3.start();
+        Assert.assertTrue("waiting for server 3 being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS,
+                        CONNECTION_TIMEOUT));
+        final int leaderProxyPort = PortAssignment.unique();
+        final int obsProxyPort = PortAssignment.unique();
+
+        int leaderPort = q1.getQuorumPeer().leader == null ? CLIENT_PORT_QP2 : CLIENT_PORT_QP1;
+        PortForwarder leaderPF = new PortForwarder(leaderProxyPort, leaderPort);
+
+        latch = new CountDownLatch(1);
+        zk = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, obsProxyPort),
+                ClientBase.CONNECTION_TIMEOUT, this);
+        latch.await();
+        zk.create("/revalidtest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL);
+        Assert.assertNotNull("Read-after write failed", 
zk.exists("/revalidtest", null));
+
+        latch = new CountDownLatch(2);
+        PortForwarder obsPF = new PortForwarder(obsProxyPort, CLIENT_PORT_OBS);
+        try {
+            leaderPF.shutdown();
+        } catch (Exception e) {
+            // ignore?
+        }
+        latch.await();
+        Assert.assertEquals(new String(zk.getData("/revalidtest", null, 
null)), "test");
+        obsPF.shutdown();
+
+        shutdown();
+    }
+
+    @Test
+    public void testInOrderCommits() throws Exception {
+        setUp(-1);
+
+        zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1,
+                ClientBase.CONNECTION_TIMEOUT, null);
+        for (int i = 0; i < 10; i++) {
+            zk.create("/bulk" + i, ("Initial data of some size").getBytes(), 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        zk.close();
+
+        q3.start();
+        Assert.assertTrue("waiting for observer to be up",
+                ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS,
+                        CONNECTION_TIMEOUT));
+
+        latch = new CountDownLatch(1);
+        zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1,
+                ClientBase.CONNECTION_TIMEOUT, this);
+        latch.await();
+        Assert.assertEquals(zk.getState(), States.CONNECTED);
+
+        zk.create("/init", "first".getBytes(), Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        final long zxid = q1.getQuorumPeer().getLastLoggedZxid();
+
+        // wait for change to propagate
+        waitFor("Timeout waiting for observer sync", new WaitForCondition() {
+            public boolean evaluate() {
+                return zxid == q3.getQuorumPeer().getLastLoggedZxid();
+            }
+        }, 30);
+
+        ZooKeeper obsZk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS,
+                ClientBase.CONNECTION_TIMEOUT, this);
+        int followerPort = q1.getQuorumPeer().leader == null ? CLIENT_PORT_QP1 : CLIENT_PORT_QP2;
+        ZooKeeper fZk = new ZooKeeper("127.0.0.1:" + followerPort,
+                ClientBase.CONNECTION_TIMEOUT, this);
+        final int numTransactions = 10001;
+        CountDownLatch gate = new CountDownLatch(1);
+        CountDownLatch oAsyncLatch = new CountDownLatch(numTransactions);
+        Thread oAsyncWriteThread = new Thread(new AsyncWriter(obsZk, numTransactions, true, oAsyncLatch, "/obs", gate));
+        CountDownLatch fAsyncLatch = new CountDownLatch(numTransactions);
+        Thread fAsyncWriteThread = new Thread(new AsyncWriter(fZk, numTransactions, true, fAsyncLatch, "/follower", gate));
+
+        LOG.info("ASYNC WRITES");
+        oAsyncWriteThread.start();
+        fAsyncWriteThread.start();
+        gate.countDown();
+
+        oAsyncLatch.await();
+        fAsyncLatch.await();
+
+        oAsyncWriteThread.join(ClientBase.CONNECTION_TIMEOUT);
+        if (oAsyncWriteThread.isAlive()) {
+            LOG.error("asyncWriteThread is still alive");
+        }
+        fAsyncWriteThread.join(ClientBase.CONNECTION_TIMEOUT);
+        if (fAsyncWriteThread.isAlive()) {
+            LOG.error("asyncWriteThread is still alive");
+        }
+
+        obsZk.close();
+        fZk.close();
+
+        shutdown();
+    }
+
+    @Test
+    public void testAdminCommands() throws IOException, MBeanException,
+            InstanceNotFoundException, ReflectionException, InterruptedException, MalformedObjectNameException,
+            AttributeNotFoundException, InvalidAttributeValueException, KeeperException {
+        // flush all beans, then start
+        for (ZKMBeanInfo beanInfo : MBeanRegistry.getInstance().getRegisteredBeans()) {
+            MBeanRegistry.getInstance().unregister(beanInfo);
+        }
+
+        JMXEnv.setUp();
+        setUp(-1);
+        q3.start();
+        Assert.assertTrue("waiting for observer to be up",
+                ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS,
+                        CONNECTION_TIMEOUT));
+
+        // Assert that commands are getting forwarded correctly
+        zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS,
+                ClientBase.CONNECTION_TIMEOUT, this);
+        zk.create("/obstest", "test".getBytes(),Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        Assert.assertEquals(new String(zk.getData("/obstest", null, null)), 
"test");
+
+        // test stats collection
+        final Map<String, String> emptyMap = Collections.emptyMap();
+        Map<String, Object> stats = Commands.runCommand("mntr", 
q3.getQuorumPeer().getActiveServer(), emptyMap).toMap();
+        Assert.assertTrue("observer not emitting observer_master_id", 
stats.containsKey("observer_master_id"));
+
+        // check the stats for the first peer
+        stats = Commands.runCommand("mntr", q1.getQuorumPeer().getActiveServer(), emptyMap).toMap();
+        if (testObserverMaster) {
+            if (q1.getQuorumPeer().leader == null) {
+                Assert.assertEquals(1, stats.get("synced_observers"));
+            } else {
+                Assert.assertEquals(0, stats.get("synced_observers"));
+            }
+        } else {
+            if (q1.getQuorumPeer().leader == null) {
+                Assert.assertNull(stats.get("synced_observers"));
+            } else {
+                Assert.assertEquals(1, stats.get("synced_observers"));
+            }
+        }
+
+        // check the stats for the second peer
+        stats = Commands.runCommand("mntr", q2.getQuorumPeer().getActiveServer(), emptyMap).toMap();
+        if (testObserverMaster) {
+            if (q2.getQuorumPeer().leader == null) {
+                Assert.assertEquals(1, stats.get("synced_observers"));
+            } else {
+                Assert.assertEquals(0, stats.get("synced_observers"));
+            }
+        } else {
+            if (q2.getQuorumPeer().leader == null) {
+                Assert.assertNull(stats.get("synced_observers"));
+            } else {
+                Assert.assertEquals(1, stats.get("synced_observers"));
+            }
+        }
+
+        // test admin commands for disconnection
+        ObjectName connBean = null;
+        for (ObjectName bean : JMXEnv.conn().queryNames(new ObjectName(CommonNames.DOMAIN + ":*"), null)) {
+            if (bean.getCanonicalName().contains("Learner_Connections") &&
+                    bean.getCanonicalName().contains("id:" + q3.getQuorumPeer().getId())) {
+                connBean = bean;
+                break;
+            }
+        }
+        Assert.assertNotNull("could not find connection bean", connBean);
+
+        latch = new CountDownLatch(1);
+        JMXEnv.conn().invoke(connBean, "terminateConnection", new Object[0], null);
+        Assert.assertTrue("server failed to disconnect on terminate",
+                latch.await(CONNECTION_TIMEOUT/2, TimeUnit.MILLISECONDS));
+        Assert.assertTrue("waiting for server 3 being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS,
+                        CONNECTION_TIMEOUT));
+
+        final String obsBeanName =
+                String.format("org.apache.ZooKeeperService:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Observer",
+                        q3.getQuorumPeer().getId(), q3.getQuorumPeer().getId());
+        Set<ObjectName> names = JMXEnv.conn().queryNames(new ObjectName(obsBeanName), null);
+        Assert.assertEquals("expecting singular observer bean", 1, names.size());
+        ObjectName obsBean = names.iterator().next();
+
+        if (testObserverMaster) {
+            // show we can move the observer using the id
+            long observerMasterId = q3.getQuorumPeer().observer.getLearnerMasterId();
+            latch = new CountDownLatch(1);
+            JMXEnv.conn().setAttribute(obsBean, new Attribute("LearnerMaster", Long.toString(3 - observerMasterId)));
+            Assert.assertTrue("server failed to disconnect on terminate",
+                    latch.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+            Assert.assertTrue("waiting for server 3 being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS,
+                            CONNECTION_TIMEOUT));
+        } else {
+            // show we get an error
+            final long leaderId = q1.getQuorumPeer().leader == null ? 2 : 1;
+            try {
+                JMXEnv.conn().setAttribute(obsBean, new Attribute("LearnerMaster", Long.toString(3 - leaderId)));
+                Assert.fail("should have seen an exception on previous command");
+            } catch (RuntimeMBeanException e) {
+                Assert.assertEquals("mbean failed for the wrong reason",
+                        IllegalArgumentException.class, e.getCause().getClass());
+            }
+        }
+
+        shutdown();
+        JMXEnv.tearDown();
+    }
+
+    private String createServerString(String type, long serverId, int clientPort) {
+        return "server." + serverId + "=127.0.0.1:" +
+                PortAssignment.unique() + ":" +
+                PortAssignment.unique() + ":" +
+                type + ";" + clientPort;
+    }
+
+    private void waitServerUp(int clientPort) {
+        Assert.assertTrue("waiting for server being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + clientPort,
+                        CONNECTION_TIMEOUT));
+    }
+
+    private ZooKeeperAdmin createAdmin(int clientPort) throws IOException {
+        System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest",
+                "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/);
+        QuorumPeerConfig.setReconfigEnabled(true);
+        ZooKeeperAdmin admin = new ZooKeeperAdmin("127.0.0.1:" + clientPort,
+                ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+                    public void process(WatchedEvent event) {}
+                });
+        admin.addAuthInfo("digest", "super:test".getBytes());
+        return admin;
+    }
+
+    // This test is known to be flaky and fail due to "reconfig already in progress".
+    // TODO: Investigate intermittent testDynamicReconfig failures.
+    // @Test
+    public void testDynamicReconfig() throws InterruptedException, IOException,
+              KeeperException {
+        if (!testObserverMaster) {
+            return;
+        }
+
+        ClientBase.setupTestEnv();
+
+        // create a quorum where each participant uses a different observer
+        // master port, so it is easy to control which server the observer
+        // follows
+        //
+        // there is a setObserverMaster function, but it is currently broken;
+        // use this workaround until that is fixed
+        int clientPort1 = PortAssignment.unique();
+        int clientPort2 = PortAssignment.unique();
+        int omPort1 = PortAssignment.unique();
+        int omPort2 = PortAssignment.unique();
+        String quorumCfgSection =
+                createServerString("participant", 1, clientPort1) + "\n" +
+                createServerString("participant", 2, clientPort2);
+
+        MainThread s1 = new MainThread(1, clientPort1, quorumCfgSection,
+                String.format("observerMasterPort=%d%n",omPort1));
+        MainThread s2 = new MainThread(2, clientPort2, quorumCfgSection,
+                String.format("observerMasterPort=%d%n", omPort2));
+        s1.start();
+        s2.start();
+        waitServerUp(clientPort1);
+        waitServerUp(clientPort2);
+
+        // create observer to follow non-leader observer master
+        long nonLeaderOMPort = s1.getQuorumPeer().leader == null ? omPort1
+                                                                 : omPort2;
+        int observerClientPort = PortAssignment.unique();
+        int observerId = 10;
+        MainThread observer = new MainThread(
+                observerId,
+                observerClientPort, quorumCfgSection + "\n" +
+                createServerString("observer", observerId,
+                        observerClientPort),
+                String.format("observerMasterPort=%d%n", nonLeaderOMPort));
+        LOG.info("starting observer");
+        observer.start();
+        waitServerUp(observerClientPort);
+
+        // create a client to the observer
+        final LinkedBlockingQueue<KeeperState> states =
+            new LinkedBlockingQueue<KeeperState>();
+        ZooKeeper observerClient = new ZooKeeper(
+                "127.0.0.1:" + observerClientPort,
+                ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+                    @Override
+                    public void process(WatchedEvent event) {
+                        try {
+                            states.put(event.getState());
+                        } catch (InterruptedException e) {}
+                    }
+                });
+
+        // wait for connected
+        KeeperState state = states.poll(1000, TimeUnit.MILLISECONDS);
+        Assert.assertEquals(KeeperState.SyncConnected, state);
+
+        // issue reconfig command
+        ArrayList<String> newServers = new ArrayList<String>();
+        String server = "server.3=127.0.0.1:" + PortAssignment.unique()
+                + ":" + PortAssignment.unique() + ":participant;localhost:"
+                + PortAssignment.unique();
+        newServers.add(server);
+        ZooKeeperAdmin admin = createAdmin(clientPort1);
+        ReconfigTest.reconfig(admin, newServers, null, null, -1);
+
+        // make sure the observer has the new config
+        ReconfigTest.testServerHasConfig(observerClient, newServers, null);
+
+        // shouldn't be disconnected during reconfig, so expect to not
+        // receive any new event
+        state = states.poll(1000, TimeUnit.MILLISECONDS);
+        Assert.assertNull(state);
+
+        admin.close();
+        observerClient.close();
+        observer.shutdown();
+        s2.shutdown();
+        s1.shutdown();
+    }
+
+    /**
+     * Implementation of watcher interface.
+     */
+    public void process(WatchedEvent event) {
+        lastEvent = event;
+        if (latch != null) {
+            latch.countDown();
+        }
+        LOG.info("Latch got event :: " + event);
+    }
+
+    class AsyncWriter implements Runnable {
+        private final ZooKeeper client;
+        private final int numTransactions;
+        private final boolean issueSync;
+        private final CountDownLatch writerLatch;
+        private final String root;
+        private final CountDownLatch gate;
+
+        AsyncWriter(ZooKeeper client, int numTransactions, boolean issueSync, CountDownLatch writerLatch,
+                    String root, CountDownLatch gate) {
+            this.client = client;
+            this.numTransactions = numTransactions;
+            this.issueSync = issueSync;
+            this.writerLatch = writerLatch;
+            this.root = root;
+            this.gate = gate;
+        }
+
+        @Override
+        public void run() {
+            if (gate != null) {
+                try {
+                    gate.await();
+                } catch (InterruptedException e) {
+                    LOG.error("Gate interrupted");
+                    return;
+                }
+            }
+            for (int i = 0; i < numTransactions; i++) {
+                final boolean pleaseLog = i % 100 == 0;
+                client.create(root + i, "inner thread".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
+                            @Override
+                            public void processResult(int rc, String path,
+                                                      Object ctx, String name) {
+                                writerLatch.countDown();
+                                if (pleaseLog) {
+                                    LOG.info("wrote {}", path);
+                                }
+                            }
+                        }, null);
+                if (pleaseLog) {
+                    LOG.info("async wrote {}{}", root, i);
+                    if (issueSync) {
+                        client.sync(root + "0", null, null);
+                    }
+                }
+            }
+        }
+    }
+}
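
For reference, the fault-injection idiom these tests lean on: the observer's observerMasterPort is pointed at a local PortForwarder proxy rather than at the real observer master port, so tearing the forwarder down and recreating it simulates a network partition between observer and observer master without restarting either peer. A condensed, hypothetical sketch of that pattern (the port numbers and the elided traffic are placeholders, not values from the test):

    import org.apache.zookeeper.server.util.PortForwarder;

    public class ObserverLinkFaultSketch {
        public static void main(String[] args) throws Exception {
            final int observerMasterPort = 41111; // where the follower's ObserverMaster listens (hypothetical)
            final int proxyPort = 42222;          // what the observer's observerMasterPort points at (hypothetical)

            // forward proxyPort -> observerMasterPort while the link is "healthy"
            PortForwarder forwarder = new PortForwarder(proxyPort, observerMasterPort);

            // ... run traffic and let the observer sync ...

            forwarder.shutdown(); // cut the observer <-> observer master link

            // ... issue writes the observer cannot see yet ...

            forwarder = new PortForwarder(proxyPort, observerMasterPort); // restore the link
            // ... wait for the observer to catch up, then verify its reads ...
            forwarder.shutdown();
        }
    }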
