Repository: zookeeper Updated Branches: refs/heads/branch-3.4 6ce1a6cc3 -> a09a67971
ZOOKEEPER-2959: ignore accepted epoch and LEADERINFO ack from observers Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/a09a6797 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/a09a6797 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/a09a6797 Branch: refs/heads/branch-3.4 Commit: a09a67971d7aaa31617988a188501b855e3e59f2 Parents: 6ce1a6c Author: Alexander Shraer <ashr...@apple.com> Authored: Wed May 9 20:56:16 2018 -0700 Committer: Alexander Shraer <ashr...@apple.com> Committed: Wed May 9 20:56:16 2018 -0700 ---------------------------------------------------------------------- .../apache/zookeeper/server/quorum/Leader.java | 34 ++-- .../zookeeper/server/quorum/LearnerHandler.java | 2 +- .../quorum/flexible/QuorumHierarchical.java | 4 +- .../server/quorum/flexible/QuorumMaj.java | 4 +- .../server/quorum/flexible/QuorumVerifier.java | 6 +- .../server/quorum/LeaderWithObserverTest.java | 177 +++++++++++++++++++ .../zookeeper/server/quorum/Zab1_0Test.java | 129 +------------- .../zookeeper/server/quorum/ZabUtils.java | 142 +++++++++++++++ 8 files changed, 356 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/main/org/apache/zookeeper/server/quorum/Leader.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java index a9fd8d0..7013cac 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java @@ -79,8 +79,8 @@ public class Leader { final LeaderZooKeeperServer zk; final QuorumPeer self; - - private boolean quorumFormed = false; + // VisibleForTesting + protected boolean quorumFormed = false; // the follower acceptor thread LearnerCnxAcceptor cnxAcceptor; @@ -411,7 +411,7 @@ public class Leader { // us. We do this by waiting for the NEWLEADER packet to get // acknowledged try { - waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT); + waitForNewLeaderAck(self.getId(), zk.getZxid()); } catch (InterruptedException e) { shutdown("Waiting for a quorum of followers, only synced with sids: [ " + getSidSetString(newLeaderProposal.ackSet) + " ]"); @@ -868,8 +868,8 @@ public class Leader { return lastProposed; } - - private HashSet<Long> connectingFollowers = new HashSet<Long>(); + // VisibleForTesting + protected Set<Long> connectingFollowers = new HashSet<Long>(); public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException { synchronized(connectingFollowers) { if (!waitingForNewEpoch) { @@ -878,7 +878,9 @@ public class Leader { if (lastAcceptedEpoch >= epoch) { epoch = lastAcceptedEpoch+1; } - connectingFollowers.add(sid); + if (isParticipant(sid)) { + connectingFollowers.add(sid); + } QuorumVerifier verifier = self.getQuorumVerifier(); if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) { @@ -900,9 +902,10 @@ public class Leader { return epoch; } } - - private HashSet<Long> electingFollowers = new HashSet<Long>(); - private boolean electionFinished = false; + // VisibleForTesting + protected Set<Long> electingFollowers = new HashSet<Long>(); + // VisibleForTesting + protected boolean electionFinished = false; public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException { synchronized(electingFollowers) { if (electionFinished) { @@ -916,7 +919,9 @@ public class Leader { + leaderStateSummary.getLastZxid() + " (last zxid)"); } - electingFollowers.add(id); + if (isParticipant(id)) { + electingFollowers.add(id); + } } QuorumVerifier verifier = self.getQuorumVerifier(); if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) { @@ -981,10 +986,9 @@ public class Leader { * sufficient acks. * * @param sid - * @param learnerType * @throws InterruptedException */ - public void waitForNewLeaderAck(long sid, long zxid, LearnerType learnerType) + public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException { synchronized (newLeaderProposal.ackSet) { @@ -1002,7 +1006,7 @@ public class Leader { return; } - if (learnerType == LearnerType.PARTICIPANT) { + if (isParticipant(sid)) { newLeaderProposal.ackSet.add(sid); } @@ -1075,4 +1079,8 @@ public class Leader { private boolean isRunning() { return self.isRunning() && zk.isRunning(); } + + private boolean isParticipant(long sid) { + return self.getVotingView().containsKey(sid); + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java index 884cc63..973950d 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -533,7 +533,7 @@ public class LearnerHandler extends ZooKeeperThread { return; } LOG.info("Received NEWLEADER-ACK message from " + getSid()); - leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType()); + leader.waitForNewLeaderAck(getSid(), qp.getZxid()); syncLimitCheck.start(); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java b/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java index 428391e..9993f91 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java @@ -23,10 +23,10 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileReader; import java.io.IOException; -import java.util.HashSet; import java.util.HashMap; import java.util.Properties; import java.util.Map.Entry; +import java.util.Set; import org.slf4j.Logger; @@ -232,7 +232,7 @@ public class QuorumHierarchical implements QuorumVerifier { /** * Verifies if a given set is a quorum. */ - public boolean containsQuorum(HashSet<Long> set){ + public boolean containsQuorum(Set<Long> set){ HashMap<Long, Long> expansion = new HashMap<Long, Long>(); /* http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java b/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java index 04773d7..8f9b573 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java @@ -18,7 +18,7 @@ package org.apache.zookeeper.server.quorum.flexible; -import java.util.HashSet; +import java.util.Set; //import org.apache.zookeeper.server.quorum.QuorumCnxManager; @@ -56,7 +56,7 @@ public class QuorumMaj implements QuorumVerifier { /** * Verifies if a set is a majority. */ - public boolean containsQuorum(HashSet<Long> set){ + public boolean containsQuorum(Set<Long> set){ return (set.size() > half); } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java b/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java index 9840365..6649129 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java @@ -18,16 +18,16 @@ package org.apache.zookeeper.server.quorum.flexible; -import java.util.HashSet; +import java.util.Set; /** * All quorum validators have to implement a method called - * containsQuorum, which verifies if a HashSet of server + * containsQuorum, which verifies if a Set of server * identifiers constitutes a quorum. * */ public interface QuorumVerifier { long getWeight(long id); - boolean containsQuorum(HashSet<Long> set); + boolean containsQuorum(Set<Long> set); } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java new file mode 100644 index 0000000..0f6a098 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.test.ClientBase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; + +import static org.apache.zookeeper.server.quorum.ZabUtils.createLeader; +import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer; + +public class LeaderWithObserverTest { + + QuorumPeer peer; + Leader leader; + File tmpDir; + long participantId; + long observerId; + + @Before + public void setUp() throws Exception { + tmpDir = ClientBase.createTmpDir(); + peer = createQuorumPeer(tmpDir); + participantId = 1; + observerId = peer.quorumPeers.size(); + leader = createLeader(tmpDir, peer); + peer.leader = leader; + peer.quorumPeers.put(observerId, new QuorumPeer.QuorumServer(observerId, "127.0.0.1", PortAssignment.unique(), + 0, QuorumPeer.LearnerType.OBSERVER)); + + // these tests are serial, we can speed up InterruptedException + peer.tickTime = 1; + } + + @After + public void tearDown(){ + leader.shutdown("end of test"); + tmpDir.delete(); + } + + @Test + public void testGetEpochToPropose() throws Exception { + long lastAcceptedEpoch = 5; + peer.setAcceptedEpoch(5); + + Assert.assertEquals("Unexpected vote in connectingFollowers", 0, leader.connectingFollowers.size()); + Assert.assertTrue(leader.waitingForNewEpoch); + try { + // Leader asks for epoch (mocking Leader.lead behavior) + // First add to connectingFollowers + leader.getEpochToPropose(peer.getId(), lastAcceptedEpoch); + } catch (InterruptedException e) { + // ignore timeout + } + + Assert.assertEquals("Unexpected vote in connectingFollowers", 1, leader.connectingFollowers.size()); + Assert.assertEquals("Leader shouldn't set new epoch until quorum of participants is in connectingFollowers", + lastAcceptedEpoch, peer.getAcceptedEpoch()); + Assert.assertTrue(leader.waitingForNewEpoch); + try { + // Observer asks for epoch (mocking LearnerHandler behavior) + leader.getEpochToPropose(observerId, lastAcceptedEpoch); + } catch (InterruptedException e) { + // ignore timeout + } + + Assert.assertEquals("Unexpected vote in connectingFollowers", 1, leader.connectingFollowers.size()); + Assert.assertEquals("Leader shouldn't set new epoch after observer asks for epoch", + lastAcceptedEpoch, peer.getAcceptedEpoch()); + Assert.assertTrue(leader.waitingForNewEpoch); + try { + // Now participant asks for epoch (mocking LearnerHandler behavior). Second add to connectingFollowers. + // Triggers verifier.containsQuorum = true + leader.getEpochToPropose(participantId, lastAcceptedEpoch); + } catch (Exception e) { + Assert.fail("Timed out in getEpochToPropose"); + } + + Assert.assertEquals("Unexpected vote in connectingFollowers", 2, leader.connectingFollowers.size()); + Assert.assertEquals("Leader should record next epoch", lastAcceptedEpoch + 1, peer.getAcceptedEpoch()); + Assert.assertFalse(leader.waitingForNewEpoch); + } + + @Test + public void testWaitForEpochAck() throws Exception { + // things needed for waitForEpochAck to run (usually in leader.lead(), but we're not running leader here) + leader.readyToStart = true; + leader.leaderStateSummary = new StateSummary(leader.self.getCurrentEpoch(), leader.zk.getLastProcessedZxid()); + + Assert.assertEquals("Unexpected vote in electingFollowers", 0, leader.electingFollowers.size()); + Assert.assertFalse(leader.electionFinished); + try { + // leader calls waitForEpochAck, first add to electingFollowers + leader.waitForEpochAck(peer.getId(), new StateSummary(0, 0)); + } catch (InterruptedException e) { + // ignore timeout + } + + Assert.assertEquals("Unexpected vote in electingFollowers", 1, leader.electingFollowers.size()); + Assert.assertFalse(leader.electionFinished); + try { + // observer calls waitForEpochAck, should fail verifier.containsQuorum + leader.waitForEpochAck(observerId, new StateSummary(0, 0)); + } catch (InterruptedException e) { + // ignore timeout + } + + Assert.assertEquals("Unexpected vote in electingFollowers", 1, leader.electingFollowers.size()); + Assert.assertFalse(leader.electionFinished); + try { + // second add to electingFollowers, verifier.containsQuorum=true, waitForEpochAck returns without exceptions + leader.waitForEpochAck(participantId, new StateSummary(0, 0)); + Assert.assertEquals("Unexpected vote in electingFollowers", 2, leader.electingFollowers.size()); + Assert.assertTrue(leader.electionFinished); + } catch (Exception e) { + Assert.fail("Timed out in waitForEpochAck"); + } + } + + @Test + public void testWaitForNewLeaderAck() throws Exception { + long zxid = leader.zk.getZxid(); + + // things needed for waitForNewLeaderAck to run (usually in leader.lead(), but we're not running leader here) + leader.newLeaderProposal.packet = new QuorumPacket(0, zxid,null, null); + + Assert.assertEquals("Unexpected vote in ackSet", 0, leader.newLeaderProposal.ackSet.size()); + Assert.assertFalse(leader.quorumFormed); + try { + // leader calls waitForNewLeaderAck, first add to ackSet + leader.waitForNewLeaderAck(peer.getId(), zxid); + } catch (InterruptedException e) { + // ignore timeout + } + + Assert.assertEquals("Unexpected vote in ackSet", 1, leader.newLeaderProposal.ackSet.size()); + Assert.assertFalse(leader.quorumFormed); + try { + // observer calls waitForNewLeaderAck, should fail verifier.containsQuorum + leader.waitForNewLeaderAck(observerId, zxid); + } catch (InterruptedException e) { + // ignore timeout + } + + Assert.assertEquals("Unexpected vote in ackSet", 1, leader.newLeaderProposal.ackSet.size()); + Assert.assertFalse(leader.quorumFormed); + try { + // second add to ackSet, verifier.containsQuorum=true, waitForNewLeaderAck returns without exceptions + leader.waitForNewLeaderAck(participantId, zxid); + Assert.assertEquals("Unexpected vote in ackSet", 2, leader.newLeaderProposal.ackSet.size()); + Assert.assertTrue(leader.quorumFormed); + } catch (Exception e) { + Assert.fail("Timed out in waitForEpochAck"); + } + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 3ed6097..4f831e8 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -18,6 +18,11 @@ package org.apache.zookeeper.server.quorum; +import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer; +import static org.apache.zookeeper.server.quorum.ZabUtils.createMockLeader; +import static org.apache.zookeeper.server.quorum.ZabUtils.MockLeader; +import static org.apache.zookeeper.server.quorum.ZabUtils.createLeader; + import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -27,47 +32,34 @@ import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; import java.io.EOFException; -import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.InputArchive; import org.apache.jute.OutputArchive; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.ByteBufferInputStream; import org.apache.zookeeper.server.ByteBufferOutputStream; import org.apache.zookeeper.server.Request; -import org.apache.zookeeper.server.ServerCnxn; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.SyncRequestProcessor; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServer.DataTreeBuilder; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.persistence.Util; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.CreateSessionTxn; import org.apache.zookeeper.txn.CreateTxn; @@ -80,8 +72,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Zab1_0Test { - private static final int SYNC_LIMIT = 2; - private static final Logger LOG = LoggerFactory.getLogger(Zab1_0Test.class); private static final File testData = new File( @@ -106,25 +96,6 @@ public class Zab1_0Test { } } } - - private static final class MockLeader extends Leader { - - MockLeader(QuorumPeer qp, LeaderZooKeeperServer zk) - throws IOException { - super(qp, zk); - } - - /** - * This method returns the value of the variable that holds the epoch - * to be proposed and that has been proposed, depending on the point - * of the execution in which it is called. - * - * @return epoch - */ - public long getCurrentEpochToPropose() { - return epoch; - } - } public static final class FollowerMockThread extends Thread { private final Leader leader; @@ -283,42 +254,6 @@ public class Zab1_0Test { } } - private static final class NullServerCnxnFactory extends ServerCnxnFactory { - public void startup(ZooKeeperServer zkServer) throws IOException, - InterruptedException { - } - public void start() { - } - public void shutdown() { - } - public void setMaxClientCnxnsPerHost(int max) { - } - public void join() throws InterruptedException { - } - public int getMaxClientCnxnsPerHost() { - return 0; - } - public int getLocalPort() { - return 0; - } - public InetSocketAddress getLocalAddress() { - return null; - } - public Iterable<ServerCnxn> getConnections() { - return null; - } - public void configure(InetSocketAddress addr, int maxClientCnxns) - throws IOException { - } - public void closeSession(long sessionId) { - } - public void closeAll() { - } - @Override - public int getNumAliveConnections() { - return 0; - } - } static Socket[] getSocketPair() throws IOException { ServerSocket ss = new ServerSocket(); ss.bind(null); @@ -988,7 +923,7 @@ public class Zab1_0Test { LOG.info("Proposal sent."); - for (int i = 0; i < (2 * SYNC_LIMIT) + 2; i++) { + for (int i = 0; i < (2 * ZabUtils.SYNC_LIMIT) + 2; i++) { try { ia.readRecord(qp, null); LOG.info("Ping received: " + i); @@ -1229,7 +1164,7 @@ public class Zab1_0Test { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException { - /* we test a normal run. everything should work out well. */ + /* we test a normal run. everything should work out well. */ LearnerInfo li = new LearnerInfo(1, 0x10000); byte liBytes[] = new byte[12]; ByteBufferOutputStream.record2ByteBuffer(li, @@ -1245,7 +1180,7 @@ public class Zab1_0Test { Thread.sleep(l.self.getInitLimit()*l.self.getTickTime() + 5000); // The leader didn't get a quorum of acks - make sure that leader's current epoch is not advanced - Assert.assertEquals(0, l.self.getCurrentEpoch()); + Assert.assertEquals(0, l.self.getCurrentEpoch()); } }); } @@ -1343,30 +1278,6 @@ public class Zab1_0Test { } } - private Leader createLeader(File tmpDir, QuorumPeer peer) - throws IOException, NoSuchFieldException, IllegalAccessException{ - LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer); - return new Leader(peer, zk); - } - - private Leader createMockLeader(File tmpDir, QuorumPeer peer) - throws IOException, NoSuchFieldException, IllegalAccessException{ - LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer); - return new MockLeader(peer, zk); - } - - private LeaderZooKeeperServer prepareLeader(File tmpDir, QuorumPeer peer) - throws IOException, NoSuchFieldException, IllegalAccessException { - FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir); - peer.setTxnFactory(logFactory); - Field addrField = peer.getClass().getDeclaredField("myQuorumAddr"); - addrField.setAccessible(true); - addrField.set(peer, new InetSocketAddress(PortAssignment.unique())); - ZKDatabase zkDb = new ZKDatabase(logFactory); - LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, new ZooKeeperServer.BasicDataTreeBuilder(), zkDb); - return zk; - } - static class ConversableFollower extends Follower { ConversableFollower(QuorumPeer self, FollowerZooKeeperServer zk) { @@ -1420,30 +1331,6 @@ public class Zab1_0Test { peer.setZKDatabase(zkDb); return new ConversableObserver(peer, zk); } - - - private QuorumPeer createQuorumPeer(File tmpDir) throws IOException, - FileNotFoundException { - QuorumPeer peer = QuorumPeer.testingQuorumPeer(); - peer.syncLimit = SYNC_LIMIT; - peer.initLimit = 2; - peer.tickTime = 2000; - peer.quorumPeers = new HashMap<Long, QuorumServer>(); - peer.quorumPeers.put(1L, new QuorumServer(0, "0.0.0.0", 33221, 0, null)); - peer.quorumPeers.put(1L, new QuorumServer(1, "0.0.0.0", 33223, 0, null)); - peer.setQuorumVerifier(new QuorumMaj(3)); - peer.setCnxnFactory(new NullServerCnxnFactory()); - File version2 = new File(tmpDir, "version-2"); - version2.mkdir(); - FileOutputStream fos; - fos = new FileOutputStream(new File(version2, "currentEpoch")); - fos.write("0\n".getBytes()); - fos.close(); - fos = new FileOutputStream(new File(version2, "acceptedEpoch")); - fos.write("0\n".getBytes()); - fos.close(); - return peer; - } private String readContentsOfFile(File f) throws IOException { return new BufferedReader(new FileReader(f)).readLine(); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a09a6797/src/java/test/org/apache/zookeeper/server/quorum/ZabUtils.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/quorum/ZabUtils.java b/src/java/test/org/apache/zookeeper/server/quorum/ZabUtils.java new file mode 100644 index 0000000..a84a332 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/quorum/ZabUtils.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.HashMap; + +public class ZabUtils { + + private ZabUtils() {} + + public static final int SYNC_LIMIT = 2; + + public static QuorumPeer createQuorumPeer(File tmpDir) throws IOException{ + QuorumPeer peer = new QuorumPeer(); + peer.syncLimit = 2; + peer.initLimit = 2; + peer.tickTime = 2000; + peer.quorumPeers = new HashMap<Long, QuorumPeer.QuorumServer>(); + peer.quorumPeers.put(0L, new QuorumPeer.QuorumServer(0, "127.0.0.1", PortAssignment.unique(), 0, null)); + peer.quorumPeers.put(1L, new QuorumPeer.QuorumServer(1, "127.0.0.1", PortAssignment.unique(), 0, null)); + peer.quorumPeers.put(2L, new QuorumPeer.QuorumServer(2, "127.0.0.1", PortAssignment.unique(), 0, null)); + peer.setQuorumVerifier(new QuorumMaj(peer.quorumPeers.size())); + peer.setCnxnFactory(new NullServerCnxnFactory()); + File version2 = new File(tmpDir, "version-2"); + version2.mkdir(); + FileOutputStream fos = new FileOutputStream(new File(version2, "currentEpoch")); + fos.write("0\n".getBytes()); + fos.close(); + fos = new FileOutputStream(new File(version2, "acceptedEpoch")); + fos.write("0\n".getBytes()); + fos.close(); + return peer; + } + + public static Leader createLeader(File tmpDir, QuorumPeer peer) + throws IOException, NoSuchFieldException, IllegalAccessException{ + LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer); + return new Leader(peer, zk); + } + + public static MockLeader createMockLeader(File tmpDir, QuorumPeer peer) + throws IOException, NoSuchFieldException, IllegalAccessException{ + LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer); + return new MockLeader(peer, zk); + } + + private static LeaderZooKeeperServer prepareLeader(File tmpDir, QuorumPeer peer) + throws IOException, NoSuchFieldException, IllegalAccessException { + FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir); + peer.setTxnFactory(logFactory); + Field addrField = peer.getClass().getDeclaredField("myQuorumAddr"); + addrField.setAccessible(true); + addrField.set(peer, new InetSocketAddress(PortAssignment.unique())); + ZKDatabase zkDb = new ZKDatabase(logFactory); + return new LeaderZooKeeperServer(logFactory, peer, new ZooKeeperServer.BasicDataTreeBuilder(), zkDb); + } + + private static final class NullServerCnxnFactory extends ServerCnxnFactory { + public void startup(ZooKeeperServer zkServer) throws IOException, + InterruptedException { + } + public void start() { + } + public void shutdown() { + } + public void setMaxClientCnxnsPerHost(int max) { + } + public void join() throws InterruptedException { + } + public int getMaxClientCnxnsPerHost() { + return 0; + } + public int getLocalPort() { + return 0; + } + public InetSocketAddress getLocalAddress() { + return null; + } + public Iterable<ServerCnxn> getConnections() { + return null; + } + public void configure(InetSocketAddress addr, int maxClientCnxns) + throws IOException { + } + public void closeSession(long sessionId) { + } + public void closeAll() { + } + @Override + public int getNumAliveConnections() { + return 0; + } + } + + public static final class MockLeader extends Leader { + + MockLeader(QuorumPeer qp, LeaderZooKeeperServer zk) + throws IOException { + super(qp, zk); + } + + /** + * This method returns the value of the variable that holds the epoch + * to be proposed and that has been proposed, depending on the point + * of the execution in which it is called. + * + * @return epoch + */ + public long getCurrentEpochToPropose() { + return epoch; + } + } +}