http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/EmptiedSnapshotRecoveryTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/EmptiedSnapshotRecoveryTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/EmptiedSnapshotRecoveryTest.java deleted file mode 100644 index 5b2f8a4..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/EmptiedSnapshotRecoveryTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import java.io.IOException; -import java.io.File; -import java.io.PrintWriter; -import java.util.List; -import java.util.LinkedList; - -import org.apache.log4j.Logger; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.PortAssignment; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.server.quorum.Leader.Proposal; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.SyncRequestProcessor; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.junit.Assert; -import org.junit.Test; - -/** If snapshots are corrupted to the empty file or deleted, Zookeeper should - * not proceed to read its transactiong log files - * Test that zxid == -1 in the presence of emptied/deleted snapshots - */ -public class EmptiedSnapshotRecoveryTest extends ZKTestCase implements Watcher { - private static final Logger LOG = Logger.getLogger(RestoreCommittedLogTest.class); - private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique(); - private static final int CONNECTION_TIMEOUT = 3000; - private static final int N_TRANSACTIONS = 150; - private static final int SNAP_COUNT = 100; - - public void runTest(boolean leaveEmptyFile) throws Exception { - File tmpSnapDir = ClientBase.createTmpDir(); - File tmpLogDir = ClientBase.createTmpDir(); - ClientBase.setupTestEnv(); - ZooKeeperServer zks = new ZooKeeperServer(tmpSnapDir, tmpLogDir, 3000); - SyncRequestProcessor.setSnapCount(SNAP_COUNT); - final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); - ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); - f.startup(zks); - Assert.assertTrue("waiting for server being up ", - ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); - ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); - try { - for (int i = 0; i< N_TRANSACTIONS; i++) { - zk.create("/node-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - } finally { - zk.close(); - } - f.shutdown(); - zks.shutdown(); - Assert.assertTrue("waiting for server to shutdown", - ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT)); - - // start server again with intact database - zks = new ZooKeeperServer(tmpSnapDir, tmpLogDir, 3000); - zks.startdata(); - long zxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); - LOG.info("After clean restart, zxid = " + zxid); - Assert.assertTrue("zxid > 0", zxid > 0); - zks.shutdown(); - - // Make all snapshots empty - FileTxnSnapLog txnLogFactory = zks.getTxnLogFactory(); - List<File> snapshots = txnLogFactory.findNRecentSnapshots(10); - Assert.assertTrue("We have a snapshot to corrupt", snapshots.size() > 0); - for (File file: snapshots) { - if (leaveEmptyFile) { - new PrintWriter(file).close (); - } else { - file.delete(); - } - } - - // start server again with corrupted database - zks = new ZooKeeperServer(tmpSnapDir, tmpLogDir, 3000); - try { - zks.startdata(); - zxid = zks.getZKDatabase().loadDataBase(); - Assert.fail("Should have gotten exception for corrupted database"); - } catch (IOException e) { - // expected behavior - } - zks.shutdown(); - } - - /** - * Test resilience to empty Snapshots - * @throws Exception an exception might be thrown here - */ - @Test - public void testRestoreWithEmptySnapFiles() throws Exception { - runTest(true); - } - - /** - * Test resilience to deletion of Snapshots - * @throws Exception an exception might be thrown here - */ - @Test - public void testRestoreWithNoSnapFiles() throws Exception { - runTest(false); - } - - public void process(WatchedEvent event) { - // do nothing - } - -}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/EventTypeTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/EventTypeTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/EventTypeTest.java deleted file mode 100644 index 0c96c83..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/EventTypeTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import java.util.EnumSet; - -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.junit.Assert; -import org.junit.Test; - -public class EventTypeTest extends ZKTestCase { - - @Test - public void testIntConversion() { - // Ensure that we can convert all valid integers to EventTypes - EnumSet<EventType> allTypes = EnumSet.allOf(EventType.class); - - for(EventType et : allTypes) { - Assert.assertEquals(et, EventType.fromInt( et.getIntValue() ) ); - } - } - - @Test - public void testInvalidIntConversion() { - try { - EventType.fromInt(324242); - Assert.fail("Was able to create an invalid EventType via an integer"); - } catch(RuntimeException re) { - // we're good. - } - - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java deleted file mode 100644 index 8bf365f..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java +++ /dev/null @@ -1,193 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import java.io.File; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.concurrent.Semaphore; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.zookeeper.PortAssignment; -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.server.quorum.FastLeaderElection; -import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.server.quorum.Vote; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class FLENewEpochTest extends ZKTestCase { - protected static final Logger LOG = LoggerFactory.getLogger(FLENewEpochTest.class); - - int count; - HashMap<Long,QuorumServer> peers; - ArrayList<LEThread> threads; - File tmpdir[]; - int port[]; - volatile int [] round; - - Semaphore start0; - Semaphore finish3, finish0; - - @Before - public void setUp() throws Exception { - count = 3; - - peers = new HashMap<Long,QuorumServer>(count); - threads = new ArrayList<LEThread>(count); - tmpdir = new File[count]; - port = new int[count]; - - round = new int[3]; - round[0] = 0; - round[1] = 0; - round[2] = 0; - - start0 = new Semaphore(0); - finish0 = new Semaphore(0); - finish3 = new Semaphore(0); - } - - @After - public void tearDown() throws Exception { - for(int i = 0; i < threads.size(); i++) { - ((FastLeaderElection) threads.get(i).peer.getElectionAlg()).shutdown(); - } - } - - - class LEThread extends Thread { - int i; - QuorumPeer peer; - - LEThread(QuorumPeer peer, int i) { - this.i = i; - this.peer = peer; - LOG.info("Constructor: " + getName()); - - } - - public void run(){ - boolean flag = true; - try{ - while(flag){ - Vote v = null; - peer.setPeerState(ServerState.LOOKING); - LOG.info("Going to call leader election again: " + i); - v = peer.getElectionAlg().lookForLeader(); - - if (v == null){ - Assert.fail("Thread " + i + " got a null vote"); - } - - /* - * A real zookeeper would take care of setting the current vote. Here - * we do it manually. - */ - peer.setCurrentVote(v); - - LOG.info("Finished election: " + i + ", " + v.getId()); - //votes[i] = v; - - switch (i) { - case 0: - LOG.info("First peer, do nothing, just join"); - if(finish0.tryAcquire(1000, java.util.concurrent.TimeUnit.MILLISECONDS)){ - //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){ - LOG.info("Setting flag to false"); - flag = false; - } - break; - case 1: - LOG.info("Second entering case"); - if(round[1] != 0){ - finish0.release(); - flag = false; - } else { - finish3.acquire(); - start0.release(); - } - LOG.info("Second is going to start second round"); - round[1]++; - break; - case 2: - LOG.info("Third peer, shutting it down"); - QuorumBase.shutdown(peer); - flag = false; - round[2] = 1; - finish3.release(); - LOG.info("Third leaving"); - break; - } - } - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - - @Test - public void testLENewEpoch() throws Exception { - - LOG.info("TestLE: " + getTestName()+ ", " + count); - for(int i = 0; i < count; i++) { - peers.put(Long.valueOf(i), - new QuorumServer(i, - new InetSocketAddress( - "127.0.0.1", PortAssignment.unique()), - new InetSocketAddress( - "127.0.0.1", PortAssignment.unique()))); - tmpdir[i] = ClientBase.createTmpDir(); - port[i] = PortAssignment.unique(); - } - - for(int i = 1; i < count; i++) { - QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2); - peer.startLeaderElection(); - LEThread thread = new LEThread(peer, i); - thread.start(); - threads.add(thread); - } - if(!start0.tryAcquire(4000, java.util.concurrent.TimeUnit.MILLISECONDS)) - Assert.fail("First leader election failed"); - - QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2); - peer.startLeaderElection(); - LEThread thread = new LEThread(peer, 0); - thread.start(); - threads.add(thread); - - LOG.info("Started threads " + getTestName()); - - for(int i = 0; i < threads.size(); i++) { - threads.get(i).join(10000); - if (threads.get(i).isAlive()) { - Assert.fail("Threads didn't join"); - } - - } - } - } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java deleted file mode 100644 index bc43775..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zookeeper.test; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.HashMap; - -import org.apache.zookeeper.server.quorum.FastLeaderElection; -import org.apache.zookeeper.server.quorum.QuorumCnxManager; -import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.apache.zookeeper.PortAssignment; -import org.apache.zookeeper.ZKTestCase; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.junit.Assert; -import org.junit.Test; - -public class FLEPredicateTest extends ZKTestCase { - - protected static final Logger LOG = LoggerFactory.getLogger(FLEPredicateTest.class); - - class MockFLE extends FastLeaderElection { - MockFLE(QuorumPeer peer){ - super(peer, peer.createCnxnManager()); - } - - boolean predicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch){ - return this.totalOrderPredicate(newId, newZxid, newEpoch, curId, curZxid, curEpoch); - } - } - - - HashMap<Long,QuorumServer> peers; - - @Test - public void testPredicate() throws IOException { - - peers = new HashMap<Long,QuorumServer>(3); - - /* - * Creates list of peers. - */ - for(int i = 0; i < 3; i++) { - peers.put(Long.valueOf(i), - new QuorumServer(i, - new InetSocketAddress( - "127.0.0.1", PortAssignment.unique()), - new InetSocketAddress( - "127.0.0.1", PortAssignment.unique()))); - } - - /* - * Creating peer. - */ - try{ - File tmpDir = ClientBase.createTmpDir(); - QuorumPeer peer = new QuorumPeer(peers, tmpDir, tmpDir, - PortAssignment.unique(), 3, 0, 1000, 2, 2); - - MockFLE mock = new MockFLE(peer); - mock.start(); - - /* - * Lower epoch must return false - */ - - Assert.assertFalse (mock.predicate(4L, 0L, 0L, 3L, 0L, 2L)); - - /* - * Later epoch - */ - Assert.assertTrue (mock.predicate(0L, 0L, 1L, 1L, 0L, 0L)); - - /* - * Higher zxid - */ - Assert.assertTrue(mock.predicate(0L, 1L, 0L, 1L, 0L, 0L)); - - /* - * Higher id - */ - Assert.assertTrue(mock.predicate(1L, 1L, 0L, 0L, 1L, 0L)); - } catch (IOException e) { - LOG.error("Exception while creating quorum peer", e); - Assert.fail("Exception while creating quorum peer"); - } - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLERestartTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLERestartTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLERestartTest.java deleted file mode 100644 index 21e562b..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLERestartTest.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import java.io.File; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.concurrent.Semaphore; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.zookeeper.PortAssignment; -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.server.quorum.FastLeaderElection; -import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.server.quorum.Vote; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class FLERestartTest extends ZKTestCase { - protected static final Logger LOG = LoggerFactory.getLogger(FLETest.class); - - private int count; - private HashMap<Long,QuorumServer> peers; - private ArrayList<FLERestartThread> restartThreads; - private File tmpdir[]; - private int port[]; - private Semaphore finish; - - static class TestVote { - long leader; - - TestVote(int id, long leader) { - this.leader = leader; - } - } - - int countVotes(HashSet<TestVote> hs, long id) { - int counter = 0; - for(TestVote v : hs){ - if(v.leader == id) counter++; - } - - return counter; - } - - @Before - public void setUp() throws Exception { - count = 3; - peers = new HashMap<Long,QuorumServer>(count); - restartThreads = new ArrayList<FLERestartThread>(count); - tmpdir = new File[count]; - port = new int[count]; - finish = new Semaphore(0); - } - - @After - public void tearDown() throws Exception { - for(int i = 0; i < restartThreads.size(); i++) { - ((FastLeaderElection) restartThreads.get(i).peer.getElectionAlg()).shutdown(); - } - } - - class FLERestartThread extends Thread { - int i; - QuorumPeer peer; - int peerRound = 0; - - FLERestartThread(QuorumPeer peer, int i) { - this.i = i; - this.peer = peer; - LOG.info("Constructor: " + getName()); - } - public void run() { - try { - Vote v = null; - while(true) { - peer.setPeerState(ServerState.LOOKING); - LOG.info("Going to call leader election again."); - v = peer.getElectionAlg().lookForLeader(); - if(v == null){ - LOG.info("Thread " + i + " got a null vote"); - break; - } - - /* - * A real zookeeper would take care of setting the current vote. Here - * we do it manually. - */ - peer.setCurrentVote(v); - - LOG.info("Finished election: " + i + ", " + v.getId()); - //votes[i] = v; - - switch(i){ - case 0: - if(peerRound == 0){ - LOG.info("First peer, shutting it down"); - QuorumBase.shutdown(peer); - ((FastLeaderElection) restartThreads.get(i).peer.getElectionAlg()).shutdown(); - - peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2); - peer.startLeaderElection(); - peerRound++; - } else { - finish.release(2); - return; - } - - break; - case 1: - LOG.info("Second entering case"); - finish.acquire(); - //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){ - LOG.info("Release"); - - return; - case 2: - LOG.info("First peer, do nothing, just join"); - finish.acquire(); - //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){ - LOG.info("Release"); - - return; - } - } - } catch (Exception e){ - e.printStackTrace(); - } - } - } - - - @Test - public void testLERestart() throws Exception { - - LOG.info("TestLE: " + getTestName()+ ", " + count); - for(int i = 0; i < count; i++) { - peers.put(Long.valueOf(i), - new QuorumServer(i, - new InetSocketAddress( - "127.0.0.1", PortAssignment.unique()), - new InetSocketAddress( - "127.0.0.1", PortAssignment.unique()))); - tmpdir[i] = ClientBase.createTmpDir(); - port[i] = PortAssignment.unique(); - } - - for(int i = 0; i < count; i++) { - QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2); - peer.startLeaderElection(); - FLERestartThread thread = new FLERestartThread(peer, i); - thread.start(); - restartThreads.add(thread); - } - LOG.info("Started threads " + getTestName()); - for(int i = 0; i < restartThreads.size(); i++) { - restartThreads.get(i).join(10000); - if (restartThreads.get(i).isAlive()) { - Assert.fail("Threads didn't join"); - } - - } - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLETest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLETest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLETest.java deleted file mode 100644 index 51bcecb..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLETest.java +++ /dev/null @@ -1,536 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import java.io.File; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.zookeeper.PortAssignment; -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.server.quorum.FastLeaderElection; -import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.server.quorum.Vote; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class FLETest extends ZKTestCase { - protected static final Logger LOG = LoggerFactory.getLogger(FLETest.class); - private final int MAX_LOOP_COUNTER = 300; - private FLETest.LEThread leThread; - - static class TestVote { - TestVote(int id, long leader) { - this.leader = leader; - } - - long leader; - } - - int countVotes(HashSet<TestVote> hs, long id) { - int counter = 0; - for(TestVote v : hs){ - if(v.leader == id) counter++; - } - - return counter; - } - - int count; - HashMap<Long,QuorumServer> peers; - ArrayList<LEThread> threads; - HashMap<Integer, HashSet<TestVote> > voteMap; - HashMap<Long, LEThread> quora; - File tmpdir[]; - int port[]; - int successCount; - - volatile Vote votes[]; - volatile long leader = -1; - //volatile int round = 1; - Random rand = new Random(); - Set<Long> joinedThreads; - - @Before - public void setUp() throws Exception { - count = 7; - - peers = new HashMap<Long,QuorumServer>(count); - threads = new ArrayList<LEThread>(count); - voteMap = new HashMap<Integer, HashSet<TestVote> >(); - votes = new Vote[count]; - tmpdir = new File[count]; - port = new int[count]; - successCount = 0; - joinedThreads = new HashSet<Long>(); - } - - @After - public void tearDown() throws Exception { - for (int i = 0; i < threads.size(); i++) { - leThread = threads.get(i); - QuorumBase.shutdown(leThread.peer); - } - } - - - /** - * Implements the behavior of a peer during the leader election rounds - * of tests. - */ - class LEThread extends Thread { - FLETest self; - int i; - QuorumPeer peer; - int totalRounds; - ConcurrentHashMap<Long, HashSet<Integer> > quora; - - LEThread(FLETest self, QuorumPeer peer, int i, int rounds, ConcurrentHashMap<Long, HashSet<Integer> > quora) { - this.self = self; - this.i = i; - this.peer = peer; - this.totalRounds = rounds; - this.quora = quora; - - LOG.info("Constructor: " + getName()); - } - - public void run() { - try { - Vote v = null; - while(true) { - - /* - * Set the state of the peer to LOOKING and look for leader - */ - peer.setPeerState(ServerState.LOOKING); - LOG.info("Going to call leader election again."); - v = peer.getElectionAlg().lookForLeader(); - if(v == null){ - LOG.info("Thread " + i + " got a null vote"); - break; - } - - /* - * Done with the election round, so now we set the vote in - * the peer. A real zookeeper would take care of setting the - * current vote. Here we do it manually. - */ - peer.setCurrentVote(v); - - LOG.info("Finished election: " + i + ", " + v.getId()); - votes[i] = v; - - /* - * Get the current value of the logical clock for this peer - * so that we know in which round this peer has executed. - */ - int lc = (int) ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock(); - - /* - * The leader executes the following block, which essentially shuts down - * the peer if it is not the last round. - */ - if (v.getId() == i) { - LOG.info("I'm the leader: " + i); - if (lc < this.totalRounds) { - LOG.info("Leader " + i + " dying"); - FastLeaderElection election = - (FastLeaderElection) peer.getElectionAlg(); - election.shutdown(); - // Make sure the vote is reset to -1 after shutdown. - Assert.assertEquals(-1, election.getVote().getId()); - LOG.info("Leader " + i + " dead"); - - break; - } - } - - /* - * If the peer has done enough rounds, then consider joining. The thread - * will only join if it is part of a quorum supporting the current - * leader. Otherwise it will try again. - */ - if (lc >= this.totalRounds) { - /* - * quora keeps the supporters of a given leader, so - * we first update it with the vote of this peer. - */ - if(quora.get(v.getId()) == null) quora.put(v.getId(), new HashSet<Integer>()); - quora.get(v.getId()).add(i); - - /* - * we now wait until a quorum supports the same leader. - */ - if(waitForQuorum(v.getId())){ - synchronized(self){ - - /* - * Assert that the state of the thread is the one expected. - */ - if(v.getId() == i){ - Assert.assertTrue("Wrong state" + peer.getPeerState(), - peer.getPeerState() == ServerState.LEADING); - leader = i; - } else { - Assert.assertTrue("Wrong state" + peer.getPeerState(), - peer.getPeerState() == ServerState.FOLLOWING); - } - - /* - * Global variable keeping track of - * how many peers have successfully - * joined. - */ - successCount++; - joinedThreads.add((long)i); - self.notify(); - } - - /* - * I'm done so joining. - */ - break; - } else { - quora.get(v.getId()).remove(i); - } - } - - /* - * This sleep time represents the time a follower - * would take to declare the leader dead and start - * a new leader election. - */ - Thread.sleep(100); - - } - LOG.debug("Thread " + i + " votes " + v); - } catch (InterruptedException e) { - Assert.fail(e.toString()); - } - } - - /** - * Auxiliary method to make sure that enough followers terminated. - * - * @return boolean followers successfully joined. - */ - boolean waitForQuorum(long id) - throws InterruptedException { - int loopCounter = 0; - while((quora.get(id).size() <= count/2) && (loopCounter < MAX_LOOP_COUNTER)){ - Thread.sleep(100); - loopCounter++; - } - - if((loopCounter >= MAX_LOOP_COUNTER) && (quora.get(id).size() <= count/2)){ - return false; - } else { - return true; - } - } - - } - - - - @Test - public void testSingleElection() throws Exception { - try{ - runElection(1); - } catch (Exception e) { - Assert.fail(e.toString()); - } - } - - - @Test - public void testDoubleElection() throws Exception { - try{ - runElection(2); - } catch (Exception e) { - Assert.fail(e.toString()); - } - } - - @Test - public void testTripleElection() throws Exception { - try{ - runElection(3); - } catch (Exception e) { - Assert.fail(e.toString()); - } - } - - /** - * Test leader election for a number of rounds. In all rounds but the last one - * we kill the leader. - * - * @param rounds - * @throws Exception - */ - private void runElection(int rounds) throws Exception { - ConcurrentHashMap<Long, HashSet<Integer> > quora = - new ConcurrentHashMap<Long, HashSet<Integer> >(); - - LOG.info("TestLE: " + getTestName()+ ", " + count); - - /* - * Creates list of peers. - */ - for(int i = 0; i < count; i++) { - port[i] = PortAssignment.unique(); - peers.put(Long.valueOf(i), - new QuorumServer(i, - new InetSocketAddress( - "127.0.0.1", PortAssignment.unique()), - new InetSocketAddress( - "127.0.0.1", PortAssignment.unique()), - new InetSocketAddress( - "127.0.0.1", port[i]))); - tmpdir[i] = ClientBase.createTmpDir(); - } - - /* - * Start one LEThread for each peer we want to run. - */ - for(int i = 0; i < count; i++) { - QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], - port[i], 3, i, 1000, 2, 2); - peer.startLeaderElection(); - LEThread thread = new LEThread(this, peer, i, rounds, quora); - thread.start(); - threads.add(thread); - } - LOG.info("Started threads " + getTestName()); - - int waitCounter = 0; - synchronized(this){ - while(((successCount <= count/2) || (leader == -1)) - && (waitCounter < MAX_LOOP_COUNTER)) - { - this.wait(200); - waitCounter++; - } - } - LOG.info("Success count: " + successCount); - - /* - * Lists what threads haven't joined. A thread doesn't join if - * it hasn't decided upon a leader yet. It can happen that a - * peer is slow or disconnected, and it can take longer to - * nominate and connect to the current leader. - */ - for (int i = 0; i < threads.size(); i++) { - if (threads.get(i).isAlive()) { - LOG.info("Threads didn't join: " + i); - } - } - - /* - * If we have a majority, then we are good to go. - */ - if(successCount <= count/2){ - Assert.fail("Fewer than a a majority has joined"); - } - - /* - * I'm done so joining. - */ - if(!joinedThreads.contains(leader)){ - Assert.fail("Leader hasn't joined: " + leader); - } - } - - - /* - * Class to verify of the thread has become a follower - */ - static class VerifyState extends Thread { - volatile private boolean success = false; - private QuorumPeer peer; - public VerifyState(QuorumPeer peer) { - this.peer = peer; - } - public void run() { - setName("VerifyState-" + peer.getId()); - while (true) { - if(peer.getPeerState() == ServerState.FOLLOWING) { - LOG.info("I am following"); - success = true; - break; - } else if (peer.getPeerState() == ServerState.LEADING) { - LOG.info("I am leading"); - success = false; - break; - } - try { - Thread.sleep(250); - } catch (Exception e) { - LOG.warn("Sleep failed ", e); - } - } - } - public boolean isSuccess() { - return success; - } - } - - /* - * For ZOOKEEPER-975 verify that a peer joining an established cluster - * does not go in LEADING state. - */ - @Test - public void testJoin() throws Exception { - int sid; - QuorumPeer peer; - int waitTime = 10 * 1000; - ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>(); - for(sid = 0; sid < 3; sid++) { - port[sid] = PortAssignment.unique(); - peers.put(Long.valueOf(sid), - new QuorumServer(sid, - new InetSocketAddress( - "127.0.0.1", PortAssignment.unique()), - new InetSocketAddress( - "127.0.0.1", PortAssignment.unique()), - new InetSocketAddress( - "127.0.0.1", port[sid]))); - tmpdir[sid] = ClientBase.createTmpDir(); - } - // start 2 peers and verify if they form the cluster - for (sid = 0; sid < 2; sid++) { - peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid], - port[sid], 3, sid, 2000, 2, 2); - LOG.info("Starting peer " + peer.getId()); - peer.start(); - peerList.add(sid, peer); - } - peer = peerList.get(0); - VerifyState v1 = new VerifyState(peerList.get(0)); - v1.start(); - v1.join(waitTime); - Assert.assertFalse("Unable to form cluster in " + - waitTime + " ms", - !v1.isSuccess()); - // Start 3rd peer and check if it goes in LEADING state - peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid], - port[sid], 3, sid, 2000, 2, 2); - LOG.info("Starting peer " + peer.getId()); - peer.start(); - peerList.add(sid, peer); - v1 = new VerifyState(peer); - v1.start(); - v1.join(waitTime); - if (v1.isAlive()) { - Assert.fail("Peer " + peer.getId() + " failed to join the cluster " + - "within " + waitTime + " ms"); - } else if (!v1.isSuccess()) { - Assert.fail("Incorrect LEADING state for peer " + peer.getId()); - } - // cleanup - for (int id = 0; id < 3; id++) { - peer = peerList.get(id); - if (peer != null) { - peer.shutdown(); - } - } - } - - /* - * For ZOOKEEPER-1732 verify that it is possible to join an ensemble with - * inconsistent election round information. - */ - @Test - public void testJoinInconsistentEnsemble() throws Exception { - int sid; - QuorumPeer peer; - int waitTime = 10 * 1000; - ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>(); - for(sid = 0; sid < 3; sid++) { - peers.put(Long.valueOf(sid), - new QuorumServer(sid, - new InetSocketAddress( - "127.0.0.1", PortAssignment.unique()), - new InetSocketAddress( - "127.0.0.1", PortAssignment.unique()))); - tmpdir[sid] = ClientBase.createTmpDir(); - port[sid] = PortAssignment.unique(); - } - // start 2 peers and verify if they form the cluster - for (sid = 0; sid < 2; sid++) { - peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid], - port[sid], 3, sid, 2000, 2, 2); - LOG.info("Starting peer " + peer.getId()); - peer.start(); - peerList.add(sid, peer); - } - peer = peerList.get(0); - VerifyState v1 = new VerifyState(peerList.get(0)); - v1.start(); - v1.join(waitTime); - Assert.assertFalse("Unable to form cluster in " + - waitTime + " ms", - !v1.isSuccess()); - // Change the election round for one of the members of the ensemble - long leaderSid = peer.getCurrentVote().getId(); - long zxid = peer.getCurrentVote().getZxid(); - long electionEpoch = peer.getCurrentVote().getElectionEpoch(); - ServerState state = peer.getCurrentVote().getState(); - long peerEpoch = peer.getCurrentVote().getPeerEpoch(); - Vote newVote = new Vote(leaderSid, zxid+100, electionEpoch+100, peerEpoch, state); - peer.setCurrentVote(newVote); - // Start 3rd peer and check if it joins the quorum - peer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], - port[2], 3, 2, 2000, 2, 2); - LOG.info("Starting peer " + peer.getId()); - peer.start(); - peerList.add(sid, peer); - v1 = new VerifyState(peer); - v1.start(); - v1.join(waitTime); - if (v1.isAlive()) { - Assert.fail("Peer " + peer.getId() + " failed to join the cluster " + - "within " + waitTime + " ms"); - } - // cleanup - for (int id = 0; id < 3; id++) { - peer = peerList.get(id); - if (peer != null) { - peer.shutdown(); - } - } - } - - @Test - public void testElectionTimeUnit() throws Exception { - Assert.assertEquals("MS", QuorumPeer.FLE_TIME_UNIT); - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java deleted file mode 100644 index 1ff089c..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Properties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.zookeeper.PortAssignment; -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.server.quorum.FastLeaderElection; -import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.server.quorum.Vote; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; -import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class FLEZeroWeightTest extends ZKTestCase { - private static final Logger LOG = LoggerFactory.getLogger(HierarchicalQuorumTest.class); - - Properties qp; - - int count; - HashMap<Long,QuorumServer> peers; - ArrayList<LEThread> threads; - File tmpdir[]; - int port[]; - - volatile Vote votes[]; - - @Before - public void setUp() throws Exception { - count = 9; - - peers = new HashMap<Long,QuorumServer>(count); - threads = new ArrayList<LEThread>(count); - votes = new Vote[count]; - tmpdir = new File[count]; - port = new int[count]; - - String config = "group.1=0:1:2\n" + - "group.2=3:4:5\n" + - "group.3=6:7:8\n" + - "weight.0=1\n" + - "weight.1=1\n" + - "weight.2=1\n" + - "weight.3=0\n" + - "weight.4=0\n" + - "weight.5=0\n" + - "weight.6=0\n" + - "weight.7=0\n" + - "weight.8=0"; - - ByteArrayInputStream is = new ByteArrayInputStream(config.getBytes()); - this.qp = new Properties(); - qp.load(is); - } - - @After - public void tearDown() throws Exception { - for(int i = 0; i < threads.size(); i++) { - LEThread leThread = threads.get(i); - // shutdown() has to be explicitly called for every thread to - // make sure that resources are freed properly and all fixed network ports - // are available for other test cases - QuorumBase.shutdown(leThread.peer); - } - } - - class LEThread extends Thread { - int i; - QuorumPeer peer; - boolean fail; - - LEThread(QuorumPeer peer, int i) { - this.i = i; - this.peer = peer; - LOG.info("Constructor: " + getName()); - } - - public void run() { - try { - Vote v = null; - fail = false; - while(true){ - - //while(true) { - peer.setPeerState(ServerState.LOOKING); - LOG.info("Going to call leader election."); - v = peer.getElectionAlg().lookForLeader(); - if(v == null){ - LOG.info("Thread " + i + " got a null vote"); - return; - } - - /* - * A real zookeeper would take care of setting the current vote. Here - * we do it manually. - */ - peer.setCurrentVote(v); - - LOG.info("Finished election: " + i + ", " + v.getId()); - votes[i] = v; - - if((peer.getPeerState() == ServerState.LEADING) && - (peer.getId() > 2)) fail = true; - - if((peer.getPeerState() == ServerState.FOLLOWING) || - (peer.getPeerState() == ServerState.LEADING)) break; - } - LOG.debug("Thread " + i + " votes " + v); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - @Test - public void testZeroWeightQuorum() throws Exception { - LOG.info("TestZeroWeightQuorum: " + getTestName()+ ", " + count); - for(int i = 0; i < count; i++) { - InetSocketAddress addr1 = new InetSocketAddress("127.0.0.1",PortAssignment.unique()); - InetSocketAddress addr2 = new InetSocketAddress("127.0.0.1",PortAssignment.unique()); - InetSocketAddress addr3 = new InetSocketAddress("127.0.0.1",PortAssignment.unique()); - port[i] = addr3.getPort(); - qp.setProperty("server."+i, "127.0.0.1:"+addr1.getPort()+":"+addr2.getPort()+";"+port[i]); - peers.put(Long.valueOf(i), new QuorumServer(i, addr1, addr2, addr3)); - tmpdir[i] = ClientBase.createTmpDir(); - } - - for(int i = 0; i < count; i++) { - QuorumHierarchical hq = new QuorumHierarchical(qp); - QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, hq); - peer.startLeaderElection(); - LEThread thread = new LEThread(peer, i); - thread.start(); - threads.add(thread); - } - LOG.info("Started threads " + getTestName()); - - for(int i = 0; i < threads.size(); i++) { - threads.get(i).join(15000); - if (threads.get(i).isAlive()) { - Assert.fail("Threads didn't join"); - } else { - if(threads.get(i).fail) - Assert.fail("Elected zero-weight server"); - } - } - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java deleted file mode 100644 index 5086711..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java +++ /dev/null @@ -1,748 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.TestableZooKeeper; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.server.ZKDatabase; -import org.apache.zookeeper.server.quorum.Leader; -import org.apache.zookeeper.test.ClientBase.CountdownWatcher; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class FollowerResyncConcurrencyTest extends ZKTestCase { - private static final Logger LOG = LoggerFactory.getLogger(FollowerResyncConcurrencyTest.class); - public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT; - - private AtomicInteger counter = new AtomicInteger(0); - private AtomicInteger errors = new AtomicInteger(0); - /** - * Keep track of pending async operations, we shouldn't start verifying - * the state until pending operation is 0 - */ - private AtomicInteger pending = new AtomicInteger(0); - - @Before - public void setUp() throws Exception { - pending.set(0); - errors.set(0); - counter.set(0); - } - - @After - public void tearDown() throws Exception { - LOG.info("Error count {}" , errors.get()); - } - - /** - * See ZOOKEEPER-1319 - verify that a lagging follwer resyncs correctly - * - * 1) start with down quorum - * 2) start leader/follower1, add some data - * 3) restart leader/follower1 - * 4) start follower2 - * 5) verify data consistency across the ensemble - * - * @throws Exception - */ - @Test - public void testLaggingFollowerResyncsUnderNewEpoch() throws Exception { - CountdownWatcher watcher1 = new CountdownWatcher(); - CountdownWatcher watcher2 = new CountdownWatcher(); - CountdownWatcher watcher3 = new CountdownWatcher(); - - QuorumUtil qu = new QuorumUtil(1); - qu.shutdownAll(); - - qu.start(1); - qu.start(2); - Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" - + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT)); - Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" - + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT)); - - ZooKeeper zk1 = - createClient(qu.getPeer(1).peer.getClientPort(), watcher1); - LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId())); - - final String resyncPath = "/resyncundernewepoch"; - zk1.create(resyncPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk1.close(); - - qu.shutdown(1); - qu.shutdown(2); - Assert.assertTrue("Waiting for server down", ClientBase.waitForServerDown("127.0.0.1:" - + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT)); - Assert.assertTrue("Waiting for server down", ClientBase.waitForServerDown("127.0.0.1:" - + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT)); - - qu.start(1); - qu.start(2); - Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" - + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT)); - Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" - + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT)); - - qu.start(3); - Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" - + qu.getPeer(3).clientPort, ClientBase.CONNECTION_TIMEOUT)); - - zk1 = createClient(qu.getPeer(1).peer.getClientPort(), watcher1); - LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId())); - - assertNotNull("zk1 has data", zk1.exists(resyncPath, false)); - - final ZooKeeper zk2 = - createClient(qu.getPeer(2).peer.getClientPort(), watcher2); - LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId())); - - assertNotNull("zk2 has data", zk2.exists(resyncPath, false)); - - final ZooKeeper zk3 = - createClient(qu.getPeer(3).peer.getClientPort(), watcher3); - LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId())); - - assertNotNull("zk3 has data", zk3.exists(resyncPath, false)); - - zk1.close(); - zk2.close(); - zk3.close(); - - qu.shutdownAll(); - } - - /** - * See ZOOKEEPER-962. This tests for one of the bugs hit while fixing this, - * setting the ZXID of the SNAP packet - * Starts up 3 ZKs. Shut down F1, write a node, restart the one that was shut down - * The non-leader ZKs are writing to cluster - * Shut down F1 again - * Restart after sessions are expired, expect to get a snap file - * Shut down, run some transactions through. - * Restart to a diff while transactions are running in leader - * @throws IOException - * @throws InterruptedException - * @throws KeeperException - */ - @Test - public void testResyncBySnapThenDiffAfterFollowerCrashes() - throws IOException, InterruptedException, KeeperException, Throwable - { - followerResyncCrashTest(false); - } - - /** - * Same as testResyncBySnapThenDiffAfterFollowerCrashes() but we resync - * follower using txnlog - * - * @throws IOException - * @throws InterruptedException - * @throws KeeperException - */ - @Test - public void testResyncByTxnlogThenDiffAfterFollowerCrashes() - throws IOException, InterruptedException, KeeperException, Throwable - { - followerResyncCrashTest(true); - } - - public void followerResyncCrashTest(boolean useTxnLogResync) - throws IOException, InterruptedException, KeeperException, Throwable - { - final Semaphore sem = new Semaphore(0); - - QuorumUtil qu = new QuorumUtil(1); - qu.startAll(); - CountdownWatcher watcher1 = new CountdownWatcher(); - CountdownWatcher watcher2 = new CountdownWatcher(); - CountdownWatcher watcher3 = new CountdownWatcher(); - - int index = 1; - while(qu.getPeer(index).peer.leader == null) { - index++; - } - - Leader leader = qu.getPeer(index).peer.leader; - assertNotNull(leader); - - if (useTxnLogResync) { - // Set the factor to high value so that this test case always - // resync using txnlog - qu.getPeer(index).peer.getActiveServer().getZKDatabase() - .setSnapshotSizeFactor(1000); - } else { - // Disable sending DIFF using txnlog, so that this test still - // testing the ZOOKEEPER-962 bug - qu.getPeer(index).peer.getActiveServer().getZKDatabase() - .setSnapshotSizeFactor(-1); - } - - /* Reusing the index variable to select a follower to connect to */ - index = (index == 1) ? 2 : 1; - LOG.info("Connecting to follower: {}", index); - - qu.shutdown(index); - - final ZooKeeper zk3 = - createClient(qu.getPeer(3).peer.getClientPort(), watcher3); - LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId())); - - zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); - - qu.restart(index); - - final ZooKeeper zk1 = - createClient(qu.getPeer(index).peer.getClientPort(), watcher1); - LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId())); - - final ZooKeeper zk2 = - createClient(qu.getPeer(index).peer.getClientPort(), watcher2); - LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId())); - - zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - // Prepare a thread that will create znodes. - Thread mytestfooThread = new Thread(new Runnable() { - @Override - public void run() { - for(int i = 0; i < 3000; i++) { - // Here we create 3000 znodes - zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { - - @Override - public void processResult(int rc, String path, Object ctx, String name) { - pending.decrementAndGet(); - counter.incrementAndGet(); - if (rc != 0) { - errors.incrementAndGet(); - } - if(counter.get() == 16200){ - sem.release(); - } - } - }, null); - pending.incrementAndGet(); - if(i%10==0){ - try { - Thread.sleep(100); - } catch (Exception e) { - - } - } - } - - } - }); - - // Here we start populating the server and shutdown the follower after - // initial data is written. - for(int i = 0; i < 13000; i++) { - // Here we create 13000 znodes - zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { - - @Override - public void processResult(int rc, String path, Object ctx, String name) { - pending.decrementAndGet(); - counter.incrementAndGet(); - if (rc != 0) { - errors.incrementAndGet(); - } - if(counter.get() == 16200){ - sem.release(); - } - } - }, null); - pending.incrementAndGet(); - - if(i == 5000){ - qu.shutdown(index); - LOG.info("Shutting down s1"); - } - if(i == 12000){ - // Start the prepared thread so that it is writing znodes while - // the follower is restarting. On the first restart, the follow - // should use txnlog to catchup. For subsequent restart, the - // follower should use a diff to catchup. - mytestfooThread.start(); - LOG.info("Restarting follower: {}", index); - qu.restart(index); - Thread.sleep(300); - LOG.info("Shutdown follower: {}", index); - qu.shutdown(index); - Thread.sleep(300); - LOG.info("Restarting follower: {}", index); - qu.restart(index); - LOG.info("Setting up server: {}", index); - } - if((i % 1000) == 0){ - Thread.sleep(1000); - } - - if(i%50 == 0) { - zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, String name) { - pending.decrementAndGet(); - counter.incrementAndGet(); - if (rc != 0) { - errors.incrementAndGet(); - } - if(counter.get() == 16200){ - sem.release(); - } - } - }, null); - pending.incrementAndGet(); - } - } - - // Wait until all updates return - if(!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) { - LOG.warn("Did not aquire semaphore fast enough"); - } - mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT); - if (mytestfooThread.isAlive()) { - LOG.error("mytestfooThread is still alive"); - } - assertTrue(waitForPendingRequests(60)); - assertTrue(waitForSync(qu, index, 10)); - - verifyState(qu, index, leader); - - zk1.close(); - zk2.close(); - zk3.close(); - - qu.shutdownAll(); - } - - /** - * This test: - * Starts up 3 ZKs. The non-leader ZKs are writing to cluster - * Shut down one of the non-leader ZKs. - * Restart after sessions have expired but <500 txns have taken place (get a diff) - * Shut down immediately after restarting, start running separate thread with other transactions - * Restart to a diff while transactions are running in leader - * - * - * Before fixes for ZOOKEEPER-962, restarting off of diff could get an inconsistent view of data missing transactions that - * completed during diff syncing. Follower would also be considered "restarted" before all forwarded transactions - * were completely processed, so restarting would cause a snap file with a too-high zxid to be written, and transactions - * would be missed - * - * This test should pretty reliably catch the failure of restarting the server before all diff messages have been processed, - * however, due to the transient nature of the system it may not catch failures due to concurrent processing of transactions - * during the leader's diff forwarding. - * - * @throws IOException - * @throws InterruptedException - * @throws KeeperException - * @throws Throwable - */ - - @Test - public void testResyncByDiffAfterFollowerCrashes() - throws IOException, InterruptedException, KeeperException, Throwable - { - final Semaphore sem = new Semaphore(0); - - QuorumUtil qu = new QuorumUtil(1); - qu.startAll(); - CountdownWatcher watcher1 = new CountdownWatcher(); - CountdownWatcher watcher2 = new CountdownWatcher(); - CountdownWatcher watcher3 = new CountdownWatcher(); - - int index = 1; - while(qu.getPeer(index).peer.leader == null) { - index++; - } - - Leader leader = qu.getPeer(index).peer.leader; - assertNotNull(leader); - - /* Reusing the index variable to select a follower to connect to */ - index = (index == 1) ? 2 : 1; - LOG.info("Connecting to follower: {}", index); - - final ZooKeeper zk1 = - createClient(qu.getPeer(index).peer.getClientPort(), watcher1); - LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId())); - - final ZooKeeper zk2 = - createClient(qu.getPeer(index).peer.getClientPort(), watcher2); - LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId())); - - final ZooKeeper zk3 = - createClient(qu.getPeer(3).peer.getClientPort(), watcher3); - LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId())); - - zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); - - final AtomicBoolean runNow = new AtomicBoolean(false); - Thread mytestfooThread = new Thread(new Runnable() { - - @Override - public void run() { - int inSyncCounter = 0; - while(inSyncCounter < 400) { - if(runNow.get()) { - zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { - - @Override - public void processResult(int rc, String path, Object ctx, String name) { - pending.decrementAndGet(); - counter.incrementAndGet(); - if (rc != 0) { - errors.incrementAndGet();; - } - if(counter.get() > 7300){ - sem.release(); - } - } - }, null); - pending.incrementAndGet(); - try { - Thread.sleep(10); - } catch (Exception e) { - } - inSyncCounter++; - } else { - Thread.yield(); - } - } - - } - }); - - mytestfooThread.start(); - for(int i = 0; i < 5000; i++) { - zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { - - @Override - public void processResult(int rc, String path, Object ctx, String name) { - pending.decrementAndGet(); - counter.incrementAndGet(); - if (rc != 0) { - errors.incrementAndGet();; - } - if(counter.get() > 7300){ - sem.release(); - } - } - }, null); - pending.incrementAndGet(); - if(i == 1000){ - qu.shutdown(index); - Thread.sleep(1100); - LOG.info("Shutting down s1"); - } - if(i == 1100 || i == 1150 || i == 1200) { - Thread.sleep(1000); - } - - if(i == 1200){ - qu.startThenShutdown(index); - runNow.set(true); - qu.restart(index); - LOG.info("Setting up server: {}", index); - } - - if(i>=1000 && i%2== 0) { - zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { - - @Override - public void processResult(int rc, String path, Object ctx, String name) { - pending.decrementAndGet(); - counter.incrementAndGet(); - if (rc != 0) { - errors.incrementAndGet(); - } - if(counter.get() > 7300){ - sem.release(); - } - } - }, null); - pending.incrementAndGet(); - } - if(i == 1050 || i == 1100 || i == 1150) { - Thread.sleep(1000); - } - } - - // Wait until all updates return - if(!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) { - LOG.warn("Did not aquire semaphore fast enough"); - } - mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT); - if (mytestfooThread.isAlive()) { - LOG.error("mytestfooThread is still alive"); - } - - assertTrue(waitForPendingRequests(60)); - assertTrue(waitForSync(qu, index, 10)); - // Verify that server is following and has the same epoch as the leader - - verifyState(qu, index, leader); - - zk1.close(); - zk2.close(); - zk3.close(); - - qu.shutdownAll(); - } - - private static DisconnectableZooKeeper createClient(int port, - CountdownWatcher watcher) - throws IOException, TimeoutException, InterruptedException - { - DisconnectableZooKeeper zk = new DisconnectableZooKeeper( - "127.0.0.1:" + port, ClientBase.CONNECTION_TIMEOUT, watcher); - - watcher.waitForConnected(CONNECTION_TIMEOUT); - return zk; - } - - /** - * Wait for all async operation to return. So we know that we can start - * verifying the state - */ - private boolean waitForPendingRequests(int timeout) throws InterruptedException { - LOG.info("Wait for pending requests: {}", pending.get()); - for (int i = 0; i < timeout; ++i) { - Thread.sleep(1000); - if (pending.get() == 0) { - return true; - } - } - LOG.info("Timeout waiting for pending requests: {}", pending.get()); - return false; - } - - /** - * Wait for all server to have the same lastProccessedZxid. Timeout in seconds - */ - private boolean waitForSync(QuorumUtil qu, int index, int timeout) throws InterruptedException{ - LOG.info("Wait for server to sync"); - int leaderIndex = (index == 1) ? 2 : 1; - ZKDatabase restartedDb = qu.getPeer(index).peer.getActiveServer().getZKDatabase(); - ZKDatabase cleanDb = qu.getPeer(3).peer.getActiveServer().getZKDatabase(); - ZKDatabase leadDb = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase(); - long leadZxid = 0; - long cleanZxid = 0; - long restartedZxid = 0; - for (int i = 0; i < timeout; ++i) { - leadZxid = leadDb.getDataTreeLastProcessedZxid(); - cleanZxid = cleanDb.getDataTreeLastProcessedZxid(); - restartedZxid = restartedDb.getDataTreeLastProcessedZxid(); - if (leadZxid == cleanZxid && leadZxid == restartedZxid) { - return true; - } - Thread.sleep(1000); - } - LOG.info("Timeout waiting for zxid to sync: leader 0x{}" + - "clean 0x{}" + - "restarted 0x{}", Long.toHexString(leadZxid), Long.toHexString(cleanZxid), - Long.toHexString(restartedZxid)); - return false; - } - - private static TestableZooKeeper createTestableClient(String hp) - throws IOException, TimeoutException, InterruptedException - { - CountdownWatcher watcher = new CountdownWatcher(); - return createTestableClient(watcher, hp); - } - - private static TestableZooKeeper createTestableClient( - CountdownWatcher watcher, String hp) - throws IOException, TimeoutException, InterruptedException - { - TestableZooKeeper zk = new TestableZooKeeper( - hp, ClientBase.CONNECTION_TIMEOUT, watcher); - - watcher.waitForConnected(CONNECTION_TIMEOUT); - return zk; - } - - private void verifyState(QuorumUtil qu, int index, Leader leader) { - LOG.info("Verifying state"); - assertTrue("Not following", qu.getPeer(index).peer.follower != null); - long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L); - long epochL = (leader.getEpoch() >> 32L); - assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid() + - "Current epoch: " + epochF, epochF == epochL); - int leaderIndex = (index == 1) ? 2 : 1; - Collection<Long> sessionsRestarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase().getSessions(); - Collection<Long> sessionsNotRestarted = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase().getSessions(); - - for(Long l : sessionsRestarted) { - assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l)); - } - assertEquals("Should have same number of sessions", sessionsNotRestarted.size(), sessionsRestarted.size()); - ZKDatabase restarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase(); - ZKDatabase clean = qu.getPeer(3).peer.getActiveServer().getZKDatabase(); - ZKDatabase lead = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase(); - for(Long l : sessionsRestarted) { - LOG.info("Validating ephemeral for session id 0x{}", Long.toHexString(l)); - assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l)); - Set<String> ephemerals = restarted.getEphemerals(l); - Set<String> cleanEphemerals = clean.getEphemerals(l); - for(String o : cleanEphemerals) { - if(!ephemerals.contains(o)) { - LOG.info("Restarted follower doesn't contain ephemeral {} zxid 0x{}", - o, Long.toHexString(clean.getDataTree().getNode(o).stat.getMzxid())); - } - } - for(String o : ephemerals) { - if(!cleanEphemerals.contains(o)) { - LOG.info("Restarted follower has extra ephemeral {} zxid 0x{}", - o, Long.toHexString(restarted.getDataTree().getNode(o).stat.getMzxid())); - } - } - Set<String> leadEphemerals = lead.getEphemerals(l); - for(String o : leadEphemerals) { - if(!cleanEphemerals.contains(o)) { - LOG.info("Follower doesn't contain ephemeral from leader {} zxid 0x{}", - o, Long.toHexString(lead.getDataTree().getNode(o).stat.getMzxid())); - } - } - for(String o : cleanEphemerals) { - if(!leadEphemerals.contains(o)) { - LOG.info("Leader doesn't contain ephemeral from follower {} zxid 0x{}", - o, Long.toHexString(clean.getDataTree().getNode(o).stat.getMzxid())); - } - } - assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size()); - assertEquals("Leader should equal follower", lead.getEphemerals(l).size(), cleanEphemerals.size()); - } - } - - /** - * Verify that the server is sending the proper zxid. See ZOOKEEPER-1412. - */ - @Test - public void testFollowerSendsLastZxid() throws Exception { - QuorumUtil qu = new QuorumUtil(1); - qu.startAll(); - - int index = 1; - while(qu.getPeer(index).peer.follower == null) { - index++; - } - LOG.info("Connecting to follower: {}", index); - - TestableZooKeeper zk = - createTestableClient("localhost:" + qu.getPeer(index).peer.getClientPort()); - - assertEquals(0L, zk.testableLastZxid()); - zk.exists("/", false); - long lzxid = zk.testableLastZxid(); - assertTrue("lzxid:" + lzxid + " > 0", lzxid > 0); - zk.close(); - qu.shutdownAll(); - } - - private class MyWatcher extends CountdownWatcher { - LinkedBlockingQueue<WatchedEvent> events = - new LinkedBlockingQueue<WatchedEvent>(); - - public void process(WatchedEvent event) { - super.process(event); - if (event.getType() != Event.EventType.None) { - try { - events.put(event); - } catch (InterruptedException e) { - LOG.warn("ignoring interrupt during event.put"); - } - } - } - } - - /** - * Verify that the server is sending the proper zxid, and as a result - * the watch doesn't fire. See ZOOKEEPER-1412. - */ - @Test - public void testFollowerWatcherResync() throws Exception { - QuorumUtil qu = new QuorumUtil(1); - qu.startAll(); - - int index = 1; - while(qu.getPeer(index).peer.follower == null) { - index++; - } - LOG.info("Connecting to follower: {}", index); - - TestableZooKeeper zk1 = createTestableClient( - "localhost:" + qu.getPeer(index).peer.getClientPort()); - zk1.create("/foo", "foo".getBytes(), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - - MyWatcher watcher = new MyWatcher(); - TestableZooKeeper zk2 = createTestableClient(watcher, - "localhost:" + qu.getPeer(index).peer.getClientPort()); - - zk2.exists("/foo", true); - - watcher.reset(); - zk2.testableConnloss(); - if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) - { - fail("Unable to connect to server"); - } - assertArrayEquals("foo".getBytes(), zk2.getData("/foo", false, null)); - - assertNull(watcher.events.poll(5, TimeUnit.SECONDS)); - - zk1.close(); - zk2.close(); - qu.shutdownAll(); - } - -}