Repository: zookeeper
Updated Branches:
  refs/heads/branch-3.4 6ce1a6cc3 -> a09a67971


ZOOKEEPER-2959: ignore accepted epoch and LEADERINFO ack from observers


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/a09a6797
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/a09a6797
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/a09a6797

Branch: refs/heads/branch-3.4
Commit: a09a67971d7aaa31617988a188501b855e3e59f2
Parents: 6ce1a6c
Author: Alexander Shraer <ashr...@apple.com>
Authored: Wed May 9 20:56:16 2018 -0700
Committer: Alexander Shraer <ashr...@apple.com>
Committed: Wed May 9 20:56:16 2018 -0700

----------------------------------------------------------------------
 .../apache/zookeeper/server/quorum/Leader.java  |  34 ++--
 .../zookeeper/server/quorum/LearnerHandler.java |   2 +-
 .../quorum/flexible/QuorumHierarchical.java     |   4 +-
 .../server/quorum/flexible/QuorumMaj.java       |   4 +-
 .../server/quorum/flexible/QuorumVerifier.java  |   6 +-
 .../server/quorum/LeaderWithObserverTest.java   | 177 +++++++++++++++++++
 .../zookeeper/server/quorum/Zab1_0Test.java     | 129 +-------------
 .../zookeeper/server/quorum/ZabUtils.java       | 142 +++++++++++++++
 8 files changed, 356 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
index a9fd8d0..7013cac 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
@@ -79,8 +79,8 @@ public class Leader {
     final LeaderZooKeeperServer zk;
 
     final QuorumPeer self;
-
-    private boolean quorumFormed = false;
+    // VisibleForTesting
+    protected boolean quorumFormed = false;
     
     // the follower acceptor thread
     LearnerCnxAcceptor cnxAcceptor;
@@ -411,7 +411,7 @@ public class Leader {
             // us. We do this by waiting for the NEWLEADER packet to get
             // acknowledged
             try {
-                waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
+                waitForNewLeaderAck(self.getId(), zk.getZxid());
             } catch (InterruptedException e) {
                 shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                         + getSidSetString(newLeaderProposal.ackSet) + " ]");
@@ -868,8 +868,8 @@ public class Leader {
                 
         return lastProposed;
     }
-
-    private HashSet<Long> connectingFollowers = new HashSet<Long>();
+    // VisibleForTesting
+    protected Set<Long> connectingFollowers = new HashSet<Long>();
     public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
         synchronized(connectingFollowers) {
             if (!waitingForNewEpoch) {
@@ -878,7 +878,9 @@ public class Leader {
             if (lastAcceptedEpoch >= epoch) {
                 epoch = lastAcceptedEpoch+1;
             }
-            connectingFollowers.add(sid);
+            if (isParticipant(sid)) {
+                connectingFollowers.add(sid);
+            }
             QuorumVerifier verifier = self.getQuorumVerifier();
             if (connectingFollowers.contains(self.getId()) && 
                                             
verifier.containsQuorum(connectingFollowers)) {
@@ -900,9 +902,10 @@ public class Leader {
             return epoch;
         }
     }
-
-    private HashSet<Long> electingFollowers = new HashSet<Long>();
-    private boolean electionFinished = false;
+    // VisibleForTesting
+    protected Set<Long> electingFollowers = new HashSet<Long>();
+    // VisibleForTesting
+    protected boolean electionFinished = false;
     public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
         synchronized(electingFollowers) {
             if (electionFinished) {
@@ -916,7 +919,9 @@ public class Leader {
                                                     + leaderStateSummary.getLastZxid()
                                                     + " (last zxid)");
                 }
-                electingFollowers.add(id);
+                if (isParticipant(id)) {
+                    electingFollowers.add(id);
+                }
             }
             QuorumVerifier verifier = self.getQuorumVerifier();
             if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
@@ -981,10 +986,9 @@ public class Leader {
      * sufficient acks.
      *
      * @param sid
-     * @param learnerType
      * @throws InterruptedException
      */
-    public void waitForNewLeaderAck(long sid, long zxid, LearnerType learnerType)
+    public void waitForNewLeaderAck(long sid, long zxid)
             throws InterruptedException {
 
         synchronized (newLeaderProposal.ackSet) {
@@ -1002,7 +1006,7 @@ public class Leader {
                 return;
             }
 
-            if (learnerType == LearnerType.PARTICIPANT) {
+            if (isParticipant(sid)) {
                 newLeaderProposal.ackSet.add(sid);
             }
 
@@ -1075,4 +1079,8 @@ public class Leader {
     private boolean isRunning() {
         return self.isRunning() && zk.isRunning();
     }
+
+    private boolean isParticipant(long sid) {
+        return self.getVotingView().containsKey(sid);
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 884cc63..973950d 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -533,7 +533,7 @@ public class LearnerHandler extends ZooKeeperThread {
                 return;
             }
             LOG.info("Received NEWLEADER-ACK message from " + getSid());
-            leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
+            leader.waitForNewLeaderAck(getSid(), qp.getZxid());
 
             syncLimitCheck.start();
             

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java b/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
index 428391e..9993f91 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
@@ -23,10 +23,10 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.HashMap;
 import java.util.Properties;
 import java.util.Map.Entry;
+import java.util.Set;
 
 
 import org.slf4j.Logger;
@@ -232,7 +232,7 @@ public class QuorumHierarchical implements QuorumVerifier {
     /**
      * Verifies if a given set is a quorum.
      */
-    public boolean containsQuorum(HashSet<Long> set){
+    public boolean containsQuorum(Set<Long> set){
         HashMap<Long, Long> expansion = new HashMap<Long, Long>();
         
         /*

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java b/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
index 04773d7..8f9b573 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
@@ -18,7 +18,7 @@
 
 package org.apache.zookeeper.server.quorum.flexible;
 
-import java.util.HashSet;
+import java.util.Set;
 
 //import org.apache.zookeeper.server.quorum.QuorumCnxManager;
 
@@ -56,7 +56,7 @@ public class QuorumMaj implements QuorumVerifier {
     /**
      * Verifies if a set is a majority.
      */
-    public boolean containsQuorum(HashSet<Long> set){
+    public boolean containsQuorum(Set<Long> set){
         return (set.size() > half);
     }
     

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java b/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
index 9840365..6649129 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
@@ -18,16 +18,16 @@
 
 package org.apache.zookeeper.server.quorum.flexible;
 
-import java.util.HashSet;
+import java.util.Set;
 
 /**
  * All quorum validators have to implement a method called
- * containsQuorum, which verifies if a HashSet of server 
+ * containsQuorum, which verifies if a Set of server 
  * identifiers constitutes a quorum.
  *
  */
 
 public interface QuorumVerifier {
     long getWeight(long id);
-    boolean containsQuorum(HashSet<Long> set);
+    boolean containsQuorum(Set<Long> set);
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
new file mode 100644
index 0000000..0f6a098
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.apache.zookeeper.server.quorum.ZabUtils.createLeader;
+import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer;
+
+public class LeaderWithObserverTest {
+
+    QuorumPeer peer;
+    Leader leader;
+    File tmpDir;
+    long participantId;
+    long observerId;
+
+    @Before
+    public void setUp() throws Exception {
+        tmpDir = ClientBase.createTmpDir();
+        peer = createQuorumPeer(tmpDir);
+        participantId = 1;
+        observerId = peer.quorumPeers.size();
+        leader = createLeader(tmpDir, peer);
+        peer.leader = leader;
+        peer.quorumPeers.put(observerId, new QuorumPeer.QuorumServer(observerId, "127.0.0.1", PortAssignment.unique(),
+                0, QuorumPeer.LearnerType.OBSERVER));
+
+        // these tests are serial, we can speed up InterruptedException
+        peer.tickTime = 1;
+    }
+
+    @After
+    public void tearDown(){
+        leader.shutdown("end of test");
+        tmpDir.delete();
+    }
+
+    @Test
+    public void testGetEpochToPropose() throws Exception {
+        long lastAcceptedEpoch = 5;
+        peer.setAcceptedEpoch(5);
+
+        Assert.assertEquals("Unexpected vote in connectingFollowers", 0, leader.connectingFollowers.size());
+        Assert.assertTrue(leader.waitingForNewEpoch);
+        try {
+            // Leader asks for epoch (mocking Leader.lead behavior)
+            // First add to connectingFollowers
+            leader.getEpochToPropose(peer.getId(), lastAcceptedEpoch);
+        } catch (InterruptedException e) {
+            // ignore timeout
+        }
+
+        Assert.assertEquals("Unexpected vote in connectingFollowers", 1, leader.connectingFollowers.size());
+        Assert.assertEquals("Leader shouldn't set new epoch until quorum of participants is in connectingFollowers",
+                lastAcceptedEpoch, peer.getAcceptedEpoch());
+        Assert.assertTrue(leader.waitingForNewEpoch);
+        try {
+            // Observer asks for epoch (mocking LearnerHandler behavior)
+            leader.getEpochToPropose(observerId, lastAcceptedEpoch);
+        } catch (InterruptedException e) {
+            // ignore timeout
+        }
+
+        Assert.assertEquals("Unexpected vote in connectingFollowers", 1, leader.connectingFollowers.size());
+        Assert.assertEquals("Leader shouldn't set new epoch after observer asks for epoch",
+                lastAcceptedEpoch, peer.getAcceptedEpoch());
+        Assert.assertTrue(leader.waitingForNewEpoch);
+        try {
+            // Now participant asks for epoch (mocking LearnerHandler behavior). Second add to connectingFollowers.
+            // Triggers verifier.containsQuorum = true
+            leader.getEpochToPropose(participantId, lastAcceptedEpoch);
+        } catch (Exception e) {
+            Assert.fail("Timed out in getEpochToPropose");
+        }
+
+        Assert.assertEquals("Unexpected vote in connectingFollowers", 2, leader.connectingFollowers.size());
+        Assert.assertEquals("Leader should record next epoch", lastAcceptedEpoch + 1, peer.getAcceptedEpoch());
+        Assert.assertFalse(leader.waitingForNewEpoch);
+    }
+
+    @Test
+    public void testWaitForEpochAck() throws Exception {
+        // things needed for waitForEpochAck to run (usually in leader.lead(), but we're not running leader here)
+        leader.readyToStart = true;
+        leader.leaderStateSummary = new StateSummary(leader.self.getCurrentEpoch(), leader.zk.getLastProcessedZxid());
+
+        Assert.assertEquals("Unexpected vote in electingFollowers", 0, leader.electingFollowers.size());
+        Assert.assertFalse(leader.electionFinished);
+        try {
+            // leader calls waitForEpochAck, first add to electingFollowers
+            leader.waitForEpochAck(peer.getId(), new StateSummary(0, 0));
+        }  catch (InterruptedException e) {
+            // ignore timeout
+        }
+
+        Assert.assertEquals("Unexpected vote in electingFollowers", 1, leader.electingFollowers.size());
+        Assert.assertFalse(leader.electionFinished);
+        try {
+            // observer calls waitForEpochAck, should fail verifier.containsQuorum
+            leader.waitForEpochAck(observerId, new StateSummary(0, 0));
+        }  catch (InterruptedException e) {
+            // ignore timeout
+        }
+
+        Assert.assertEquals("Unexpected vote in electingFollowers", 1, leader.electingFollowers.size());
+        Assert.assertFalse(leader.electionFinished);
+        try {
+            // second add to electingFollowers, verifier.containsQuorum=true, waitForEpochAck returns without exceptions
+            leader.waitForEpochAck(participantId, new StateSummary(0, 0));
+            Assert.assertEquals("Unexpected vote in electingFollowers", 2, leader.electingFollowers.size());
+            Assert.assertTrue(leader.electionFinished);
+        } catch (Exception e) {
+            Assert.fail("Timed out in waitForEpochAck");
+        }
+    }
+
+    @Test
+    public void testWaitForNewLeaderAck() throws Exception {
+        long zxid = leader.zk.getZxid();
+
+        // things needed for waitForNewLeaderAck to run (usually in leader.lead(), but we're not running leader here)
+        leader.newLeaderProposal.packet = new QuorumPacket(0, zxid,null, null);
+
+        Assert.assertEquals("Unexpected vote in ackSet", 0, leader.newLeaderProposal.ackSet.size());
+        Assert.assertFalse(leader.quorumFormed);
+        try {
+            // leader calls waitForNewLeaderAck, first add to ackSet
+            leader.waitForNewLeaderAck(peer.getId(), zxid);
+        }  catch (InterruptedException e) {
+            // ignore timeout
+        }
+
+        Assert.assertEquals("Unexpected vote in ackSet", 1, leader.newLeaderProposal.ackSet.size());
+        Assert.assertFalse(leader.quorumFormed);
+        try {
+            // observer calls waitForNewLeaderAck, should fail verifier.containsQuorum
+            leader.waitForNewLeaderAck(observerId, zxid);
+        }  catch (InterruptedException e) {
+            // ignore timeout
+        }
+
+        Assert.assertEquals("Unexpected vote in ackSet", 1, leader.newLeaderProposal.ackSet.size());
+        Assert.assertFalse(leader.quorumFormed);
+        try {
+            // second add to ackSet, verifier.containsQuorum=true, waitForNewLeaderAck returns without exceptions
+            leader.waitForNewLeaderAck(participantId, zxid);
+            Assert.assertEquals("Unexpected vote in ackSet", 2, leader.newLeaderProposal.ackSet.size());
+            Assert.assertTrue(leader.quorumFormed);
+        } catch (Exception e) {
+            Assert.fail("Timed out in waitForEpochAck");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
index 3ed6097..4f831e8 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
@@ -18,6 +18,11 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer;
+import static org.apache.zookeeper.server.quorum.ZabUtils.createMockLeader;
+import static org.apache.zookeeper.server.quorum.ZabUtils.MockLeader;
+import static org.apache.zookeeper.server.quorum.ZabUtils.createLeader;
+
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -27,47 +32,34 @@ import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.EOFException;
-import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.ByteBufferOutputStream;
 import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServer.DataTreeBuilder;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.Util;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.CreateTxn;
@@ -80,8 +72,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Zab1_0Test {
-    private static final int SYNC_LIMIT = 2;
-
     private static final Logger LOG = LoggerFactory.getLogger(Zab1_0Test.class);
 
     private static final File testData = new File(
@@ -106,25 +96,6 @@ public class Zab1_0Test {
             }
         }
     }
-           
-    private static final class MockLeader extends Leader {
-           
-        MockLeader(QuorumPeer qp, LeaderZooKeeperServer zk)
-        throws IOException {
-            super(qp, zk);
-        }
-           
-        /**
-         * This method returns the value of the variable that holds the epoch
-         * to be proposed and that has been proposed, depending on the point
-         * of the execution in which it is called. 
-         * 
-         * @return epoch
-         */
-        public long getCurrentEpochToPropose() {
-            return epoch;
-        }
-    }
      
    public static final class FollowerMockThread extends Thread {
        private final Leader leader;
@@ -283,42 +254,6 @@ public class Zab1_0Test {
         }
     }
 
-    private static final class NullServerCnxnFactory extends ServerCnxnFactory {
-        public void startup(ZooKeeperServer zkServer) throws IOException,
-                InterruptedException {
-        }
-        public void start() {
-        }
-        public void shutdown() {
-        }
-        public void setMaxClientCnxnsPerHost(int max) {
-        }
-        public void join() throws InterruptedException {
-        }
-        public int getMaxClientCnxnsPerHost() {
-            return 0;
-        }
-        public int getLocalPort() {
-            return 0;
-        }
-        public InetSocketAddress getLocalAddress() {
-            return null;
-        }
-        public Iterable<ServerCnxn> getConnections() {
-            return null;
-        }
-        public void configure(InetSocketAddress addr, int maxClientCnxns)
-                throws IOException {
-        }
-        public void closeSession(long sessionId) {
-        }
-        public void closeAll() {
-        }
-        @Override
-        public int getNumAliveConnections() {
-            return 0;
-        }
-    }
     static Socket[] getSocketPair() throws IOException {
         ServerSocket ss = new ServerSocket();
         ss.bind(null);
@@ -988,7 +923,7 @@ public class Zab1_0Test {
 
                 LOG.info("Proposal sent.");
 
-                for (int i = 0; i < (2 * SYNC_LIMIT) + 2; i++) {
+                for (int i = 0; i < (2 * ZabUtils.SYNC_LIMIT) + 2; i++) {
                     try {
                         ia.readRecord(qp, null);
                         LOG.info("Ping received: " + i);
@@ -1229,7 +1164,7 @@ public class Zab1_0Test {
         testLeaderConversation(new LeaderConversation() {
             public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                     throws IOException, InterruptedException {
-                /* we test a normal run. everything should work out well. */   
                
+                /* we test a normal run. everything should work out well. */   
             
                 LearnerInfo li = new LearnerInfo(1, 0x10000);
                 byte liBytes[] = new byte[12];
                 ByteBufferOutputStream.record2ByteBuffer(li,
@@ -1245,7 +1180,7 @@ public class Zab1_0Test {
                 Thread.sleep(l.self.getInitLimit()*l.self.getTickTime() + 5000);
                 
                 // The leader didn't get a quorum of acks - make sure that leader's current epoch is not advanced
-                Assert.assertEquals(0, l.self.getCurrentEpoch());              
        
+                Assert.assertEquals(0, l.self.getCurrentEpoch());           
             }
         });
     }
@@ -1343,30 +1278,6 @@ public class Zab1_0Test {
         }
     }
 
-    private Leader createLeader(File tmpDir, QuorumPeer peer)
-    throws IOException, NoSuchFieldException, IllegalAccessException{
-        LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
-        return new Leader(peer, zk);
-    }
-            
-    private Leader createMockLeader(File tmpDir, QuorumPeer peer)
-    throws IOException, NoSuchFieldException, IllegalAccessException{
-        LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
-        return new MockLeader(peer, zk);
-    }
-            
-    private LeaderZooKeeperServer prepareLeader(File tmpDir, QuorumPeer peer)
-    throws IOException, NoSuchFieldException, IllegalAccessException {
-        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
-        peer.setTxnFactory(logFactory);
-        Field addrField = peer.getClass().getDeclaredField("myQuorumAddr");
-        addrField.setAccessible(true);
-        addrField.set(peer, new InetSocketAddress(PortAssignment.unique()));
-        ZKDatabase zkDb = new ZKDatabase(logFactory);
-        LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, new ZooKeeperServer.BasicDataTreeBuilder(), zkDb);
-        return zk;
-    }
-
     static class ConversableFollower extends Follower {
 
         ConversableFollower(QuorumPeer self, FollowerZooKeeperServer zk) {
@@ -1420,30 +1331,6 @@ public class Zab1_0Test {
         peer.setZKDatabase(zkDb);
         return new ConversableObserver(peer, zk);
     }
-        
-    
-    private QuorumPeer createQuorumPeer(File tmpDir) throws IOException,
-            FileNotFoundException {
-        QuorumPeer peer = QuorumPeer.testingQuorumPeer();
-        peer.syncLimit = SYNC_LIMIT;
-        peer.initLimit = 2;
-        peer.tickTime = 2000;
-        peer.quorumPeers = new HashMap<Long, QuorumServer>();
-        peer.quorumPeers.put(1L, new QuorumServer(0, "0.0.0.0", 33221, 0, null));
-        peer.quorumPeers.put(1L, new QuorumServer(1, "0.0.0.0", 33223, 0, null));
-        peer.setQuorumVerifier(new QuorumMaj(3));
-        peer.setCnxnFactory(new NullServerCnxnFactory());
-        File version2 = new File(tmpDir, "version-2");
-        version2.mkdir();
-        FileOutputStream fos;
-        fos = new FileOutputStream(new File(version2, "currentEpoch"));
-        fos.write("0\n".getBytes());
-        fos.close();
-        fos = new FileOutputStream(new File(version2, "acceptedEpoch"));
-        fos.write("0\n".getBytes());
-        fos.close();
-        return peer;
-    }
 
     private String readContentsOfFile(File f) throws IOException {
         return new BufferedReader(new FileReader(f)).readLine();

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/test/org/apache/zookeeper/server/quorum/ZabUtils.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/ZabUtils.java b/src/java/test/org/apache/zookeeper/server/quorum/ZabUtils.java
new file mode 100644
index 0000000..a84a332
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/quorum/ZabUtils.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+public class ZabUtils {
+
+    private ZabUtils() {}
+
+    public static final int SYNC_LIMIT = 2;
+
+    public static QuorumPeer createQuorumPeer(File tmpDir) throws IOException{
+        QuorumPeer peer = new QuorumPeer();
+        peer.syncLimit = 2;
+        peer.initLimit = 2;
+        peer.tickTime = 2000;
+        peer.quorumPeers = new HashMap<Long, QuorumPeer.QuorumServer>();
+        peer.quorumPeers.put(0L, new QuorumPeer.QuorumServer(0, "127.0.0.1", PortAssignment.unique(), 0, null));
+        peer.quorumPeers.put(1L, new QuorumPeer.QuorumServer(1, "127.0.0.1", PortAssignment.unique(), 0, null));
+        peer.quorumPeers.put(2L, new QuorumPeer.QuorumServer(2, "127.0.0.1", PortAssignment.unique(), 0, null));
+        peer.setQuorumVerifier(new QuorumMaj(peer.quorumPeers.size()));
+        peer.setCnxnFactory(new NullServerCnxnFactory());
+        File version2 = new File(tmpDir, "version-2");
+        version2.mkdir();
+        FileOutputStream fos = new FileOutputStream(new File(version2, "currentEpoch"));
+        fos.write("0\n".getBytes());
+        fos.close();
+        fos = new FileOutputStream(new File(version2, "acceptedEpoch"));
+        fos.write("0\n".getBytes());
+        fos.close();
+        return peer;
+    }
+
+    public static Leader createLeader(File tmpDir, QuorumPeer peer)
+            throws IOException, NoSuchFieldException, IllegalAccessException{
+        LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
+        return new Leader(peer, zk);
+    }
+
+    public static MockLeader createMockLeader(File tmpDir, QuorumPeer peer)
+            throws IOException, NoSuchFieldException, IllegalAccessException{
+        LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
+        return new MockLeader(peer, zk);
+    }
+
+    private static LeaderZooKeeperServer prepareLeader(File tmpDir, QuorumPeer peer)
+            throws IOException, NoSuchFieldException, IllegalAccessException {
+        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
+        peer.setTxnFactory(logFactory);
+        Field addrField = peer.getClass().getDeclaredField("myQuorumAddr");
+        addrField.setAccessible(true);
+        addrField.set(peer, new InetSocketAddress(PortAssignment.unique()));
+        ZKDatabase zkDb = new ZKDatabase(logFactory);
+        return new LeaderZooKeeperServer(logFactory, peer, new ZooKeeperServer.BasicDataTreeBuilder(), zkDb);
+    }
+
+    private static final class NullServerCnxnFactory extends ServerCnxnFactory {
+        public void startup(ZooKeeperServer zkServer) throws IOException,
+                InterruptedException {
+        }
+        public void start() {
+        }
+        public void shutdown() {
+        }
+        public void setMaxClientCnxnsPerHost(int max) {
+        }
+        public void join() throws InterruptedException {
+        }
+        public int getMaxClientCnxnsPerHost() {
+            return 0;
+        }
+        public int getLocalPort() {
+            return 0;
+        }
+        public InetSocketAddress getLocalAddress() {
+            return null;
+        }
+        public Iterable<ServerCnxn> getConnections() {
+            return null;
+        }
+        public void configure(InetSocketAddress addr, int maxClientCnxns)
+                throws IOException {
+        }
+        public void closeSession(long sessionId) {
+        }
+        public void closeAll() {
+        }
+        @Override
+        public int getNumAliveConnections() {
+            return 0;
+        }
+    }
+
+    public static final class MockLeader extends Leader {
+
+        MockLeader(QuorumPeer qp, LeaderZooKeeperServer zk)
+                throws IOException {
+            super(qp, zk);
+        }
+
+        /**
+         * This method returns the value of the variable that holds the epoch
+         * to be proposed and that has been proposed, depending on the point
+         * of the execution in which it is called.
+         *
+         * @return epoch
+         */
+        public long getCurrentEpochToPropose() {
+            return epoch;
+        }
+    }
+}

Reply via email to