Repository: zookeeper Updated Branches: refs/heads/master acfc471ec -> a680655a3
ZOOKEEPER-1932: Remove deprecated LeaderElection class. The motivation of removing LeaderElection class: * It has been long deprecated and no one uses it. * Tests around it is flaky. Author: Michael Han <[email protected]> Reviewers: Flavio Junqueira <[email protected]>, Allan Lyu <[email protected]>, Mohammad Arshad <[email protected]> Closes #106 from hanm/ZOOKEEPER-1932 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/a680655a Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/a680655a Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/a680655a Branch: refs/heads/master Commit: a680655a3569bfc546712cb85eeaea8c9b7de3ad Parents: acfc471 Author: Michael Han <[email protected]> Authored: Thu May 11 22:59:42 2017 +0800 Committer: Mohammad Arshad <[email protected]> Committed: Thu May 11 22:59:42 2017 +0800 ---------------------------------------------------------------------- .../content/xdocs/zookeeperAdmin.xml | 11 +- .../zookeeper/server/quorum/LeaderElection.java | 293 -------------- .../zookeeper/server/quorum/QuorumPeer.java | 75 ++-- .../server/quorum/QuorumPeerConfig.java | 20 +- .../zookeeper/test/system/BaseSysTest.java | 2 +- .../test/system/QuorumPeerInstance.java | 2 +- src/java/test/config/findbugsExcludeFile.xml | 10 +- .../zookeeper/test/HierarchicalQuorumTest.java | 11 +- .../zookeeper/test/LENonTerminateTest.java | 378 ------------------- .../test/org/apache/zookeeper/test/LETest.java | 140 ------- .../org/apache/zookeeper/test/TruncateTest.java | 11 +- 11 files changed, 51 insertions(+), 902 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a680655a/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml ---------------------------------------------------------------------- diff --git a/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml b/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml index f5902ac..1e581db 100644 --- a/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml +++ b/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml @@ -958,15 +958,14 @@ server.3=zoo3:2888:3888</programlisting> <listitem> <para>(No Java system property)</para> - <para>Election implementation to use. A value of "0" corresponds - to the original UDP-based version, "1" corresponds to the + <para>Election implementation to use. A value of "1" corresponds to the non-authenticated UDP-based version of fast leader election, "2" corresponds to the authenticated UDP-based version of fast leader election, and "3" corresponds to TCP-based version of - fast leader election. Currently, algorithm 3 is the default</para> + fast leader election. Currently, algorithm 3 is the default.</para> <note> - <para> The implementations of leader election 0, 1, and 2 are now + <para> The implementations of leader election 1, and 2 are now <emphasis role="bold"> deprecated </emphasis>. We have the intention of removing them in the next release, at which point only the FastLeaderElection will be available. @@ -1029,9 +1028,7 @@ server.3=zoo3:2888:3888</programlisting> <para>There are two port numbers <emphasis role="bold">nnnnn</emphasis>. The first followers use to connect to the leader, and the second is for - leader election. The leader election port is only necessary if electionAlg - is 1, 2, or 3 (default). If electionAlg is 0, then the second port is not - necessary. If you want to test multiple servers on a single machine, then + leader election. If you want to test multiple servers on a single machine, then different ports can be used for each server.</para> </listitem> </varlistentry> http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a680655a/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java b/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java deleted file mode 100644 index d9e72f1..0000000 --- a/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java +++ /dev/null @@ -1,293 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server.quorum; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Random; -import java.util.Map.Entry; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.zookeeper.jmx.MBeanRegistry; -import org.apache.zookeeper.server.quorum.Vote; -import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; - -/** - * @deprecated This class has been deprecated as of release 3.4.0. - */ -@Deprecated -public class LeaderElection implements Election { - private static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class); - protected static final Random epochGen = new Random(); - - protected QuorumPeer self; - - public LeaderElection(QuorumPeer self) { - this.self = self; - } - - protected static class ElectionResult { - public Vote vote; - - public int count; - - public Vote winner; - - public int winningCount; - - public int numValidVotes; - } - - protected ElectionResult countVotes(HashMap<InetSocketAddress, Vote> votes, HashSet<Long> heardFrom) { - final ElectionResult result = new ElectionResult(); - // Initialize with null vote - result.vote = new Vote(Long.MIN_VALUE, Long.MIN_VALUE); - result.winner = new Vote(Long.MIN_VALUE, Long.MIN_VALUE); - - // First, filter out votes from unheard-from machines. Then - // make the views consistent. Sometimes peers will have - // different zxids for a server depending on timing. - final HashMap<InetSocketAddress, Vote> validVotes = new HashMap<InetSocketAddress, Vote>(); - final Map<Long, Long> maxZxids = new HashMap<Long,Long>(); - for (Map.Entry<InetSocketAddress, Vote> e : votes.entrySet()) { - // Only include votes from machines that we heard from - final Vote v = e.getValue(); - if (heardFrom.contains(v.getId())) { - validVotes.put(e.getKey(), v); - Long val = maxZxids.get(v.getId()); - if (val == null || val < v.getZxid()) { - maxZxids.put(v.getId(), v.getZxid()); - } - } - } - - // Make all zxids for a given vote id equal to the largest zxid seen for - // that id - for (Map.Entry<InetSocketAddress, Vote> e : validVotes.entrySet()) { - final Vote v = e.getValue(); - Long zxid = maxZxids.get(v.getId()); - if (v.getZxid() < zxid) { - // This is safe inside an iterator as per - // http://download.oracle.com/javase/1.5.0/docs/api/java/util/Map.Entry.html - e.setValue(new Vote(v.getId(), zxid, v.getElectionEpoch(), v.getPeerEpoch(), v.getState())); - } - } - - result.numValidVotes = validVotes.size(); - - final HashMap<Vote, Integer> countTable = new HashMap<Vote, Integer>(); - // Now do the tally - for (Vote v : validVotes.values()) { - Integer count = countTable.get(v); - if (count == null) { - count = 0; - } - countTable.put(v, count + 1); - if (v.getId() == result.vote.getId()) { - result.count++; - } else if (v.getZxid() > result.vote.getZxid() - || (v.getZxid() == result.vote.getZxid() && v.getId() > result.vote.getId())) { - result.vote = v; - result.count = 1; - } - } - result.winningCount = 0; - LOG.info("Election tally: "); - for (Entry<Vote, Integer> entry : countTable.entrySet()) { - if (entry.getValue() > result.winningCount) { - result.winningCount = entry.getValue(); - result.winner = entry.getKey(); - } - LOG.info(entry.getKey().getId() + "\t-> " + entry.getValue()); - } - return result; - } - - /** - * There is nothing to shutdown in this implementation of - * leader election, so we simply have an empty method. - */ - public void shutdown(){} - - /** - * Invoked in QuorumPeer to find or elect a new leader. - * - * @throws InterruptedException - */ - public Vote lookForLeader() throws InterruptedException { - try { - self.jmxLeaderElectionBean = new LeaderElectionBean(); - MBeanRegistry.getInstance().register( - self.jmxLeaderElectionBean, self.jmxLocalPeerBean); - } catch (Exception e) { - LOG.warn("Failed to register with JMX", e); - self.jmxLeaderElectionBean = null; - } - - try { - self.setCurrentVote(new Vote(self.getId(), - self.getLastLoggedZxid())); - // We are going to look for a leader by casting a vote for ourself - byte requestBytes[] = new byte[4]; - ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); - byte responseBytes[] = new byte[28]; - ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes); - /* The current vote for the leader. Initially me! */ - DatagramSocket s = null; - try { - s = new DatagramSocket(); - s.setSoTimeout(200); - } catch (SocketException e1) { - LOG.error("Socket exception when creating socket for leader election", e1); - System.exit(4); - } - DatagramPacket requestPacket = new DatagramPacket(requestBytes, - requestBytes.length); - DatagramPacket responsePacket = new DatagramPacket(responseBytes, - responseBytes.length); - int xid = epochGen.nextInt(); - while (self.isRunning()) { - HashMap<InetSocketAddress, Vote> votes = - new HashMap<InetSocketAddress, Vote>(self.getVotingView().size()); - - requestBuffer.clear(); - requestBuffer.putInt(xid); - requestPacket.setLength(4); - HashSet<Long> heardFrom = new HashSet<Long>(); - for (QuorumServer server : self.getVotingView().values()) { - LOG.info("Server address: " + server.addr); - try { - requestPacket.setSocketAddress(server.addr); - } catch (IllegalArgumentException e) { - // Sun doesn't include the address that causes this - // exception to be thrown, so we wrap the exception - // in order to capture this critical detail. - throw new IllegalArgumentException( - "Unable to set socket address on packet, msg:" - + e.getMessage() + " with addr:" + server.addr, - e); - } - - try { - s.send(requestPacket); - responsePacket.setLength(responseBytes.length); - s.receive(responsePacket); - if (responsePacket.getLength() != responseBytes.length) { - LOG.error("Got a short response: " - + responsePacket.getLength()); - continue; - } - responseBuffer.clear(); - int recvedXid = responseBuffer.getInt(); - if (recvedXid != xid) { - LOG.error("Got bad xid: expected " + xid - + " got " + recvedXid); - continue; - } - long peerId = responseBuffer.getLong(); - heardFrom.add(peerId); - //if(server.id != peerId){ - Vote vote = new Vote(responseBuffer.getLong(), - responseBuffer.getLong()); - InetSocketAddress addr = - (InetSocketAddress) responsePacket - .getSocketAddress(); - votes.put(addr, vote); - //} - } catch (IOException e) { - LOG.warn("Ignoring exception while looking for leader", - e); - // Errors are okay, since hosts may be - // down - } - } - - ElectionResult result = countVotes(votes, heardFrom); - // ZOOKEEPER-569: - // If no votes are received for live peers, reset to voting - // for ourselves as otherwise we may hang on to a vote - // for a dead peer - if (result.numValidVotes == 0) { - self.setCurrentVote(new Vote(self.getId(), - self.getLastLoggedZxid())); - } else { - if (result.winner.getId() >= 0) { - self.setCurrentVote(result.vote); - // To do: this doesn't use a quorum verifier - if (result.winningCount > (self.getVotingView().size() / 2)) { - self.setCurrentVote(result.winner); - s.close(); - Vote current = self.getCurrentVote(); - LOG.info("Found leader: my type is: " + self.getLearnerType()); - /* - * We want to make sure we implement the state machine - * correctly. If we are a PARTICIPANT, once a leader - * is elected we can move either to LEADING or - * FOLLOWING. However if we are an OBSERVER, it is an - * error to be elected as a Leader. - */ - if (self.getLearnerType() == LearnerType.OBSERVER) { - if (current.getId() == self.getId()) { - // This should never happen! - LOG.error("OBSERVER elected as leader!"); - Thread.sleep(100); - } - else { - self.setPeerState(ServerState.OBSERVING); - Thread.sleep(100); - return current; - } - } else { - self.setPeerState((current.getId() == self.getId()) - ? ServerState.LEADING: ServerState.FOLLOWING); - if (self.getPeerState() == ServerState.FOLLOWING) { - Thread.sleep(100); - } - return current; - } - } - } - } - Thread.sleep(1000); - } - return null; - } finally { - try { - if(self.jmxLeaderElectionBean != null){ - MBeanRegistry.getInstance().unregister( - self.jmxLeaderElectionBean); - } - } catch (Exception e) { - LOG.warn("Failed to unregister with JMX", e); - } - self.jmxLeaderElectionBean = null; - } - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a680655a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index 38b0299..61ef4ca 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -29,7 +29,6 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketException; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -831,28 +830,16 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider responder.interrupt(); } synchronized public void startLeaderElection() { - try { - if (getPeerState() == ServerState.LOOKING) { - currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); - } - } catch(IOException e) { - RuntimeException re = new RuntimeException(e.getMessage()); - re.setStackTrace(e.getStackTrace()); - throw re; - } - - // if (!getView().containsKey(myid)) { - // throw new RuntimeException("My id " + myid + " not in the peer list"); - //} - if (electionType == 0) { - try { - udpSocket = new DatagramSocket(myQuorumAddr.getPort()); - responder = new ResponderThread(); - responder.start(); - } catch (SocketException e) { - throw new RuntimeException(e); + try { + if (getPeerState() == ServerState.LOOKING) { + currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } + } catch(IOException e) { + RuntimeException re = new RuntimeException(e.getMessage()); + re.setStackTrace(e.getStackTrace()); + throw re; } + this.electionAlg = createElectionAlgorithm(electionType); } @@ -952,29 +939,26 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider //TODO: use a factory rather than a switch switch (electionAlgorithm) { - case 0: - le = new LeaderElection(this); - break; - case 1: - le = new AuthFastLeaderElection(this); - break; - case 2: - le = new AuthFastLeaderElection(this, true); - break; - case 3: - qcm = new QuorumCnxManager(this); - QuorumCnxManager.Listener listener = qcm.listener; - if(listener != null){ - listener.start(); - FastLeaderElection fle = new FastLeaderElection(this, qcm); - fle.start(); - le = fle; - } else { - LOG.error("Null listener when initializing cnx manager"); - } - break; - default: - assert false; + case 1: + le = new AuthFastLeaderElection(this); + break; + case 2: + le = new AuthFastLeaderElection(this, true); + break; + case 3: + qcm = new QuorumCnxManager(this); + QuorumCnxManager.Listener listener = qcm.listener; + if(listener != null){ + listener.start(); + FastLeaderElection fle = new FastLeaderElection(this, qcm); + fle.start(); + le = fle; + } else { + LOG.error("Null listener when initializing cnx manager"); + } + break; + default: + assert false; } return le; } @@ -982,9 +966,6 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider @SuppressWarnings("deprecation") protected Election makeLEStrategy(){ LOG.debug("Initializing leader election protocol..."); - if (getElectionType() == 0) { - electionAlg = new LeaderElection(this); - } return electionAlg; } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a680655a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java index cb8f1c2..aa13f88 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java @@ -255,6 +255,9 @@ public class QuorumPeerConfig { syncLimit = Integer.parseInt(value); } else if (key.equals("electionAlg")) { electionAlg = Integer.parseInt(value); + if (electionAlg != 1 && electionAlg != 2 && electionAlg != 3) { + throw new ConfigException("Invalid electionAlg value. Only 1, 2, 3 are supported."); + } } else if (key.equals("quorumListenOnAllIPs")) { quorumListenOnAllIPs = Boolean.parseBoolean(value); } else if (key.equals("peerType")) { @@ -594,17 +597,12 @@ public class QuorumPeerConfig { LOG.warn("Non-optimial configuration, consider an odd number of servers."); } } - /* - * If using FLE, then every server requires a separate election - * port. - */ - if (eAlg != 0) { - for (QuorumServer s : qv.getVotingMembers().values()) { - if (s.electionAddr == null) - throw new IllegalArgumentException( - "Missing election port for server: " + s.id); - } - } + + for (QuorumServer s : qv.getVotingMembers().values()) { + if (s.electionAddr == null) + throw new IllegalArgumentException( + "Missing election port for server: " + s.id); + } } return qv; } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a680655a/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java ---------------------------------------------------------------------- diff --git a/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java b/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java index 2ed516c..031f5fe 100644 --- a/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java +++ b/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java @@ -179,7 +179,7 @@ public class BaseSysTest { public void startServer(int index) throws IOException { int port = fakeBasePort+10+index; if (fakeMachines) { - qps[index] = new QuorumPeer(peers, qpsDirs[index], qpsDirs[index], port, 0, index+1, tickTime, initLimit, syncLimit); + qps[index] = new QuorumPeer(peers, qpsDirs[index], qpsDirs[index], port, 3, index+1, tickTime, initLimit, syncLimit); qps[index].start(); } else { try { http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a680655a/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java ---------------------------------------------------------------------- diff --git a/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java b/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java index 2231d01..4e56789 100644 --- a/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java +++ b/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java @@ -190,7 +190,7 @@ class QuorumPeerInstance implements Instance { return; } System.err.println("SnapDir = " + snapDir + " LogDir = " + logDir); - peer = new QuorumPeer(peers, snapDir, logDir, clientAddr.getPort(), 0, serverId, tickTime, initLimit, syncLimit); + peer = new QuorumPeer(peers, snapDir, logDir, clientAddr.getPort(), 3, serverId, tickTime, initLimit, syncLimit); peer.start(); for(int i = 0; i < 5; i++) { Thread.sleep(500); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a680655a/src/java/test/config/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/src/java/test/config/findbugsExcludeFile.xml b/src/java/test/config/findbugsExcludeFile.xml index 7a7fa4b..60f2366 100644 --- a/src/java/test/config/findbugsExcludeFile.xml +++ b/src/java/test/config/findbugsExcludeFile.xml @@ -32,15 +32,7 @@ <Bug pattern="REC_CATCH_EXCEPTION" /> </Match> - <!-- If we cannot open a socket to elect a leader, then we should - simply exit --> - <Match> - <Class name="org.apache.zookeeper.server.quorum.LeaderElection" /> - <Method name="lookForLeader" /> - <Bug pattern="DM_EXIT" /> - </Match> - - <!-- Committing out of order is an unrecoverable error, so we should + <!-- Committing out of order is an unrecoverable error, so we should really exit --> <Match> <Class name="org.apache.zookeeper.server.quorum.FollowerZooKeeperServer" /> http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a680655a/src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java b/src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java index 3050093..1d45d2c 100644 --- a/src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java +++ b/src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java @@ -206,16 +206,7 @@ public class HierarchicalQuorumTest extends ClientBase { s5.setLearnerType(QuorumPeer.LearnerType.OBSERVER); } Assert.assertEquals(clientport5, s5.getClientPort()); - - // Observers are currently only compatible with LeaderElection - if (withObservers) { - s1.setElectionType(0); - s2.setElectionType(0); - s3.setElectionType(0); - s4.setElectionType(0); - s5.setElectionType(0); - } - + LOG.info("start QuorumPeer 1"); s1.start(); LOG.info("start QuorumPeer 2"); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a680655a/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java b/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java deleted file mode 100644 index 2bbf7b5..0000000 --- a/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java +++ /dev/null @@ -1,378 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import java.io.File; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.zookeeper.PortAssignment; -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.quorum.Election; -import org.apache.zookeeper.server.quorum.FLELostMessageTest; -import org.apache.zookeeper.server.quorum.LeaderElection; -import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.server.quorum.Vote; -import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; -import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -@SuppressWarnings("deprecation") -public class LENonTerminateTest extends ZKTestCase { - public static class MockLeaderElection extends LeaderElection { - public MockLeaderElection(QuorumPeer self) { - super(self); - } - - /** - * Temporary for 3.3.0 - we want to ensure that a round of voting happens - * before any of the peers update their votes. The easiest way to do that - * is to add a latch that all wait on after counting their votes. - * - * In 3.4.0 we intend to make this class more testable, and therefore - * there should be much less duplicated code. - * - * JMX bean method calls are removed to reduce noise. - */ - public Vote lookForLeader() throws InterruptedException { - self.setCurrentVote(new Vote(self.getId(), - self.getLastLoggedZxid())); - // We are going to look for a leader by casting a vote for ourself - byte requestBytes[] = new byte[4]; - ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); - byte responseBytes[] = new byte[28]; - ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes); - /* The current vote for the leader. Initially me! */ - DatagramSocket s = null; - try { - s = new DatagramSocket(); - s.setSoTimeout(200); - } catch (SocketException e1) { - LOG.error("Socket exception when creating socket for leader election", e1); - System.exit(4); - } - DatagramPacket requestPacket = new DatagramPacket(requestBytes, - requestBytes.length); - DatagramPacket responsePacket = new DatagramPacket(responseBytes, - responseBytes.length); - int xid = epochGen.nextInt(); - while (self.isRunning()) { - HashMap<InetSocketAddress, Vote> votes = - new HashMap<InetSocketAddress, Vote>(self.getVotingView().size()); - - requestBuffer.clear(); - requestBuffer.putInt(xid); - requestPacket.setLength(4); - HashSet<Long> heardFrom = new HashSet<Long>(); - for (QuorumServer server : - self.getVotingView().values()) - { - LOG.info("Server address: " + server.addr); - try { - requestPacket.setSocketAddress(server.addr); - } catch (IllegalArgumentException e) { - // Sun doesn't include the address that causes this - // exception to be thrown, so we wrap the exception - // in order to capture this critical detail. - throw new IllegalArgumentException( - "Unable to set socket address on packet, msg:" - + e.getMessage() + " with addr:" + server.addr, - e); - } - - try { - s.send(requestPacket); - responsePacket.setLength(responseBytes.length); - s.receive(responsePacket); - if (responsePacket.getLength() != responseBytes.length) { - LOG.error("Got a short response: " - + responsePacket.getLength()); - continue; - } - responseBuffer.clear(); - int recvedXid = responseBuffer.getInt(); - if (recvedXid != xid) { - LOG.error("Got bad xid: expected " + xid - + " got " + recvedXid); - continue; - } - long peerId = responseBuffer.getLong(); - heardFrom.add(peerId); - //if(server.id != peerId){ - Vote vote = new Vote(responseBuffer.getLong(), - responseBuffer.getLong()); - InetSocketAddress addr = - (InetSocketAddress) responsePacket - .getSocketAddress(); - votes.put(addr, vote); - //} - } catch (IOException e) { - LOG.warn("Ignoring exception while looking for leader", - e); - // Errors are okay, since hosts may be - // down - } - } - - ElectionResult result = countVotes(votes, heardFrom); - - /** - * This is the only difference from LeaderElection - wait for - * this latch on the first time through this method. This ensures - * that the first round of voting happens before setCurrentVote - * is called below. - */ - LOG.info("Waiting for first round of voting to complete"); - latch.countDown(); - Assert.assertTrue("Thread timed out waiting for latch", - latch.await(10000, TimeUnit.MILLISECONDS)); - - // ZOOKEEPER-569: - // If no votes are received for live peers, reset to voting - // for ourselves as otherwise we may hang on to a vote - // for a dead peer - if (result.numValidVotes == 0) { - self.setCurrentVote(new Vote(self.getId(), - self.getLastLoggedZxid())); - } else { - if (result.winner.getId() >= 0) { - self.setCurrentVote(result.vote); - // To do: this doesn't use a quorum verifier - if (result.winningCount > (self.getVotingView().size() / 2)) { - self.setCurrentVote(result.winner); - s.close(); - Vote current = self.getCurrentVote(); - LOG.info("Found leader: my type is: " + self.getLearnerType()); - /* - * We want to make sure we implement the state machine - * correctly. If we are a PARTICIPANT, once a leader - * is elected we can move either to LEADING or - * FOLLOWING. However if we are an OBSERVER, it is an - * error to be elected as a Leader. - */ - if (self.getLearnerType() == LearnerType.OBSERVER) { - if (current.getId() == self.getId()) { - // This should never happen! - LOG.error("OBSERVER elected as leader!"); - Thread.sleep(100); - } - else { - self.setPeerState(ServerState.OBSERVING); - Thread.sleep(100); - return current; - } - } else { - self.setPeerState((current.getId() == self.getId()) - ? ServerState.LEADING: ServerState.FOLLOWING); - if (self.getPeerState() == ServerState.FOLLOWING) { - Thread.sleep(100); - } - return current; - } - } - } - } - Thread.sleep(1000); - } - return null; - } - } - - public static class MockQuorumPeer extends QuorumPeer { - public MockQuorumPeer(Map<Long,QuorumServer> quorumPeers, File snapDir, - File logDir, int clientPort, int electionAlg, - long myid, int tickTime, int initLimit, int syncLimit) - throws IOException - { - super(quorumPeers, snapDir, logDir, electionAlg, - myid,tickTime, initLimit,syncLimit, false, - ServerCnxnFactory.createFactory(clientPort, -1), - new QuorumMaj(quorumPeers)); - } - - protected Election createElectionAlgorithm(int electionAlgorithm){ - LOG.info("Returning mocked leader election"); - return new MockLeaderElection(this); - } - } - - - protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class); - - int count; - HashMap<Long,QuorumServer> peers; - File tmpdir[]; - int port[]; - - @Before - public void setUp() throws Exception { - count = 3; - - peers = new HashMap<Long,QuorumServer>(count); - tmpdir = new File[count]; - port = new int[count]; - } - - static final CountDownLatch latch = new CountDownLatch(2); - static final CountDownLatch mockLatch = new CountDownLatch(1); - - private static class LEThread extends Thread { - private int i; - private QuorumPeer peer; - - LEThread(QuorumPeer peer, int i) { - this.i = i; - this.peer = peer; - LOG.info("Constructor: " + getName()); - - } - - public void run(){ - try{ - Vote v = null; - peer.setPeerState(ServerState.LOOKING); - LOG.info("Going to call leader election: " + i); - v = peer.getElectionAlg().lookForLeader(); - - if (v == null){ - Assert.fail("Thread " + i + " got a null vote"); - } - - /* - * A real zookeeper would take care of setting the current vote. Here - * we do it manually. - */ - peer.setCurrentVote(v); - - LOG.info("Finished election: " + i + ", " + v.getId()); - } catch (Exception e) { - e.printStackTrace(); - } - LOG.info("Joining"); - } - } - - /** - * This tests ZK-569. - * With three peers A, B and C, the following could happen: - * 1. Round 1, A,B and C all vote for themselves - * 2. Round 2, C dies, A and B vote for C - * 3. Because C has died, votes for it are ignored, but A and B never - * reset their votes. Hence LE never terminates. ZK-569 fixes this by - * resetting votes to themselves if the set of votes for live peers is null. - */ - @Test - public void testNonTermination() throws Exception { - LOG.info("TestNonTermination: " + getTestName()+ ", " + count); - for(int i = 0; i < count; i++) { - int clientport = PortAssignment.unique(); - peers.put(Long.valueOf(i), - new QuorumServer(i, - new InetSocketAddress("127.0.0.1", clientport), - new InetSocketAddress("127.0.0.1", PortAssignment.unique()))); - tmpdir[i] = ClientBase.createTmpDir(); - port[i] = clientport; - } - - /* - * peer1 and peer2 are A and B in the above example. - */ - QuorumPeer peer1 = new MockQuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 0, 0, 2, 2, 2); - peer1.startLeaderElection(); - LEThread thread1 = new LEThread(peer1, 0); - - QuorumPeer peer2 = new MockQuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 0, 1, 2, 2, 2); - peer2.startLeaderElection(); - LEThread thread2 = new LEThread(peer2, 1); - - /* - * Start mock server. - */ - Thread thread3 = new Thread() { - public void run() { - try { - mockServer(); - } catch (Exception e) { - LOG.error("exception", e); - Assert.fail("Exception when running mocked server " + e); - } - } - }; - - thread3.start(); - Assert.assertTrue("mockServer did not start in 5s", - mockLatch.await(5000, TimeUnit.MILLISECONDS)); - thread1.start(); - thread2.start(); - /* - * Occasionally seen false negatives with a 5s timeout. - */ - thread1.join(15000); - thread2.join(15000); - thread3.join(15000); - if (thread1.isAlive() || thread2.isAlive() || thread3.isAlive()) { - Assert.fail("Threads didn't join"); - } - } - - /** - * MockServer plays the role of peer C. Respond to two requests for votes - * with vote for self and then Assert.fail. - */ - void mockServer() throws InterruptedException, IOException { - byte b[] = new byte[36]; - ByteBuffer responseBuffer = ByteBuffer.wrap(b); - DatagramPacket packet = new DatagramPacket(b, b.length); - QuorumServer server = peers.get(Long.valueOf(2)); - DatagramSocket udpSocket = new DatagramSocket(server.addr.getPort()); - LOG.info("In MockServer"); - mockLatch.countDown(); - Vote current = new Vote(2, 1); - for (int i=0;i<2;++i) { - udpSocket.receive(packet); - responseBuffer.rewind(); - LOG.info("Received " + responseBuffer.getInt() + " " + responseBuffer.getLong() + " " + responseBuffer.getLong()); - LOG.info("From " + packet.getSocketAddress()); - responseBuffer.clear(); - responseBuffer.getInt(); // Skip the xid - responseBuffer.putLong(2); - - responseBuffer.putLong(current.getId()); - responseBuffer.putLong(current.getZxid()); - packet.setData(b); - udpSocket.send(packet); - } - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a680655a/src/java/test/org/apache/zookeeper/test/LETest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/test/LETest.java b/src/java/test/org/apache/zookeeper/test/LETest.java deleted file mode 100644 index e03c5c4..0000000 --- a/src/java/test/org/apache/zookeeper/test/LETest.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import java.io.File; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Random; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.zookeeper.PortAssignment; -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.server.quorum.LeaderElection; -import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.server.quorum.Vote; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.junit.Assert; -import org.junit.Test; - -@SuppressWarnings("deprecation") -public class LETest extends ZKTestCase { - private static final Logger LOG = LoggerFactory.getLogger(LETest.class); - volatile Vote votes[]; - volatile boolean leaderDies; - volatile long leader = -1; - Random rand = new Random(); - class LEThread extends Thread { - LeaderElection le; - int i; - QuorumPeer peer; - LEThread(LeaderElection le, QuorumPeer peer, int i) { - this.le = le; - this.i = i; - this.peer = peer; - } - public void run() { - try { - Vote v = null; - while(true) { - v = le.lookForLeader(); - votes[i] = v; - if (v.getId() == i) { - synchronized(LETest.this) { - if (leaderDies) { - leaderDies = false; - peer.stopLeaderElection(); - LOG.info("Leader " + i + " dying"); - leader = -2; - } else { - leader = i; - } - LETest.this.notifyAll(); - } - break; - } - synchronized(LETest.this) { - if (leader == -1) { - LETest.this.wait(); - } - if (leader == v.getId()) { - break; - } - } - Thread.sleep(rand.nextInt(1000)); - peer.setCurrentVote(new Vote(peer.getId(), 0)); - } - LOG.info("Thread " + i + " votes " + v); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - @Test - public void testLE() throws Exception { - int count = 30; - HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>(count); - ArrayList<LEThread> threads = new ArrayList<LEThread>(count); - File tmpdir[] = new File[count]; - int port[] = new int[count]; - votes = new Vote[count]; - for(int i = 0; i < count; i++) { - peers.put(Long.valueOf(i), - new QuorumServer(i, - new InetSocketAddress("127.0.0.1", - PortAssignment.unique()))); - tmpdir[i] = ClientBase.createTmpDir(); - port[i] = PortAssignment.unique(); - } - LeaderElection le[] = new LeaderElection[count]; - leaderDies = true; - boolean allowOneBadLeader = leaderDies; - for(int i = 0; i < le.length; i++) { - QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], - port[i], 0, i, 1000, 2, 2); - peer.startLeaderElection(); - le[i] = new LeaderElection(peer); - LEThread thread = new LEThread(le[i], peer, i); - thread.start(); - threads.add(thread); - } - for(int i = 0; i < threads.size(); i++) { - threads.get(i).join(15000); - if (threads.get(i).isAlive()) { - Assert.fail("Threads didn't join"); - } - } - long id = votes[0].getId(); - for(int i = 1; i < votes.length; i++) { - if (votes[i] == null) { - Assert.fail("Thread " + i + " had a null vote"); - } - if (votes[i].getId() != id) { - if (allowOneBadLeader && votes[i].getId() == i) { - allowOneBadLeader = false; - } else { - Assert.fail("Thread " + i + " got " + votes[i].getId() + " expected " + id); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/a680655a/src/java/test/org/apache/zookeeper/test/TruncateTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/test/TruncateTest.java b/src/java/test/org/apache/zookeeper/test/TruncateTest.java index 9b9fd7a..955eb1e 100644 --- a/src/java/test/org/apache/zookeeper/test/TruncateTest.java +++ b/src/java/test/org/apache/zookeeper/test/TruncateTest.java @@ -66,7 +66,8 @@ public class TruncateTest extends ZKTestCase { ClientBase.recursiveDelete(dataDir2); ClientBase.recursiveDelete(dataDir3); } - + + @Test public void testTruncationStreamReset() throws Exception { File tmpdir = ClientBase.createTmpDir(); @@ -105,7 +106,7 @@ public class TruncateTest extends ZKTestCase { iter.close(); ClientBase.recursiveDelete(tmpdir); } - + @Test public void testTruncationNullLog() throws Exception { File tmpdir = ClientBase.createTmpDir(); @@ -203,9 +204,9 @@ public class TruncateTest extends ZKTestCase { new InetSocketAddress("127.0.0.1", PortAssignment.unique()), new InetSocketAddress("127.0.0.1", port3))); - QuorumPeer s2 = new QuorumPeer(peers, dataDir2, dataDir2, port2, 0, 2, tickTime, initLimit, syncLimit); + QuorumPeer s2 = new QuorumPeer(peers, dataDir2, dataDir2, port2, 3, 2, tickTime, initLimit, syncLimit); s2.start(); - QuorumPeer s3 = new QuorumPeer(peers, dataDir3, dataDir3, port3, 0, 3, tickTime, initLimit, syncLimit); + QuorumPeer s3 = new QuorumPeer(peers, dataDir3, dataDir3, port3, 3, 3, tickTime, initLimit, syncLimit); s3.start(); zk = ClientBase.createZKClient("127.0.0.1:" + port2, 15000); @@ -221,7 +222,7 @@ public class TruncateTest extends ZKTestCase { } catch(KeeperException.NoNodeException e) { // this is what we want } - QuorumPeer s1 = new QuorumPeer(peers, dataDir1, dataDir1, port1, 0, 1, tickTime, initLimit, syncLimit); + QuorumPeer s1 = new QuorumPeer(peers, dataDir1, dataDir1, port1, 3, 1, tickTime, initLimit, syncLimit); s1.start(); ZooKeeper zk1 = ClientBase.createZKClient("127.0.0.1:" + port1, 15000); zk1.getData("/9", false, new Stat());
