http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/LENonTerminateTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LENonTerminateTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/LENonTerminateTest.java deleted file mode 100644 index 2bbf7b5..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LENonTerminateTest.java +++ /dev/null @@ -1,378 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import java.io.File; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.zookeeper.PortAssignment; -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.quorum.Election; -import org.apache.zookeeper.server.quorum.FLELostMessageTest; -import org.apache.zookeeper.server.quorum.LeaderElection; -import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.server.quorum.Vote; -import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; -import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -@SuppressWarnings("deprecation") -public class LENonTerminateTest extends ZKTestCase { - public static class MockLeaderElection extends LeaderElection { - public MockLeaderElection(QuorumPeer self) { - super(self); - } - - /** - * Temporary for 3.3.0 - we want to ensure that a round of voting happens - * before any of the peers update their votes. The easiest way to do that - * is to add a latch that all wait on after counting their votes. - * - * In 3.4.0 we intend to make this class more testable, and therefore - * there should be much less duplicated code. - * - * JMX bean method calls are removed to reduce noise. - */ - public Vote lookForLeader() throws InterruptedException { - self.setCurrentVote(new Vote(self.getId(), - self.getLastLoggedZxid())); - // We are going to look for a leader by casting a vote for ourself - byte requestBytes[] = new byte[4]; - ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); - byte responseBytes[] = new byte[28]; - ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes); - /* The current vote for the leader. Initially me! */ - DatagramSocket s = null; - try { - s = new DatagramSocket(); - s.setSoTimeout(200); - } catch (SocketException e1) { - LOG.error("Socket exception when creating socket for leader election", e1); - System.exit(4); - } - DatagramPacket requestPacket = new DatagramPacket(requestBytes, - requestBytes.length); - DatagramPacket responsePacket = new DatagramPacket(responseBytes, - responseBytes.length); - int xid = epochGen.nextInt(); - while (self.isRunning()) { - HashMap<InetSocketAddress, Vote> votes = - new HashMap<InetSocketAddress, Vote>(self.getVotingView().size()); - - requestBuffer.clear(); - requestBuffer.putInt(xid); - requestPacket.setLength(4); - HashSet<Long> heardFrom = new HashSet<Long>(); - for (QuorumServer server : - self.getVotingView().values()) - { - LOG.info("Server address: " + server.addr); - try { - requestPacket.setSocketAddress(server.addr); - } catch (IllegalArgumentException e) { - // Sun doesn't include the address that causes this - // exception to be thrown, so we wrap the exception - // in order to capture this critical detail. - throw new IllegalArgumentException( - "Unable to set socket address on packet, msg:" - + e.getMessage() + " with addr:" + server.addr, - e); - } - - try { - s.send(requestPacket); - responsePacket.setLength(responseBytes.length); - s.receive(responsePacket); - if (responsePacket.getLength() != responseBytes.length) { - LOG.error("Got a short response: " - + responsePacket.getLength()); - continue; - } - responseBuffer.clear(); - int recvedXid = responseBuffer.getInt(); - if (recvedXid != xid) { - LOG.error("Got bad xid: expected " + xid - + " got " + recvedXid); - continue; - } - long peerId = responseBuffer.getLong(); - heardFrom.add(peerId); - //if(server.id != peerId){ - Vote vote = new Vote(responseBuffer.getLong(), - responseBuffer.getLong()); - InetSocketAddress addr = - (InetSocketAddress) responsePacket - .getSocketAddress(); - votes.put(addr, vote); - //} - } catch (IOException e) { - LOG.warn("Ignoring exception while looking for leader", - e); - // Errors are okay, since hosts may be - // down - } - } - - ElectionResult result = countVotes(votes, heardFrom); - - /** - * This is the only difference from LeaderElection - wait for - * this latch on the first time through this method. This ensures - * that the first round of voting happens before setCurrentVote - * is called below. - */ - LOG.info("Waiting for first round of voting to complete"); - latch.countDown(); - Assert.assertTrue("Thread timed out waiting for latch", - latch.await(10000, TimeUnit.MILLISECONDS)); - - // ZOOKEEPER-569: - // If no votes are received for live peers, reset to voting - // for ourselves as otherwise we may hang on to a vote - // for a dead peer - if (result.numValidVotes == 0) { - self.setCurrentVote(new Vote(self.getId(), - self.getLastLoggedZxid())); - } else { - if (result.winner.getId() >= 0) { - self.setCurrentVote(result.vote); - // To do: this doesn't use a quorum verifier - if (result.winningCount > (self.getVotingView().size() / 2)) { - self.setCurrentVote(result.winner); - s.close(); - Vote current = self.getCurrentVote(); - LOG.info("Found leader: my type is: " + self.getLearnerType()); - /* - * We want to make sure we implement the state machine - * correctly. If we are a PARTICIPANT, once a leader - * is elected we can move either to LEADING or - * FOLLOWING. However if we are an OBSERVER, it is an - * error to be elected as a Leader. - */ - if (self.getLearnerType() == LearnerType.OBSERVER) { - if (current.getId() == self.getId()) { - // This should never happen! - LOG.error("OBSERVER elected as leader!"); - Thread.sleep(100); - } - else { - self.setPeerState(ServerState.OBSERVING); - Thread.sleep(100); - return current; - } - } else { - self.setPeerState((current.getId() == self.getId()) - ? ServerState.LEADING: ServerState.FOLLOWING); - if (self.getPeerState() == ServerState.FOLLOWING) { - Thread.sleep(100); - } - return current; - } - } - } - } - Thread.sleep(1000); - } - return null; - } - } - - public static class MockQuorumPeer extends QuorumPeer { - public MockQuorumPeer(Map<Long,QuorumServer> quorumPeers, File snapDir, - File logDir, int clientPort, int electionAlg, - long myid, int tickTime, int initLimit, int syncLimit) - throws IOException - { - super(quorumPeers, snapDir, logDir, electionAlg, - myid,tickTime, initLimit,syncLimit, false, - ServerCnxnFactory.createFactory(clientPort, -1), - new QuorumMaj(quorumPeers)); - } - - protected Election createElectionAlgorithm(int electionAlgorithm){ - LOG.info("Returning mocked leader election"); - return new MockLeaderElection(this); - } - } - - - protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class); - - int count; - HashMap<Long,QuorumServer> peers; - File tmpdir[]; - int port[]; - - @Before - public void setUp() throws Exception { - count = 3; - - peers = new HashMap<Long,QuorumServer>(count); - tmpdir = new File[count]; - port = new int[count]; - } - - static final CountDownLatch latch = new CountDownLatch(2); - static final CountDownLatch mockLatch = new CountDownLatch(1); - - private static class LEThread extends Thread { - private int i; - private QuorumPeer peer; - - LEThread(QuorumPeer peer, int i) { - this.i = i; - this.peer = peer; - LOG.info("Constructor: " + getName()); - - } - - public void run(){ - try{ - Vote v = null; - peer.setPeerState(ServerState.LOOKING); - LOG.info("Going to call leader election: " + i); - v = peer.getElectionAlg().lookForLeader(); - - if (v == null){ - Assert.fail("Thread " + i + " got a null vote"); - } - - /* - * A real zookeeper would take care of setting the current vote. Here - * we do it manually. - */ - peer.setCurrentVote(v); - - LOG.info("Finished election: " + i + ", " + v.getId()); - } catch (Exception e) { - e.printStackTrace(); - } - LOG.info("Joining"); - } - } - - /** - * This tests ZK-569. - * With three peers A, B and C, the following could happen: - * 1. Round 1, A,B and C all vote for themselves - * 2. Round 2, C dies, A and B vote for C - * 3. Because C has died, votes for it are ignored, but A and B never - * reset their votes. Hence LE never terminates. ZK-569 fixes this by - * resetting votes to themselves if the set of votes for live peers is null. - */ - @Test - public void testNonTermination() throws Exception { - LOG.info("TestNonTermination: " + getTestName()+ ", " + count); - for(int i = 0; i < count; i++) { - int clientport = PortAssignment.unique(); - peers.put(Long.valueOf(i), - new QuorumServer(i, - new InetSocketAddress("127.0.0.1", clientport), - new InetSocketAddress("127.0.0.1", PortAssignment.unique()))); - tmpdir[i] = ClientBase.createTmpDir(); - port[i] = clientport; - } - - /* - * peer1 and peer2 are A and B in the above example. - */ - QuorumPeer peer1 = new MockQuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 0, 0, 2, 2, 2); - peer1.startLeaderElection(); - LEThread thread1 = new LEThread(peer1, 0); - - QuorumPeer peer2 = new MockQuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 0, 1, 2, 2, 2); - peer2.startLeaderElection(); - LEThread thread2 = new LEThread(peer2, 1); - - /* - * Start mock server. - */ - Thread thread3 = new Thread() { - public void run() { - try { - mockServer(); - } catch (Exception e) { - LOG.error("exception", e); - Assert.fail("Exception when running mocked server " + e); - } - } - }; - - thread3.start(); - Assert.assertTrue("mockServer did not start in 5s", - mockLatch.await(5000, TimeUnit.MILLISECONDS)); - thread1.start(); - thread2.start(); - /* - * Occasionally seen false negatives with a 5s timeout. - */ - thread1.join(15000); - thread2.join(15000); - thread3.join(15000); - if (thread1.isAlive() || thread2.isAlive() || thread3.isAlive()) { - Assert.fail("Threads didn't join"); - } - } - - /** - * MockServer plays the role of peer C. Respond to two requests for votes - * with vote for self and then Assert.fail. - */ - void mockServer() throws InterruptedException, IOException { - byte b[] = new byte[36]; - ByteBuffer responseBuffer = ByteBuffer.wrap(b); - DatagramPacket packet = new DatagramPacket(b, b.length); - QuorumServer server = peers.get(Long.valueOf(2)); - DatagramSocket udpSocket = new DatagramSocket(server.addr.getPort()); - LOG.info("In MockServer"); - mockLatch.countDown(); - Vote current = new Vote(2, 1); - for (int i=0;i<2;++i) { - udpSocket.receive(packet); - responseBuffer.rewind(); - LOG.info("Received " + responseBuffer.getInt() + " " + responseBuffer.getLong() + " " + responseBuffer.getLong()); - LOG.info("From " + packet.getSocketAddress()); - responseBuffer.clear(); - responseBuffer.getInt(); // Skip the xid - responseBuffer.putLong(2); - - responseBuffer.putLong(current.getId()); - responseBuffer.putLong(current.getZxid()); - packet.setData(b); - udpSocket.send(packet); - } - } -}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/LETest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LETest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/LETest.java deleted file mode 100644 index c6b1833..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LETest.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import java.io.File; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Random; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.zookeeper.PortAssignment; -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.server.quorum.LeaderElection; -import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.server.quorum.Vote; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; - -@SuppressWarnings("deprecation") -public class LETest extends ZKTestCase { - private static final Logger LOG = LoggerFactory.getLogger(LETest.class); - volatile Vote votes[]; - volatile boolean leaderDies; - volatile long leader = -1; - Random rand = new Random(); - class LEThread extends Thread { - LeaderElection le; - int i; - QuorumPeer peer; - LEThread(LeaderElection le, QuorumPeer peer, int i) { - this.le = le; - this.i = i; - this.peer = peer; - } - public void run() { - try { - Vote v = null; - while(true) { - v = le.lookForLeader(); - votes[i] = v; - if (v.getId() == i) { - synchronized(LETest.this) { - if (leaderDies) { - leaderDies = false; - peer.stopLeaderElection(); - LOG.info("Leader " + i + " dying"); - leader = -2; - } else { - leader = i; - } - LETest.this.notifyAll(); - } - break; - } - synchronized(LETest.this) { - if (leader == -1) { - LETest.this.wait(); - } - if (leader == v.getId()) { - break; - } - } - Thread.sleep(rand.nextInt(1000)); - peer.setCurrentVote(new Vote(peer.getId(), 0)); - } - LOG.info("Thread " + i + " votes " + v); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - @Test - @Ignore("ZOOKEEPER-1932, this test is flaky and already removed in master") - public void testLE() throws Exception { - int count = 30; - HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>(count); - ArrayList<LEThread> threads = new ArrayList<LEThread>(count); - File tmpdir[] = new File[count]; - int port[] = new int[count]; - votes = new Vote[count]; - for(int i = 0; i < count; i++) { - peers.put(Long.valueOf(i), - new QuorumServer(i, - new InetSocketAddress("127.0.0.1", - PortAssignment.unique()))); - tmpdir[i] = ClientBase.createTmpDir(); - port[i] = PortAssignment.unique(); - } - LeaderElection le[] = new LeaderElection[count]; - leaderDies = true; - boolean allowOneBadLeader = leaderDies; - for(int i = 0; i < le.length; i++) { - QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], - port[i], 0, i, 1000, 2, 2); - peer.startLeaderElection(); - le[i] = new LeaderElection(peer); - LEThread thread = new LEThread(le[i], peer, i); - thread.start(); - threads.add(thread); - } - for(int i = 0; i < threads.size(); i++) { - threads.get(i).join(15000); - if (threads.get(i).isAlive()) { - Assert.fail("Threads didn't join"); - } - } - long id = votes[0].getId(); - for(int i = 1; i < votes.length; i++) { - if (votes[i] == null) { - Assert.fail("Thread " + i + " had a null vote"); - } - if (votes[i].getId() != id) { - if (allowOneBadLeader && votes[i].getId() == i) { - allowOneBadLeader = false; - } else { - Assert.fail("Thread " + i + " got " + votes[i].getId() + " expected " + id); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java deleted file mode 100644 index a45b9cb..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - - -import java.io.ByteArrayOutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; - -import org.apache.jute.BinaryOutputArchive; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooDefs.OpCode; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Id; -import org.apache.zookeeper.data.Stat; -import org.apache.zookeeper.proto.CreateRequest; -import org.apache.zookeeper.server.Request; -import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Due to race condition or bad client code, the leader may get request from - * expired session. We need to make sure that we never allow ephmeral node - * to be created in those case, but we do allow normal node to be created. - */ -public class LeaderSessionTrackerTest extends ZKTestCase { - - protected static final Logger LOG = LoggerFactory - .getLogger(LeaderSessionTrackerTest.class); - - QuorumUtil qu; - - @Before - public void setUp() throws Exception { - qu = new QuorumUtil(1); - } - - @After - public void tearDown() throws Exception { - qu.shutdownAll(); - } - - @Test - public void testExpiredSessionWithLocalSession() throws Exception { - testCreateEphemeral(true); - } - - @Test - public void testExpiredSessionWithoutLocalSession() throws Exception { - testCreateEphemeral(false); - } - - /** - * When we create ephemeral node, we need to check against global - * session, so the leader never accept request from an expired session - * (that we no longer track) - * - * This is not the same as SessionInvalidationTest since session - * is not in closing state - */ - public void testCreateEphemeral(boolean localSessionEnabled) throws Exception { - if (localSessionEnabled) { - qu.enableLocalSession(true); - } - qu.startAll(); - QuorumPeer leader = qu.getLeaderQuorumPeer(); - - ZooKeeper zk = ClientBase.createZKClient(qu.getConnectString(leader)); - - CreateRequest createRequest = new CreateRequest("/impossible", - new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag()); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); - createRequest.serialize(boa, "request"); - ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); - - // Mimic sessionId generated by follower's local session tracker - long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer() - .getServerId(); - long fakeSessionId = (sid << 56) + 1; - - LOG.info("Fake session Id: " + Long.toHexString(fakeSessionId)); - - Request request = new Request(null, fakeSessionId, 0, OpCode.create, - bb, new ArrayList<Id>()); - - // Submit request directly to leader - leader.getActiveServer().submitRequest(request); - - // Make sure that previous request is finished - zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - - Stat stat = zk.exists("/impossible", null); - Assert.assertEquals("Node from fake session get created", null, stat); - - } - - /** - * When local session is enabled, leader will allow persistent node - * to be create for unknown session - */ - @Test - public void testCreatePersistent() throws Exception { - qu.enableLocalSession(true); - qu.startAll(); - - QuorumPeer leader = qu.getLeaderQuorumPeer(); - - ZooKeeper zk = ClientBase.createZKClient(qu.getConnectString(leader)); - - CreateRequest createRequest = new CreateRequest("/success", - new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT.toFlag()); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); - createRequest.serialize(boa, "request"); - ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); - - // Mimic sessionId generated by follower's local session tracker - long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer() - .getServerId(); - long locallSession = (sid << 56) + 1; - - LOG.info("Local session Id: " + Long.toHexString(locallSession)); - - Request request = new Request(null, locallSession, 0, OpCode.create, - bb, new ArrayList<Id>()); - - // Submit request directly to leader - leader.getActiveServer().submitRequest(request); - - // Make sure that previous request is finished - zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - - Stat stat = zk.exists("/success", null); - Assert.assertTrue("Request from local sesson failed", stat != null); - - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/LoadFromLogNoServerTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LoadFromLogNoServerTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/LoadFromLogNoServerTest.java deleted file mode 100644 index 4d56f60..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LoadFromLogNoServerTest.java +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import org.apache.jute.BinaryInputArchive; -import org.apache.jute.BinaryOutputArchive; -import org.apache.jute.Record; -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.common.Time; -import org.apache.zookeeper.server.DataNode; -import org.apache.zookeeper.server.DataTree; -import org.apache.zookeeper.server.persistence.FileHeader; -import org.apache.zookeeper.server.persistence.FileTxnLog; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.apache.zookeeper.txn.CreateTxn; -import org.apache.zookeeper.txn.DeleteTxn; -import org.apache.zookeeper.txn.MultiTxn; -import org.apache.zookeeper.txn.Txn; -import org.apache.zookeeper.txn.TxnHeader; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -public class LoadFromLogNoServerTest extends ZKTestCase { - protected static final Logger LOG = LoggerFactory.getLogger(LoadFromLogNoServerTest.class); - - /** - * For ZOOKEEPER-1046. Verify if cversion and pzxid if incremented - * after create/delete failure during restore. - */ - @Test - public void testTxnFailure() throws Exception { - long count = 1; - File tmpDir = ClientBase.createTmpDir(); - FileTxnSnapLog logFile = new FileTxnSnapLog(tmpDir, tmpDir); - DataTree dt = new DataTree(); - dt.createNode("/test", new byte[0], null, 0, -1, 1, 1); - for (count = 1; count <= 3; count++) { - dt.createNode("/test/" + count, new byte[0], null, 0, -1, count, - Time.currentElapsedTime()); - } - DataNode zk = dt.getNode("/test"); - - // Make create to fail, then verify cversion. - LOG.info("Attempting to create " + "/test/" + (count - 1)); - doOp(logFile, ZooDefs.OpCode.create, "/test/" + (count - 1), dt, zk, -1); - - LOG.info("Attempting to create " + "/test/" + (count - 1)); - doOp(logFile, ZooDefs.OpCode.create, "/test/" + (count - 1), dt, zk, - zk.stat.getCversion() + 1); - - LOG.info("Attempting to create " + "/test/" + (count - 1)); - doOp(logFile, ZooDefs.OpCode.multi, "/test/" + (count - 1), dt, zk, - zk.stat.getCversion() + 1); - - LOG.info("Attempting to create " + "/test/" + (count - 1)); - doOp(logFile, ZooDefs.OpCode.multi, "/test/" + (count - 1), dt, zk, - -1); - - // Make delete fo fail, then verify cversion. - // this doesn't happen anymore, we only set the cversion on create - // LOG.info("Attempting to delete " + "/test/" + (count + 1)); - // doOp(logFile, OpCode.delete, "/test/" + (count + 1), dt, zk); - } - - /* - * Does create/delete depending on the type and verifies - * if cversion before the operation is 1 less than cversion afer. - */ - private void doOp(FileTxnSnapLog logFile, int type, String path, - DataTree dt, DataNode parent, int cversion) throws Exception { - int lastSlash = path.lastIndexOf('/'); - String parentName = path.substring(0, lastSlash); - - int prevCversion = parent.stat.getCversion(); - long prevPzxid = parent.stat.getPzxid(); - List<String> child = dt.getChildren(parentName, null, null); - StringBuilder childStr = new StringBuilder(); - for (String s : child) { - childStr.append(s).append(" "); - } - LOG.info("Children: " + childStr + " for " + parentName); - LOG.info("(cverions, pzxid): " + prevCversion + ", " + prevPzxid); - - Record txn = null; - TxnHeader txnHeader = null; - if (type == ZooDefs.OpCode.delete) { - txn = new DeleteTxn(path); - txnHeader = new TxnHeader(0xabcd, 0x123, prevPzxid + 1, - Time.currentElapsedTime(), ZooDefs.OpCode.delete); - } else if (type == ZooDefs.OpCode.create) { - txnHeader = new TxnHeader(0xabcd, 0x123, prevPzxid + 1, - Time.currentElapsedTime(), ZooDefs.OpCode.create); - txn = new CreateTxn(path, new byte[0], null, false, cversion); - } - else if (type == ZooDefs.OpCode.multi) { - txnHeader = new TxnHeader(0xabcd, 0x123, prevPzxid + 1, - Time.currentElapsedTime(), ZooDefs.OpCode.create); - txn = new CreateTxn(path, new byte[0], null, false, cversion); - List<Txn> txnList = new ArrayList<Txn>(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); - txn.serialize(boa, "request") ; - ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); - Txn txact = new Txn(ZooDefs.OpCode.create, bb.array()); - txnList.add(txact); - txn = new MultiTxn(txnList); - txnHeader = new TxnHeader(0xabcd, 0x123, prevPzxid + 1, - Time.currentElapsedTime(), ZooDefs.OpCode.multi); - } - logFile.processTransaction(txnHeader, dt, null, txn); - - int newCversion = parent.stat.getCversion(); - long newPzxid = parent.stat.getPzxid(); - child = dt.getChildren(parentName, null, null); - childStr = new StringBuilder(); - for (String s : child) { - childStr.append(s).append(" "); - } - LOG.info("Children: " + childStr + " for " + parentName); - LOG.info("(cverions, pzxid): " +newCversion + ", " + newPzxid); - Assert.assertTrue(type + " <cversion, pzxid> verification failed. Expected: <" + - (prevCversion + 1) + ", " + (prevPzxid + 1) + ">, found: <" + - newCversion + ", " + newPzxid + ">", - (newCversion == prevCversion + 1 && newPzxid == prevPzxid + 1)); - } - - /** - * Simulates ZOOKEEPER-1069 and verifies that flush() before padLogFile - * fixes it. - */ - @Test - public void testPad() throws Exception { - File tmpDir = ClientBase.createTmpDir(); - FileTxnLog txnLog = new FileTxnLog(tmpDir); - TxnHeader txnHeader = new TxnHeader(0xabcd, 0x123, 0x123, - Time.currentElapsedTime(), ZooDefs.OpCode.create); - Record txn = new CreateTxn("/Test", new byte[0], null, false, 1); - txnLog.append(txnHeader, txn); - FileInputStream in = new FileInputStream(tmpDir.getPath() + "/log." + - Long.toHexString(txnHeader.getZxid())); - BinaryInputArchive ia = BinaryInputArchive.getArchive(in); - FileHeader header = new FileHeader(); - header.deserialize(ia, "fileheader"); - LOG.info("Received magic : " + header.getMagic() + - " Expected : " + FileTxnLog.TXNLOG_MAGIC); - Assert.assertTrue("Missing magic number ", - header.getMagic() == FileTxnLog.TXNLOG_MAGIC); - } - -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/LoadFromLogTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LoadFromLogTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/LoadFromLogTest.java deleted file mode 100644 index 90de755..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LoadFromLogTest.java +++ /dev/null @@ -1,310 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; -import org.apache.zookeeper.server.SyncRequestProcessor; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.persistence.FileTxnLog; -import org.apache.zookeeper.server.persistence.FileTxnLog.FileTxnIterator; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator; -import org.apache.zookeeper.server.persistence.Util; -import org.apache.zookeeper.txn.TxnHeader; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; - -public class LoadFromLogTest extends ClientBase { - private static final int NUM_MESSAGES = 300; - protected static final Logger LOG = LoggerFactory.getLogger(LoadFromLogTest.class); - - // setting up the quorum has a transaction overhead for creating and closing the session - private static final int TRANSACTION_OVERHEAD = 2; - private static final int TOTAL_TRANSACTIONS = NUM_MESSAGES + TRANSACTION_OVERHEAD; - - @Before - public void setUp() throws Exception { - SyncRequestProcessor.setSnapCount(50); - super.setUp(); - } - - /** - * test that all transactions from the Log are loaded, and only once - * @throws Exception an exception might be thrown here - */ - @Test - public void testLoad() throws Exception { - // generate some transactions that will get logged - ZooKeeper zk = createZKClient(hostPort); - try { - for (int i = 0; i< NUM_MESSAGES; i++) { - zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - } finally { - zk.close(); - } - stopServer(); - - // now verify that the FileTxnLog reads every transaction only once - File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION); - FileTxnLog txnLog = new FileTxnLog(logDir); - TxnIterator itr = txnLog.read(0); - - // Check that storage space return some value - FileTxnIterator fileItr = (FileTxnIterator) itr; - long storageSize = fileItr.getStorageSize(); - LOG.info("Txnlog size: " + storageSize + " bytes"); - Assert.assertTrue("Storage size is greater than zero ", - (storageSize > 0)); - - long expectedZxid = 0; - long lastZxid = 0; - TxnHeader hdr; - do { - hdr = itr.getHeader(); - expectedZxid++; - Assert.assertTrue("not the same transaction. lastZxid=" + lastZxid + ", zxid=" + hdr.getZxid(), lastZxid != hdr.getZxid()); - Assert.assertTrue("excepting next transaction. expected=" + expectedZxid + ", retreived=" + hdr.getZxid(), (hdr.getZxid() == expectedZxid)); - lastZxid = hdr.getZxid(); - }while(itr.next()); - - Assert.assertTrue("processed all transactions. " + expectedZxid + " == " + TOTAL_TRANSACTIONS, (expectedZxid == TOTAL_TRANSACTIONS)); - } - - /** - * test that we fail to load txnlog of a request zxid that is older - * than what exist on disk - * @throws Exception an exception might be thrown here - */ - @Test - public void testLoadFailure() throws Exception { - // generate some transactions that will get logged - ZooKeeper zk = createZKClient(hostPort); - try { - for (int i = 0; i< NUM_MESSAGES; i++) { - zk.create("/data-", new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL); - } - } finally { - zk.close(); - } - stopServer(); - - File logDir = new File(tmpDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION); - File[] logFiles = FileTxnLog.getLogFiles(logDir.listFiles(), 0); - // Verify that we have at least NUM_MESSAGES / SNAPCOUNT txnlog - Assert.assertTrue(logFiles.length > NUM_MESSAGES / 100); - // Delete the first log file, so we will fail to read it back from disk - Assert.assertTrue("delete the first log file", logFiles[0].delete()); - - // Find zxid for the second log - long secondStartZxid = Util.getZxidFromName(logFiles[1].getName(), "log"); - - FileTxnLog txnLog = new FileTxnLog(logDir); - TxnIterator itr = txnLog.read(1, false); - - // Oldest log is already remove, so this should point to the start of - // of zxid on the second log - Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid()); - - itr = txnLog.read(secondStartZxid, false); - Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid()); - Assert.assertTrue(itr.next()); - - // Trying to get a second txn on second txnlog give us the - // the start of second log, since the first one is removed - long nextZxid = itr.getHeader().getZxid(); - - itr = txnLog.read(nextZxid, false); - Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid()); - - // Trying to get a first txn on the third give us the - // the start of second log, since the first one is removed - long thirdStartZxid = Util.getZxidFromName(logFiles[2].getName(), "log"); - itr = txnLog.read(thirdStartZxid, false); - Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid()); - Assert.assertTrue(itr.next()); - - nextZxid = itr.getHeader().getZxid(); - itr = txnLog.read(nextZxid, false); - Assert.assertEquals(secondStartZxid, itr.getHeader().getZxid()); - } - - /** - * Test we can restore the snapshot that has data ahead of the zxid - * of the snapshot file. - */ - @Test - public void testRestore() throws Exception { - // generate some transactions - ZooKeeper zk = createZKClient(hostPort); - String lastPath = null; - try { - zk.create("/invalidsnap", new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - for (int i = 0; i < NUM_MESSAGES; i++) { - lastPath = zk.create("/invalidsnap/test-", new byte[0], - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); - } - } finally { - zk.close(); - } - String[] tokens = lastPath.split("-"); - String expectedPath = "/invalidsnap/test-" - + String.format("%010d", - (new Integer(tokens[1])).intValue() + 1); - ZooKeeperServer zks = getServer(serverFactory); - long eZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); - // force the zxid to be behind the content - zks.getZKDatabase().setlastProcessedZxid( - zks.getZKDatabase().getDataTreeLastProcessedZxid() - 10); - LOG.info("Set lastProcessedZxid to " - + zks.getZKDatabase().getDataTreeLastProcessedZxid()); - // Force snapshot and restore - zks.takeSnapshot(); - zks.shutdown(); - stopServer(); - - startServer(); - zks = getServer(serverFactory); - long fZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); - - // Verify lastProcessedZxid is set correctly - Assert.assertTrue("Restore failed expected zxid=" + eZxid + " found=" - + fZxid, fZxid == eZxid); - zk = createZKClient(hostPort); - - // Verify correctness of data and whether sequential znode creation - // proceeds correctly after this point - String[] children; - String path; - try { - children = zk.getChildren("/invalidsnap", false).toArray( - new String[0]); - path = zk.create("/invalidsnap/test-", new byte[0], - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); - } finally { - zk.close(); - } - LOG.info("Expected " + expectedPath + " found " + path); - Assert.assertTrue("Error in sequential znode creation expected " - + expectedPath + " found " + path, path.equals(expectedPath)); - Assert.assertTrue("Unexpected number of children " + children.length - + " expected " + NUM_MESSAGES, - (children.length == NUM_MESSAGES)); - } - - /** - * Test we can restore a snapshot that has errors and data ahead of the zxid - * of the snapshot file. - */ - @Test - public void testRestoreWithTransactionErrors() throws Exception { - // generate some transactions - ZooKeeper zk = createZKClient(hostPort); - try { - for (int i = 0; i < NUM_MESSAGES; i++) { - try { - zk.create("/invaliddir/test-", new byte[0], - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); - } catch(NoNodeException e) { - //Expected - } - } - } finally { - zk.close(); - } - - // force the zxid to be behind the content - ZooKeeperServer zks = getServer(serverFactory); - zks.getZKDatabase().setlastProcessedZxid( - zks.getZKDatabase().getDataTreeLastProcessedZxid() - 10); - LOG.info("Set lastProcessedZxid to " - + zks.getZKDatabase().getDataTreeLastProcessedZxid()); - - // Force snapshot and restore - zks.takeSnapshot(); - zks.shutdown(); - stopServer(); - - zks = new ZooKeeperServer(tmpDir, tmpDir, 3000); - startServer(); - } - - /** - * Verify snap/log dir create with/without autocreate enabled. - */ - @Test - public void testDatadirAutocreate() throws Exception { - stopServer(); - - try { - // now verify autocreate off works - System.setProperty(FileTxnSnapLog.ZOOKEEPER_DATADIR_AUTOCREATE, "false"); - tmpDir = createTmpDir(); - startServer(); - Assert.fail("Server should not have started without datadir"); - } catch (IOException e) { - LOG.info("Server failed to start - correct behavior " + e); - } finally { - System.setProperty(FileTxnSnapLog.ZOOKEEPER_DATADIR_AUTOCREATE, - FileTxnSnapLog.ZOOKEEPER_DATADIR_AUTOCREATE_DEFAULT); - } - } - - /** - * ZOOKEEPER-1573: test restoring a snapshot with deleted txns ahead of the - * snapshot file's zxid. - */ - @Test - public void testReloadSnapshotWithMissingParent() throws Exception { - // create transactions to create the snapshot with create/delete pattern - ZooKeeper zk = createZKClient(hostPort); - zk.create("/a", "".getBytes(), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - Stat stat = zk.exists("/a", false); - long createZxId = stat.getMzxid(); - zk.create("/a/b", "".getBytes(), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - zk.delete("/a/b", -1); - zk.delete("/a", -1); - // force the zxid to be behind the content - ZooKeeperServer zks = getServer(serverFactory); - zks.getZKDatabase().setlastProcessedZxid(createZxId); - LOG.info("Set lastProcessedZxid to {}", zks.getZKDatabase() - .getDataTreeLastProcessedZxid()); - // Force snapshot and restore - zks.takeSnapshot(); - zks.shutdown(); - stopServer(); - - startServer(); - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java deleted file mode 100644 index 603cd15..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.server.TraceFormatter; -import org.apache.zookeeper.server.ZKDatabase; -import org.apache.zookeeper.server.quorum.Leader.Proposal; -import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.test.ClientBase.CountdownWatcher; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Validate that open/close session request of a local session to not propagate - * to other machines in the quorum. We verify this by checking that - * these request doesn't show up in committedLog on other machines. - */ -public class LocalSessionRequestTest extends ZKTestCase { - protected static final Logger LOG = LoggerFactory - .getLogger(LocalSessionRequestTest.class); - // Need to be short since we need to wait for session to expire - public static final int CONNECTION_TIMEOUT = 4000; - - private final QuorumBase qb = new QuorumBase(); - - @Before - public void setUp() throws Exception { - LOG.info("STARTING quorum " + getClass().getName()); - qb.localSessionsEnabled = true; - qb.localSessionsUpgradingEnabled = true; - qb.setUp(); - ClientBase.waitForServerUp(qb.hostPort, 10000); - } - - @After - public void tearDown() throws Exception { - LOG.info("STOPPING quorum " + getClass().getName()); - qb.tearDown(); - } - - @Test - public void testLocalSessionsOnFollower() throws Exception { - testOpenCloseSession(false); - } - - @Test - public void testLocalSessionsOnLeader() throws Exception { - testOpenCloseSession(true); - } - - /** - * Walk through the target peer commmittedLog. - * @param sessionId - * @param peerId - */ - private void validateRequestLog(long sessionId, int peerId) { - String session = Long.toHexString(sessionId); - LOG.info("Searching for txn of session 0x " + session + - " on peer " + peerId); - String peerType = peerId == qb.getLeaderIndex() ? "leader" : "follower"; - QuorumPeer peer = qb.getPeerList().get(peerId); - ZKDatabase db = peer.getActiveServer().getZKDatabase(); - for (Proposal p : db.getCommittedLog()) { - Assert.assertFalse("Should not see " + - TraceFormatter.op2String(p.request.type) + - " request from local session 0x" + session + - " on the " + peerType, - p.request.sessionId == sessionId); - } - } - - /** - * Test that a CloseSession request generated by both the server (client - * disconnect) or by the client (client explicitly issue close()) doesn't - * get committed by the ensemble - */ - public void testOpenCloseSession(boolean onLeader) throws Exception { - int leaderIdx = qb.getLeaderIndex(); - Assert.assertFalse("No leader in quorum?", leaderIdx == -1); - int followerIdx = (leaderIdx + 1) % 5; - int testPeerIdx = onLeader ? leaderIdx : followerIdx; - int verifyPeerIdx = onLeader ? followerIdx : leaderIdx; - - String hostPorts[] = qb.hostPort.split(","); - - CountdownWatcher watcher = new CountdownWatcher(); - DisconnectableZooKeeper client = new DisconnectableZooKeeper( - hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher); - watcher.waitForConnected(CONNECTION_TIMEOUT); - - long localSessionId1 = client.getSessionId(); - - // Cut the connection, so the server will create closeSession as part - // of expiring the session. - client.dontReconnect(); - client.disconnect(); - watcher.reset(); - - // We don't validate right away, will do another session create first - - ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx], - CONNECTION_TIMEOUT); - watcher.waitForConnected(CONNECTION_TIMEOUT); - - long localSessionId2 = zk.getSessionId(); - - // Send closeSession request. - zk.close(); - watcher.reset(); - - // This should be enough time for the first session to expire and for - // the closeSession request to propagate to other machines (if there is a bug) - // Since it is time sensitive, we have false negative when test - // machine is under load - Thread.sleep(CONNECTION_TIMEOUT * 2); - - // Validate that we don't see any txn from the first session - validateRequestLog(localSessionId1, verifyPeerIdx); - - // Validate that we don't see any txn from the second session - validateRequestLog(localSessionId2, verifyPeerIdx); - - qb.shutdownServers(); - - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/LocalSessionsOnlyTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LocalSessionsOnlyTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/LocalSessionsOnlyTest.java deleted file mode 100644 index 3e240e0..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/LocalSessionsOnlyTest.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import java.util.HashMap; -import java.util.Map.Entry; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZKTestCase; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.test.ClientBase.CountdownWatcher; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tests learners configured to use local sessions only. Expected - * behavior is that sessions created on the learner will never be - * made global. Operations requiring a global session (e.g. - * creation of ephemeral nodes) will fail with an error. - */ -public class LocalSessionsOnlyTest extends ZKTestCase { - protected static final Logger LOG = LoggerFactory - .getLogger(LocalSessionsOnlyTest.class); - public static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT; - - private final QuorumBase qb = new QuorumBase(); - - @Before - public void setUp() throws Exception { - LOG.info("STARTING quorum " + getClass().getName()); - qb.localSessionsEnabled = true; - qb.localSessionsUpgradingEnabled = false; - qb.setUp(); - ClientBase.waitForServerUp(qb.hostPort, 10000); - } - - @After - public void tearDown() throws Exception { - LOG.info("STOPPING quorum " + getClass().getName()); - qb.tearDown(); - } - - @Test - public void testLocalSessionsOnFollower() throws Exception { - testLocalSessions(false); - } - - @Test - public void testLocalSessionsOnLeader() throws Exception { - testLocalSessions(true); - } - - private void testLocalSessions(boolean testLeader) throws Exception { - String nodePrefix = "/testLocalSessions-" - + (testLeader ? "leaderTest-" : "followerTest-"); - int leaderIdx = qb.getLeaderIndex(); - Assert.assertFalse("No leader in quorum?", leaderIdx == -1); - int followerIdx = (leaderIdx + 1) % 5; - int testPeerIdx = testLeader ? leaderIdx : followerIdx; - String hostPorts[] = qb.hostPort.split(","); - - CountdownWatcher watcher = new CountdownWatcher(); - ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx], - CONNECTION_TIMEOUT); - watcher.waitForConnected(CONNECTION_TIMEOUT); - - long localSessionId = zk.getSessionId(); - - // Try creating some data. - for (int i = 0; i < 5; i++) { - zk.create(nodePrefix + i, new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - - // Now, try an ephemeral node. This should fail since we - // cannot create ephemeral nodes on a local session. - try { - zk.create(nodePrefix + "ephemeral", new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - Assert.fail("Ephemeral node creation should fail."); - } catch (KeeperException.EphemeralOnLocalSessionException e) { - } - - // Close the session. - zk.close(); - - // Validate data on both follower and leader - HashMap<String, Integer> peers = new HashMap<String, Integer>(); - peers.put("leader", leaderIdx); - peers.put("follower", followerIdx); - for (Entry<String, Integer> entry: peers.entrySet()) { - watcher.reset(); - // Try reconnecting with a new session. - // The data should be persisted, even though the session was not. - zk = qb.createClient(watcher, hostPorts[entry.getValue()], - CONNECTION_TIMEOUT); - watcher.waitForConnected(CONNECTION_TIMEOUT); - - long newSessionId = zk.getSessionId(); - Assert.assertFalse(newSessionId == localSessionId); - - for (int i = 0; i < 5; i++) { - Assert.assertNotNull("Data not exists in " + entry.getKey(), - zk.exists(nodePrefix + i, null)); - } - - // We may get the correct exception but the txn may go through - Assert.assertNull("Data exists in " + entry.getKey(), - zk.exists(nodePrefix + "ephemeral", null)); - - zk.close(); - } - qb.shutdownServers(); - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/MaxCnxnsTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/MaxCnxnsTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/MaxCnxnsTest.java deleted file mode 100644 index a96e5a8..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/MaxCnxnsTest.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.jute.BinaryOutputArchive; -import org.apache.zookeeper.proto.ConnectRequest; -import org.junit.Assert; -import org.junit.Test; - -public class MaxCnxnsTest extends ClientBase { - final private static int numCnxns = 30; - AtomicInteger numConnected = new AtomicInteger(0); - String host; - int port; - - @Override - public void setUp() throws Exception { - maxCnxns = numCnxns; - super.setUp(); - } - - class CnxnThread extends Thread { - - public CnxnThread(int i) { - super("CnxnThread-"+i); - } - - public void run() { - SocketChannel sChannel = null; - try { - /* - * For future unwary socket programmers: although connect 'blocks' it - * does not require an accept on the server side to return. Therefore - * you can not assume that all the sockets are connected at the end of - * this for loop. - */ - sChannel = SocketChannel.open(); - sChannel.connect(new InetSocketAddress(host,port)); - // Construct a connection request - ConnectRequest conReq = new ConnectRequest(0, 0, - 10000, 0, "password".getBytes()); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); - boa.writeInt(-1, "len"); - conReq.serialize(boa, "connect"); - baos.close(); - ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); - bb.putInt(bb.capacity() - 4); - bb.rewind(); - - /* Send a connect request. Any socket that has been closed (or at least - * not added to the cnxn list on the server) will not have any bytes to - * read and get an eof. - * - * The trick here was finding a call that caused the server to put - * bytes in the input stream without closing the cnxn. None of - * the four letter commands do that, so we actually try to create - * a session which should send us something back, while maintaining - * the connection. - */ - - int eof = sChannel.write(bb); - // If the socket times out, we count that as Assert.failed - - // the server should respond within 10s - sChannel.socket().setSoTimeout(10000); - if (!sChannel.socket().isClosed()){ - eof = sChannel.socket().getInputStream().read(); - if (eof != -1) { - numConnected.incrementAndGet(); - } - } - } - catch (IOException io) { - // "Connection reset by peer" - } finally { - if (sChannel != null) { - try { - sChannel.close(); - } catch (Exception e) { - // Do nothing - } - } - } - } - } - - /** - * Verify the ability to limit the number of concurrent connections. - * @throws IOException - * @throws InterruptedException - */ - @Test - public void testMaxCnxns() throws IOException, InterruptedException{ - String split[] = hostPort.split(":"); - host = split[0]; - port = Integer.parseInt(split[1]); - int numThreads = numCnxns + 5; - CnxnThread[] threads = new CnxnThread[numThreads]; - - for (int i=0;i<numCnxns;++i) { - threads[i] = new CnxnThread(i); - } - - for (int i=0;i<numCnxns;++i) { - threads[i].start(); - } - - for (int i=0;i<numCnxns;++i) { - threads[i].join(); - } - Assert.assertSame(numCnxns,numConnected.get()); - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-common/src/test/java/org/apache/zookeeper/test/MultiAsyncTransactionTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-common/src/test/java/org/apache/zookeeper/test/MultiAsyncTransactionTest.java b/zookeeper-common/src/test/java/org/apache/zookeeper/test/MultiAsyncTransactionTest.java deleted file mode 100644 index 822b9c5..0000000 --- a/zookeeper-common/src/test/java/org/apache/zookeeper/test/MultiAsyncTransactionTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.zookeeper.AsyncCallback.MultiCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; -import org.apache.zookeeper.OpResult.CreateResult; -import org.apache.zookeeper.OpResult.ErrorResult; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; -import org.junit.Before; -import org.junit.Test; - -public class MultiAsyncTransactionTest extends ClientBase { - private ZooKeeper zk; - private final AtomicInteger pendingOps = new AtomicInteger(0); - - @Before - public void setUp() throws Exception { - super.setUp(); - zk = createClient(); - pendingOps.set(0); - } - - private static class MultiResult { - int rc; - List<OpResult> results; - } - - private void finishPendingOps() { - if (pendingOps.decrementAndGet() == 0) { - synchronized (pendingOps) { - pendingOps.notifyAll(); - } - } - } - - private void waitForPendingOps(int timeout) throws Exception { - synchronized(pendingOps) { - while(pendingOps.get() > 0) { - pendingOps.wait(timeout); - } - } - } - - /** - * ZOOKEEPER-1624: PendingChanges of create sequential node request didn't - * get rollbacked correctly when multi-op failed. This cause - * create sequential node request in subsequent multi-op to failed because - * sequential node name generation is incorrect. - * - * The check is to make sure that each request in multi-op failed with - * the correct reason. - */ - @Test - public void testSequentialNodeCreateInAsyncMulti() throws Exception { - final int iteration = 4; - final List<MultiResult> results = new ArrayList<MultiResult>(); - - pendingOps.set(iteration); - - List<Op> ops = Arrays.asList( - Op.create("/node-", new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL), - Op.create("/dup", new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT)); - - - for (int i = 0; i < iteration; ++i) { - zk.multi(ops, new MultiCallback() { - @Override - public void processResult(int rc, String path, Object ctx, - List<OpResult> opResults) { - MultiResult result = new MultiResult(); - result.results = opResults; - result.rc = rc; - results.add(result); - finishPendingOps(); - } - }, null); - } - - waitForPendingOps(CONNECTION_TIMEOUT); - - // Check that return code of all request are correct - assertEquals(KeeperException.Code.OK.intValue(), results.get(0).rc); - assertEquals(KeeperException.Code.NODEEXISTS.intValue(), results.get(1).rc); - assertEquals(KeeperException.Code.NODEEXISTS.intValue(), results.get(2).rc); - assertEquals(KeeperException.Code.NODEEXISTS.intValue(), results.get(3).rc); - - // Check that the first operation is successful in all request - assertTrue(results.get(0).results.get(0) instanceof CreateResult); - assertEquals(KeeperException.Code.OK.intValue(), - ((ErrorResult) results.get(1).results.get(0)).getErr()); - assertEquals(KeeperException.Code.OK.intValue(), - ((ErrorResult) results.get(2).results.get(0)).getErr()); - assertEquals(KeeperException.Code.OK.intValue(), - ((ErrorResult) results.get(3).results.get(0)).getErr()); - - // Check that the second operation failed after the first request - assertEquals(KeeperException.Code.NODEEXISTS.intValue(), - ((ErrorResult) results.get(1).results.get(1)).getErr()); - assertEquals(KeeperException.Code.NODEEXISTS.intValue(), - ((ErrorResult) results.get(2).results.get(1)).getErr()); - assertEquals(KeeperException.Code.NODEEXISTS.intValue(), - ((ErrorResult) results.get(3).results.get(1)).getErr()); - - } -} \ No newline at end of file