http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java new file mode 100644 index 0000000..6373bb3 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java @@ -0,0 +1,870 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.KeeperException.InvalidACLException; +import org.apache.zookeeper.TestableZooKeeper; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs.Perms; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.proto.ExistsRequest; +import org.apache.zookeeper.proto.ExistsResponse; +import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.proto.RequestHeader; +import org.apache.zookeeper.server.PrepRequestProcessor; +import org.apache.zookeeper.server.util.OSMXBean; +import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClientTest extends ClientBase { + protected static final Logger LOG = LoggerFactory.getLogger(ClientTest.class); + private boolean skipACL = System.getProperty("zookeeper.skipACL", "no").equals("yes"); + + /** Verify that pings are sent, keeping the "idle" client alive */ + @Test + public void testPing() throws Exception { + ZooKeeper zkIdle = null; + ZooKeeper zkWatchCreator = null; + try { + CountdownWatcher watcher = new CountdownWatcher(); + zkIdle = createClient(watcher, hostPort, 10000); + + zkWatchCreator = createClient(); + + for (int i = 0; i < 10; i++) { + zkWatchCreator.create("/" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + for (int i = 0; i < 10; i++) { + zkIdle.exists("/" + i, true); + } + for (int i = 0; i < 10; i++) { + Thread.sleep(1000); + zkWatchCreator.delete("/" + i, -1); + } + // The bug will manifest itself here because zkIdle will expire + zkIdle.exists("/0", false); + } finally { + if (zkIdle != null) { + zkIdle.close(); + } + if (zkWatchCreator != null) { + zkWatchCreator.close(); + } + } + } + + @Test + public void testClientwithoutWatcherObj() throws IOException, + InterruptedException, KeeperException + { + performClientTest(false); + } + + @Test + public void testClientWithWatcherObj() throws IOException, + InterruptedException, KeeperException + { + performClientTest(true); + } + + /** Exercise the testable functions, verify tostring, etc... */ + @Test + public void testTestability() throws Exception { + TestableZooKeeper zk = createClient(); + try { + LOG.info("{}",zk.testableLocalSocketAddress()); + LOG.info("{}",zk.testableRemoteSocketAddress()); + LOG.info("{}",zk.toString()); + } finally { + zk.close(CONNECTION_TIMEOUT); + LOG.info("{}",zk.testableLocalSocketAddress()); + LOG.info("{}",zk.testableRemoteSocketAddress()); + LOG.info("{}",zk.toString()); + } + } + + @Test + public void testACLs() throws Exception { + ZooKeeper zk = null; + try { + zk = createClient(); + try { + zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); + Assert.fail("Should have received an invalid acl error"); + } catch(InvalidACLException e) { + LOG.info("Test successful, invalid acl received : " + + e.getMessage()); + } + try { + ArrayList<ACL> testACL = new ArrayList<ACL>(); + testACL.add(new ACL(Perms.ALL | Perms.ADMIN, Ids.AUTH_IDS)); + testACL.add(new ACL(Perms.ALL | Perms.ADMIN, new Id("ip", "127.0.0.1/8"))); + zk.create("/acltest", new byte[0], testACL, CreateMode.PERSISTENT); + Assert.fail("Should have received an invalid acl error"); + } catch(InvalidACLException e) { + LOG.info("Test successful, invalid acl received : " + + e.getMessage()); + } + try { + ArrayList<ACL> testACL = new ArrayList<ACL>(); + testACL.add(new ACL(Perms.ALL | Perms.ADMIN, new Id())); + zk.create("/nullidtest", new byte[0], testACL, CreateMode.PERSISTENT); + Assert.fail("Should have received an invalid acl error"); + } catch(InvalidACLException e) { + LOG.info("Test successful, invalid acl received : " + + e.getMessage()); + } + zk.addAuthInfo("digest", "ben:passwd".getBytes()); + ArrayList<ACL> testACL = new ArrayList<ACL>(); + testACL.add(new ACL(Perms.ALL, new Id("auth",""))); + testACL.add(new ACL(Perms.WRITE, new Id("ip", "127.0.0.1"))); + zk.create("/acltest", new byte[0], testACL, CreateMode.PERSISTENT); + zk.close(); + zk = createClient(); + zk.addAuthInfo("digest", "ben:passwd2".getBytes()); + if (skipACL) { + try { + zk.getData("/acltest", false, null); + } catch (KeeperException e) { + Assert.fail("Badauth reads should succeed with skipACL."); + } + } else { + try { + zk.getData("/acltest", false, null); + Assert.fail("Should have received a permission error"); + } catch (KeeperException e) { + Assert.assertEquals(Code.NOAUTH, e.code()); + } + } + zk.addAuthInfo("digest", "ben:passwd".getBytes()); + zk.getData("/acltest", false, null); + zk.setACL("/acltest", Ids.OPEN_ACL_UNSAFE, -1); + zk.close(); + zk = createClient(); + zk.getData("/acltest", false, null); + List<ACL> acls = zk.getACL("/acltest", new Stat()); + Assert.assertEquals(1, acls.size()); + Assert.assertEquals(Ids.OPEN_ACL_UNSAFE, acls); + + // The stat parameter should be optional. + acls = zk.getACL("/acltest", null); + Assert.assertEquals(1, acls.size()); + Assert.assertEquals(Ids.OPEN_ACL_UNSAFE, acls); + + zk.close(); + } finally { + if (zk != null) { + zk.close(); + } + } + } + + @Test + public void testNullAuthId() throws Exception { + ZooKeeper zk = null; + try { + zk = createClient(); + zk.addAuthInfo("digest", "ben:passwd".getBytes()); + ArrayList<ACL> testACL = new ArrayList<ACL>(); + testACL.add(new ACL(Perms.ALL, new Id("auth", null))); + zk.create("/acltest", new byte[0], testACL, CreateMode.PERSISTENT); + zk.close(); + zk = createClient(); + zk.addAuthInfo("digest", "ben:passwd2".getBytes()); + if (skipACL) { + try { + zk.getData("/acltest", false, null); + } catch (KeeperException e) { + Assert.fail("Badauth reads should succeed with skipACL."); + } + } else { + try { + zk.getData("/acltest", false, null); + Assert.fail("Should have received a permission error"); + } catch (KeeperException e) { + Assert.assertEquals(Code.NOAUTH, e.code()); + } + } + zk.addAuthInfo("digest", "ben:passwd".getBytes()); + zk.getData("/acltest", false, null); + zk.setACL("/acltest", Ids.OPEN_ACL_UNSAFE, -1); + zk.close(); + zk = createClient(); + zk.getData("/acltest", false, null); + List<ACL> acls = zk.getACL("/acltest", new Stat()); + Assert.assertEquals(1, acls.size()); + Assert.assertEquals(Ids.OPEN_ACL_UNSAFE, acls); + } finally { + if (zk != null) { + zk.close(); + } + } + } + + private class MyWatcher extends CountdownWatcher { + LinkedBlockingQueue<WatchedEvent> events = + new LinkedBlockingQueue<WatchedEvent>(); + + public void process(WatchedEvent event) { + super.process(event); + if (event.getType() != EventType.None) { + try { + events.put(event); + } catch (InterruptedException e) { + LOG.warn("ignoring interrupt during event.put"); + } + } + } + } + + /** + * Register multiple watchers and verify that they all get notified and + * in the right order. + */ + @Test + public void testMutipleWatcherObjs() + throws IOException, InterruptedException, KeeperException + { + ZooKeeper zk = createClient(new CountdownWatcher(), hostPort); + try { + MyWatcher watchers[] = new MyWatcher[100]; + MyWatcher watchers2[] = new MyWatcher[watchers.length]; + for (int i = 0; i < watchers.length; i++) { + watchers[i] = new MyWatcher(); + watchers2[i] = new MyWatcher(); + zk.create("/foo-" + i, ("foodata" + i).getBytes(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + Stat stat = new Stat(); + + // + // test get/exists with single set of watchers + // get all, then exists all + // + for (int i = 0; i < watchers.length; i++) { + Assert.assertNotNull(zk.getData("/foo-" + i, watchers[i], stat)); + } + for (int i = 0; i < watchers.length; i++) { + Assert.assertNotNull(zk.exists("/foo-" + i, watchers[i])); + } + // trigger the watches + for (int i = 0; i < watchers.length; i++) { + zk.setData("/foo-" + i, ("foodata2-" + i).getBytes(), -1); + zk.setData("/foo-" + i, ("foodata3-" + i).getBytes(), -1); + } + for (int i = 0; i < watchers.length; i++) { + WatchedEvent event = + watchers[i].events.poll(10, TimeUnit.SECONDS); + Assert.assertEquals("/foo-" + i, event.getPath()); + Assert.assertEquals(EventType.NodeDataChanged, event.getType()); + Assert.assertEquals(KeeperState.SyncConnected, event.getState()); + + // small chance that an unexpected message was delivered + // after this check, but we would catch that next time + // we check events + Assert.assertEquals(0, watchers[i].events.size()); + } + + // + // test get/exists with single set of watchers + // get/exists together + // + for (int i = 0; i < watchers.length; i++) { + Assert.assertNotNull(zk.getData("/foo-" + i, watchers[i], stat)); + Assert.assertNotNull(zk.exists("/foo-" + i, watchers[i])); + } + // trigger the watches + for (int i = 0; i < watchers.length; i++) { + zk.setData("/foo-" + i, ("foodata4-" + i).getBytes(), -1); + zk.setData("/foo-" + i, ("foodata5-" + i).getBytes(), -1); + } + for (int i = 0; i < watchers.length; i++) { + WatchedEvent event = + watchers[i].events.poll(10, TimeUnit.SECONDS); + Assert.assertEquals("/foo-" + i, event.getPath()); + Assert.assertEquals(EventType.NodeDataChanged, event.getType()); + Assert.assertEquals(KeeperState.SyncConnected, event.getState()); + + // small chance that an unexpected message was delivered + // after this check, but we would catch that next time + // we check events + Assert.assertEquals(0, watchers[i].events.size()); + } + + // + // test get/exists with two sets of watchers + // + for (int i = 0; i < watchers.length; i++) { + Assert.assertNotNull(zk.getData("/foo-" + i, watchers[i], stat)); + Assert.assertNotNull(zk.exists("/foo-" + i, watchers2[i])); + } + // trigger the watches + for (int i = 0; i < watchers.length; i++) { + zk.setData("/foo-" + i, ("foodata6-" + i).getBytes(), -1); + zk.setData("/foo-" + i, ("foodata7-" + i).getBytes(), -1); + } + for (int i = 0; i < watchers.length; i++) { + WatchedEvent event = + watchers[i].events.poll(10, TimeUnit.SECONDS); + Assert.assertEquals("/foo-" + i, event.getPath()); + Assert.assertEquals(EventType.NodeDataChanged, event.getType()); + Assert.assertEquals(KeeperState.SyncConnected, event.getState()); + + // small chance that an unexpected message was delivered + // after this check, but we would catch that next time + // we check events + Assert.assertEquals(0, watchers[i].events.size()); + + // watchers2 + WatchedEvent event2 = + watchers2[i].events.poll(10, TimeUnit.SECONDS); + Assert.assertEquals("/foo-" + i, event2.getPath()); + Assert.assertEquals(EventType.NodeDataChanged, event2.getType()); + Assert.assertEquals(KeeperState.SyncConnected, event2.getState()); + + // small chance that an unexpected message was delivered + // after this check, but we would catch that next time + // we check events + Assert.assertEquals(0, watchers2[i].events.size()); + } + + } finally { + if (zk != null) { + zk.close(); + } + } + } + + private void performClientTest(boolean withWatcherObj) + throws IOException, InterruptedException, KeeperException + { + ZooKeeper zk = null; + try { + MyWatcher watcher = new MyWatcher(); + zk = createClient(watcher, hostPort); + LOG.info("Before create /benwashere"); + zk.create("/benwashere", "".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + LOG.info("After create /benwashere"); + try { + zk.setData("/benwashere", "hi".getBytes(), 57); + Assert.fail("Should have gotten BadVersion exception"); + } catch(KeeperException.BadVersionException e) { + // expected that + } catch (KeeperException e) { + Assert.fail("Should have gotten BadVersion exception"); + } + LOG.info("Before delete /benwashere"); + zk.delete("/benwashere", 0); + LOG.info("After delete /benwashere"); + zk.close(); + //LOG.info("Closed client: " + zk.describeCNXN()); + Thread.sleep(2000); + + zk = createClient(watcher, hostPort); + //LOG.info("Created a new client: " + zk.describeCNXN()); + LOG.info("Before delete /"); + + try { + zk.delete("/", -1); + Assert.fail("deleted root!"); + } catch(KeeperException.BadArgumentsException e) { + // good, expected that + } + Stat stat = new Stat(); + // Test basic create, ls, and getData + zk.create("/pat", "Pat was here".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + LOG.info("Before create /ben"); + zk.create("/pat/ben", "Ben was here".getBytes(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + LOG.info("Before getChildren /pat"); + List<String> children = zk.getChildren("/pat", false); + Assert.assertEquals(1, children.size()); + Assert.assertEquals("ben", children.get(0)); + List<String> children2 = zk.getChildren("/pat", false, null); + Assert.assertEquals(children, children2); + String value = new String(zk.getData("/pat/ben", false, stat)); + Assert.assertEquals("Ben was here", value); + // Test stat and watch of non existent node + + try { + if (withWatcherObj) { + Assert.assertEquals(null, zk.exists("/frog", watcher)); + } else { + Assert.assertEquals(null, zk.exists("/frog", true)); + } + LOG.info("Comment: asseting passed for frog setting /"); + } catch (KeeperException.NoNodeException e) { + // OK, expected that + } + zk.create("/frog", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + // the first poll is just a session delivery + LOG.info("Comment: checking for events length " + + watcher.events.size()); + WatchedEvent event = watcher.events.poll(10, TimeUnit.SECONDS); + Assert.assertEquals("/frog", event.getPath()); + Assert.assertEquals(EventType.NodeCreated, event.getType()); + Assert.assertEquals(KeeperState.SyncConnected, event.getState()); + // Test child watch and create with sequence + zk.getChildren("/pat/ben", true); + for (int i = 0; i < 10; i++) { + zk.create("/pat/ben/" + i + "-", Integer.toString(i).getBytes(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + } + children = zk.getChildren("/pat/ben", false); + Collections.sort(children); + Assert.assertEquals(10, children.size()); + for (int i = 0; i < 10; i++) { + final String name = children.get(i); + Assert.assertTrue("starts with -", name.startsWith(i + "-")); + byte b[]; + if (withWatcherObj) { + b = zk.getData("/pat/ben/" + name, watcher, stat); + } else { + b = zk.getData("/pat/ben/" + name, true, stat); + } + Assert.assertEquals(Integer.toString(i), new String(b)); + zk.setData("/pat/ben/" + name, "new".getBytes(), + stat.getVersion()); + if (withWatcherObj) { + stat = zk.exists("/pat/ben/" + name, watcher); + } else { + stat = zk.exists("/pat/ben/" + name, true); + } + zk.delete("/pat/ben/" + name, stat.getVersion()); + } + event = watcher.events.poll(10, TimeUnit.SECONDS); + Assert.assertEquals("/pat/ben", event.getPath()); + Assert.assertEquals(EventType.NodeChildrenChanged, event.getType()); + Assert.assertEquals(KeeperState.SyncConnected, event.getState()); + for (int i = 0; i < 10; i++) { + event = watcher.events.poll(10, TimeUnit.SECONDS); + final String name = children.get(i); + Assert.assertEquals("/pat/ben/" + name, event.getPath()); + Assert.assertEquals(EventType.NodeDataChanged, event.getType()); + Assert.assertEquals(KeeperState.SyncConnected, event.getState()); + event = watcher.events.poll(10, TimeUnit.SECONDS); + Assert.assertEquals("/pat/ben/" + name, event.getPath()); + Assert.assertEquals(EventType.NodeDeleted, event.getType()); + Assert.assertEquals(KeeperState.SyncConnected, event.getState()); + } + zk.create("/good\u0040path", "".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + zk.create("/duplicate", "".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + try { + zk.create("/duplicate", "".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + Assert.fail("duplicate create allowed"); + } catch(KeeperException.NodeExistsException e) { + // OK, expected that + } + } finally { + if (zk != null) { + zk.close(); + } + } + } + + // Test that sequential filenames are being created correctly, + // with 0-padding in the filename + @Test + public void testSequentialNodeNames() + throws IOException, InterruptedException, KeeperException + { + String path = "/SEQUENCE"; + String file = "TEST"; + String filepath = path + "/" + file; + + ZooKeeper zk = null; + try { + zk = createClient(); + zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + List<String> children = zk.getChildren(path, false); + Assert.assertEquals(1, children.size()); + Assert.assertEquals(file + "0000000000", children.get(0)); + + zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + children = zk.getChildren(path, false); + Assert.assertEquals(2, children.size()); + Assert.assertTrue("contains child 1", children.contains(file + "0000000001")); + + zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + children = zk.getChildren(path, false); + Assert.assertEquals(3, children.size()); + Assert.assertTrue("contains child 2", + children.contains(file + "0000000002")); + + // The pattern is holding so far. Let's run the counter a bit + // to be sure it continues to spit out the correct answer + for(int i = children.size(); i < 105; i++) + zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + + children = zk.getChildren(path, false); + Assert.assertTrue("contains child 104", + children.contains(file + "0000000104")); + + } + finally { + if(zk != null) + zk.close(); + } + } + + // Test that data provided when + // creating sequential nodes is stored properly + @Test + public void testSequentialNodeData() throws Exception { + ZooKeeper zk= null; + String queue_handle = "/queue"; + try { + zk = createClient(); + + zk.create(queue_handle, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + zk.create(queue_handle + "/element", "0".getBytes(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + zk.create(queue_handle + "/element", "1".getBytes(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + List<String> children = zk.getChildren(queue_handle, true); + Assert.assertEquals(children.size(), 2); + String child1 = children.get(0); + String child2 = children.get(1); + int compareResult = child1.compareTo(child2); + Assert.assertNotSame(compareResult, 0); + if (compareResult < 0) { + } else { + String temp = child1; + child1 = child2; + child2 = temp; + } + String child1data = new String(zk.getData(queue_handle + + "/" + child1, false, null)); + String child2data = new String(zk.getData(queue_handle + + "/" + child2, false, null)); + Assert.assertEquals(child1data, "0"); + Assert.assertEquals(child2data, "1"); + } finally { + if (zk != null) { + zk.close(); + } + } + + } + + @Test + public void testLargeNodeData() throws Exception { + ZooKeeper zk= null; + String queue_handle = "/large"; + try { + zk = createClient(); + + zk.create(queue_handle, new byte[500000], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } finally { + if (zk != null) { + zk.close(); + } + } + + } + + private void verifyCreateFails(String path, ZooKeeper zk) throws Exception { + try { + zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (IllegalArgumentException e) { + // this is good + return; + } + Assert.fail("bad path \"" + path + "\" not caught"); + } + + // Test that the path string is validated + @Test + public void testPathValidation() throws Exception { + ZooKeeper zk = createClient(); + + verifyCreateFails(null, zk); + verifyCreateFails("", zk); + verifyCreateFails("//", zk); + verifyCreateFails("///", zk); + verifyCreateFails("////", zk); + verifyCreateFails("/.", zk); + verifyCreateFails("/..", zk); + verifyCreateFails("/./", zk); + verifyCreateFails("/../", zk); + verifyCreateFails("/foo/./", zk); + verifyCreateFails("/foo/../", zk); + verifyCreateFails("/foo/.", zk); + verifyCreateFails("/foo/..", zk); + verifyCreateFails("/./.", zk); + verifyCreateFails("/../..", zk); + verifyCreateFails("/\u0001foo", zk); + verifyCreateFails("/foo/bar/", zk); + verifyCreateFails("/foo//bar", zk); + verifyCreateFails("/foo/bar//", zk); + + verifyCreateFails("foo", zk); + verifyCreateFails("a", zk); + + zk.create("/createseqpar", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + // next two steps - related to sequential processing + // 1) verify that empty child name Assert.fails if not sequential + try { + zk.create("/createseqpar/", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + Assert.assertTrue(false); + } catch(IllegalArgumentException be) { + // catch this. + } + + // 2) verify that empty child name success if sequential + zk.create("/createseqpar/", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); + zk.create("/createseqpar/.", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); + zk.create("/createseqpar/..", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); + try { + zk.create("/createseqpar//", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); + Assert.assertTrue(false); + } catch(IllegalArgumentException be) { + // catch this. + } + try { + zk.create("/createseqpar/./", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); + Assert.assertTrue(false); + } catch(IllegalArgumentException be) { + // catch this. + } + try { + zk.create("/createseqpar/../", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); + Assert.assertTrue(false); + } catch(IllegalArgumentException be) { + // catch this. + } + + + //check for the code path that throws at server + PrepRequestProcessor.setFailCreate(true); + try { + zk.create("/m", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Assert.assertTrue(false); + } catch(KeeperException.BadArgumentsException be) { + // catch this. + } + PrepRequestProcessor.setFailCreate(false); + zk.create("/.foo", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/.f.", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/..f", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/..f..", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/f.c", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/f\u0040f", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/f", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/f/.f", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/f/f.", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/f/..f", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/f/f..", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/f/.f/f", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/f/f./f", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + +// private void notestConnections() +// throws IOException, InterruptedException, KeeperException +// { +// ZooKeeper zk; +// for(int i = 0; i < 2000; i++) { +// if (i % 100 == 0) { +// LOG.info("Testing " + i + " connections"); +// } +// // We want to make sure socket descriptors are going away +// zk = new ZooKeeper(hostPort, 30000, this); +// zk.getData("/", false, new Stat()); +// zk.close(); +// } +// } + + @Test + public void testDeleteWithChildren() throws Exception { + ZooKeeper zk = createClient(); + zk.create("/parent", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create("/parent/child", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + try { + zk.delete("/parent", -1); + Assert.fail("Should have received a not equals message"); + } catch (KeeperException e) { + Assert.assertEquals(KeeperException.Code.NOTEMPTY, e.code()); + } + zk.delete("/parent/child", -1); + zk.delete("/parent", -1); + zk.close(); + } + + private class VerifyClientCleanup extends Thread { + int count; + int current = 0; + + VerifyClientCleanup(String name, int count) { + super(name); + this.count = count; + } + + public void run() { + try { + for (; current < count; current++) { + TestableZooKeeper zk = createClient(); + // we've asked to close, wait for it to finish closing + // all the sub-threads otw the selector may not be + // closed when we check (false positive on test Assert.failure + zk.close(CONNECTION_TIMEOUT); + } + } catch (Throwable t) { + LOG.error("test Assert.failed", t); + } + } + } + + /** + * Verify that the client is cleaning up properly. Open/close a large + * number of sessions. Essentially looking to see if sockets/selectors + * are being cleaned up properly during close. + * + * @throws Throwable + */ + @Test + public void testClientCleanup() throws Throwable { + OSMXBean osMbean = new OSMXBean(); + if (osMbean.getUnix() == false) { + LOG.warn("skipping testClientCleanup, only available on Unix"); + return; + } + + final int threadCount = 3; + final int clientCount = 10; + + /* Log the number of fds used before and after a test is run. Verifies + * we are freeing resources correctly. Unfortunately this only works + * on unix systems (the only place sun has implemented as part of the + * mgmt bean api). + */ + long initialFdCount = osMbean.getOpenFileDescriptorCount(); + + VerifyClientCleanup threads[] = new VerifyClientCleanup[threadCount]; + + for (int i = 0; i < threads.length; i++) { + threads[i] = new VerifyClientCleanup("VCC" + i, clientCount); + threads[i].start(); + } + + for (int i = 0; i < threads.length; i++) { + threads[i].join(CONNECTION_TIMEOUT); + Assert.assertTrue(threads[i].current == threads[i].count); + } + + // if this Assert.fails it means we are not cleaning up after the closed + // sessions. + long currentCount = osMbean.getOpenFileDescriptorCount(); + final String logmsg = "open fds after test ({}) are not significantly higher than before ({})"; + + if (currentCount > initialFdCount + 10) { + // consider as error + LOG.error(logmsg,Long.valueOf(currentCount),Long.valueOf(initialFdCount)); + } else { + LOG.info(logmsg,Long.valueOf(currentCount),Long.valueOf(initialFdCount)); + } + } + + + /** + * We create a perfectly valid 'exists' request, except that the opcode is wrong. + * @return + * @throws Exception + */ + @Test + public void testNonExistingOpCode() throws Exception { + final CountDownLatch clientDisconnected = new CountDownLatch(1); + Watcher watcher = new Watcher() { + @Override + public synchronized void process(WatchedEvent event) { + if (event.getState() == KeeperState.Disconnected) { + clientDisconnected.countDown(); + } + } + }; + TestableZooKeeper zk = new TestableZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher); + + final String path = "/m1"; + + RequestHeader h = new RequestHeader(); + h.setType(888); // This code does not exists + ExistsRequest request = new ExistsRequest(); + request.setPath(path); + request.setWatch(false); + ExistsResponse response = new ExistsResponse(); + + ReplyHeader r = zk.submitRequest(h, request, response, null); + + Assert.assertEquals(r.getErr(), Code.UNIMPLEMENTED.intValue()); + + // Sending a nonexisting opcode should cause the server to disconnect + Assert.assertTrue("failed to disconnect", + clientDisconnected.await(5000, TimeUnit.MILLISECONDS)); + } + + @Test + public void testTryWithResources() throws Exception { + ZooKeeper zooKeeper; + try (ZooKeeper zk = createClient()) { + zooKeeper = zk; + Assert.assertTrue(zooKeeper.getState().isAlive()); + } + + Assert.assertFalse(zooKeeper.getState().isAlive()); + } +}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java new file mode 100644 index 0000000..a072bc0 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java @@ -0,0 +1,523 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.net.Socket; + +import org.apache.zookeeper.common.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.server.quorum.QuorumCnxManager; +import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message; +import org.apache.zookeeper.server.quorum.QuorumCnxManager.InitialMessage; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class CnxManagerTest extends ZKTestCase { + protected static final Logger LOG = LoggerFactory.getLogger(FLENewEpochTest.class); + protected static final int THRESHOLD = 4; + + int count; + HashMap<Long,QuorumServer> peers; + File peerTmpdir[]; + int peerQuorumPort[]; + int peerClientPort[]; + @Before + public void setUp() throws Exception { + + this.count = 3; + this.peers = new HashMap<Long,QuorumServer>(count); + peerTmpdir = new File[count]; + peerQuorumPort = new int[count]; + peerClientPort = new int[count]; + + for(int i = 0; i < count; i++) { + peerQuorumPort[i] = PortAssignment.unique(); + peerClientPort[i] = PortAssignment.unique(); + peers.put(Long.valueOf(i), + new QuorumServer(i, + new InetSocketAddress( + "127.0.0.1", peerQuorumPort[i]), + new InetSocketAddress( + "127.0.0.1", PortAssignment.unique()), + new InetSocketAddress( + "127.0.0.1", peerClientPort[i]))); + peerTmpdir[i] = ClientBase.createTmpDir(); + } + } + + ByteBuffer createMsg(int state, long leader, long zxid, long epoch){ + byte requestBytes[] = new byte[28]; + ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); + + /* + * Building notification packet to send + */ + + requestBuffer.clear(); + requestBuffer.putInt(state); + requestBuffer.putLong(leader); + requestBuffer.putLong(zxid); + requestBuffer.putLong(epoch); + + return requestBuffer; + } + + class CnxManagerThread extends Thread { + + boolean failed; + CnxManagerThread(){ + failed = false; + } + + public void run(){ + try { + QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 1000, 2, 2); + QuorumCnxManager cnxManager = peer.createCnxnManager(); + QuorumCnxManager.Listener listener = cnxManager.listener; + if(listener != null){ + listener.start(); + } else { + LOG.error("Null listener when initializing cnx manager"); + } + + long sid = 1; + cnxManager.toSend(sid, createMsg(ServerState.LOOKING.ordinal(), 0, -1, 1)); + + Message m = null; + int numRetries = 1; + while((m == null) && (numRetries++ <= THRESHOLD)){ + m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); + if(m == null) cnxManager.connectAll(); + } + + if(numRetries > THRESHOLD){ + failed = true; + return; + } + + cnxManager.testInitiateConnection(sid); + + m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); + if(m == null){ + failed = true; + return; + } + } catch (Exception e) { + LOG.error("Exception while running mock thread", e); + Assert.fail("Unexpected exception"); + } + } + } + + @Test + public void testCnxManager() throws Exception { + CnxManagerThread thread = new CnxManagerThread(); + + thread.start(); + + QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2); + QuorumCnxManager cnxManager = peer.createCnxnManager(); + QuorumCnxManager.Listener listener = cnxManager.listener; + if(listener != null){ + listener.start(); + } else { + LOG.error("Null listener when initializing cnx manager"); + } + + cnxManager.toSend(0L, createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1)); + + Message m = null; + int numRetries = 1; + while((m == null) && (numRetries++ <= THRESHOLD)){ + m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); + if(m == null) cnxManager.connectAll(); + } + + Assert.assertTrue("Exceeded number of retries", numRetries <= THRESHOLD); + + thread.join(5000); + if (thread.isAlive()) { + Assert.fail("Thread didn't join"); + } else { + if(thread.failed) + Assert.fail("Did not receive expected message"); + } + cnxManager.halt(); + Assert.assertFalse(cnxManager.listener.isAlive()); + } + + @Test + public void testCnxManagerTimeout() throws Exception { + Random rand = new Random(); + byte b = (byte) rand.nextInt(); + int deadPort = PortAssignment.unique(); + String deadAddress = "10.1.1." + b; + + LOG.info("This is the dead address I'm trying: " + deadAddress); + + peers.put(Long.valueOf(2), + new QuorumServer(2, + new InetSocketAddress(deadAddress, deadPort), + new InetSocketAddress(deadAddress, PortAssignment.unique()), + new InetSocketAddress(deadAddress, PortAssignment.unique()))); + peerTmpdir[2] = ClientBase.createTmpDir(); + + QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2); + QuorumCnxManager cnxManager = peer.createCnxnManager(); + QuorumCnxManager.Listener listener = cnxManager.listener; + if(listener != null){ + listener.start(); + } else { + LOG.error("Null listener when initializing cnx manager"); + } + + long begin = Time.currentElapsedTime(); + cnxManager.toSend(2L, createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1)); + long end = Time.currentElapsedTime(); + + if((end - begin) > 6000) Assert.fail("Waited more than necessary"); + cnxManager.halt(); + Assert.assertFalse(cnxManager.listener.isAlive()); + } + + /** + * Tests a bug in QuorumCnxManager that causes a spin lock + * when a negative value is sent. This test checks if the + * connection is being closed upon a message with negative + * length. + * + * @throws Exception + */ + @Test + public void testCnxManagerSpinLock() throws Exception { + QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2); + QuorumCnxManager cnxManager = peer.createCnxnManager(); + QuorumCnxManager.Listener listener = cnxManager.listener; + if(listener != null){ + listener.start(); + } else { + LOG.error("Null listener when initializing cnx manager"); + } + + int port = peers.get(peer.getId()).electionAddr.getPort(); + LOG.info("Election port: " + port); + + Thread.sleep(1000); + + SocketChannel sc = SocketChannel.open(); + sc.socket().connect(peers.get(1L).electionAddr, 5000); + + InetSocketAddress otherAddr = peers.get(new Long(2)).electionAddr; + DataOutputStream dout = new DataOutputStream(sc.socket().getOutputStream()); + dout.writeLong(QuorumCnxManager.PROTOCOL_VERSION); + dout.writeLong(new Long(2)); + String addr = otherAddr.getHostString()+ ":" + otherAddr.getPort(); + byte[] addr_bytes = addr.getBytes(); + dout.writeInt(addr_bytes.length); + dout.write(addr_bytes); + dout.flush(); + + + ByteBuffer msgBuffer = ByteBuffer.wrap(new byte[4]); + msgBuffer.putInt(-20); + msgBuffer.position(0); + sc.write(msgBuffer); + + Thread.sleep(1000); + + try{ + /* + * Write a number of times until it + * detects that the socket is broken. + */ + for(int i = 0; i < 100; i++){ + msgBuffer.position(0); + sc.write(msgBuffer); + } + Assert.fail("Socket has not been closed"); + } catch (Exception e) { + LOG.info("Socket has been closed as expected"); + } + peer.shutdown(); + cnxManager.halt(); + Assert.assertFalse(cnxManager.listener.isAlive()); + } + + /** + * Tests a bug in QuorumCnxManager that causes a NPE when a 3.4.6 + * observer connects to a 3.5.0 server. + * {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1789} + * + * @throws Exception + */ + @Test + public void testCnxManagerNPE() throws Exception { + // the connecting peer (id = 2) is a 3.4.6 observer + peers.get(2L).type = LearnerType.OBSERVER; + QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], + peerClientPort[1], 3, 1, 1000, 2, 2); + QuorumCnxManager cnxManager = peer.createCnxnManager(); + QuorumCnxManager.Listener listener = cnxManager.listener; + if (listener != null) { + listener.start(); + } else { + LOG.error("Null listener when initializing cnx manager"); + } + int port = peers.get(peer.getId()).electionAddr.getPort(); + LOG.info("Election port: " + port); + + Thread.sleep(1000); + + SocketChannel sc = SocketChannel.open(); + sc.socket().connect(peers.get(1L).electionAddr, 5000); + + /* + * Write id (3.4.6 protocol). This previously caused a NPE in + * QuorumCnxManager. + */ + byte[] msgBytes = new byte[8]; + ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes); + msgBuffer.putLong(2L); + msgBuffer.position(0); + sc.write(msgBuffer); + + msgBuffer = ByteBuffer.wrap(new byte[8]); + // write length of message + msgBuffer.putInt(4); + // write message + msgBuffer.putInt(5); + msgBuffer.position(0); + sc.write(msgBuffer); + + Message m = cnxManager.pollRecvQueue(1000, TimeUnit.MILLISECONDS); + Assert.assertNotNull(m); + + peer.shutdown(); + cnxManager.halt(); + Assert.assertFalse(cnxManager.listener.isAlive()); + } + + /* + * Test if a receiveConnection is able to timeout on socket errors + */ + @Test + public void testSocketTimeout() throws Exception { + QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2000, 2, 2); + QuorumCnxManager cnxManager = peer.createCnxnManager(); + QuorumCnxManager.Listener listener = cnxManager.listener; + if(listener != null){ + listener.start(); + } else { + LOG.error("Null listener when initializing cnx manager"); + } + int port = peers.get(peer.getId()).electionAddr.getPort(); + LOG.info("Election port: " + port); + Thread.sleep(1000); + + Socket sock = new Socket(); + sock.connect(peers.get(1L).electionAddr, 5000); + long begin = Time.currentElapsedTime(); + // Read without sending data. Verify timeout. + cnxManager.receiveConnection(sock); + long end = Time.currentElapsedTime(); + if((end - begin) > ((peer.getSyncLimit() * peer.getTickTime()) + 500)) Assert.fail("Waited more than necessary"); + cnxManager.halt(); + Assert.assertFalse(cnxManager.listener.isAlive()); + } + + /* + * Test if Worker threads are getting killed after connection loss + */ + @Test + public void testWorkerThreads() throws Exception { + ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>(); + try { + for (int sid = 0; sid < 3; sid++) { + QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[sid], + peerTmpdir[sid], peerClientPort[sid], 3, sid, 1000, 2, + 2); + LOG.info("Starting peer {}", peer.getId()); + peer.start(); + peerList.add(sid, peer); + } + String failure = verifyThreadCount(peerList, 4); + Assert.assertNull(failure, failure); + for (int myid = 0; myid < 3; myid++) { + for (int i = 0; i < 5; i++) { + // halt one of the listeners and verify count + QuorumPeer peer = peerList.get(myid); + LOG.info("Round {}, halting peer ", + new Object[] { i, peer.getId() }); + peer.shutdown(); + peerList.remove(myid); + failure = verifyThreadCount(peerList, 2); + Assert.assertNull(failure, failure); + // Restart halted node and verify count + peer = new QuorumPeer(peers, peerTmpdir[myid], + peerTmpdir[myid], peerClientPort[myid], 3, myid, + 1000, 2, 2); + LOG.info("Round {}, restarting peer ", + new Object[] { i, peer.getId() }); + peer.start(); + peerList.add(myid, peer); + failure = verifyThreadCount(peerList, 4); + Assert.assertNull(failure, failure); + } + } + } finally { + for (QuorumPeer quorumPeer : peerList) { + quorumPeer.shutdown(); + } + } + } + + /** + * Returns null on success, otw the message assoc with the failure + * @throws InterruptedException + */ + public String verifyThreadCount(ArrayList<QuorumPeer> peerList, long ecnt) + throws InterruptedException + { + String failure = null; + for (int i = 0; i < 480; i++) { + Thread.sleep(500); + + failure = _verifyThreadCount(peerList, ecnt); + if (failure == null) { + return null; + } + } + return failure; + } + public String _verifyThreadCount(ArrayList<QuorumPeer> peerList, long ecnt) { + for (int myid = 0; myid < peerList.size(); myid++) { + QuorumPeer peer = peerList.get(myid); + QuorumCnxManager cnxManager = peer.getQuorumCnxManager(); + long cnt = cnxManager.getThreadCount(); + if (cnt != ecnt) { + return new Date() + + " Incorrect number of Worker threads for sid=" + myid + + " expected " + ecnt + " found " + cnt; + } + } + return null; + } + + @Test + public void testInitialMessage() throws Exception { + InitialMessage msg; + ByteArrayOutputStream bos; + DataInputStream din; + DataOutputStream dout; + String hostport; + + // message with bad protocol version + try { + + // the initial message (without the protocol version) + hostport = "10.0.0.2:3888"; + bos = new ByteArrayOutputStream(); + dout = new DataOutputStream(bos); + dout.writeLong(5L); // sid + dout.writeInt(hostport.getBytes().length); + dout.writeBytes(hostport); + + // now parse it + din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray())); + msg = InitialMessage.parse(-65530L, din); + Assert.fail("bad protocol version accepted"); + } catch (InitialMessage.InitialMessageException ex) {} + + // message too long + try { + + hostport = createLongString(1048576); + bos = new ByteArrayOutputStream(); + dout = new DataOutputStream(bos); + dout.writeLong(5L); // sid + dout.writeInt(hostport.getBytes().length); + dout.writeBytes(hostport); + + din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray())); + msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din); + Assert.fail("long message accepted"); + } catch (InitialMessage.InitialMessageException ex) {} + + // bad hostport string + try { + + hostport = "what's going on here?"; + bos = new ByteArrayOutputStream(); + dout = new DataOutputStream(bos); + dout.writeLong(5L); // sid + dout.writeInt(hostport.getBytes().length); + dout.writeBytes(hostport); + + din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray())); + msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din); + Assert.fail("bad hostport accepted"); + } catch (InitialMessage.InitialMessageException ex) {} + + // good message + try { + + hostport = "10.0.0.2:3888"; + bos = new ByteArrayOutputStream(); + dout = new DataOutputStream(bos); + dout.writeLong(5L); // sid + dout.writeInt(hostport.getBytes().length); + dout.writeBytes(hostport); + + // now parse it + din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray())); + msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din); + } catch (InitialMessage.InitialMessageException ex) { + Assert.fail(ex.toString()); + } + } + + private String createLongString(int size) { + StringBuilder sb = new StringBuilder(size); + for (int i=0; i < size; i++) { + sb.append('x'); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java new file mode 100644 index 0000000..393cc03 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.client.ConnectStringParser; +import org.junit.Assert; +import org.junit.Test; + +public class ConnectStringParserTest extends ZKTestCase{ + + @Test + public void testSingleServerChrootPath(){ + String chrootPath = "/hallo/welt"; + String servers = "10.10.10.1"; + assertChrootPath(chrootPath, + new ConnectStringParser(servers+chrootPath)); + } + + @Test + public void testMultipleServersChrootPath(){ + String chrootPath = "/hallo/welt"; + String servers = "10.10.10.1,10.10.10.2"; + assertChrootPath(chrootPath, + new ConnectStringParser(servers+chrootPath)); + } + + @Test + public void testParseServersWithoutPort(){ + String servers = "10.10.10.1,10.10.10.2"; + ConnectStringParser parser = new ConnectStringParser(servers); + + Assert.assertEquals("10.10.10.1", parser.getServerAddresses().get(0).getHostString()); + Assert.assertEquals("10.10.10.2", parser.getServerAddresses().get(1).getHostString()); + } + + @Test + public void testParseServersWithPort(){ + String servers = "10.10.10.1:112,10.10.10.2:110"; + ConnectStringParser parser = new ConnectStringParser(servers); + + Assert.assertEquals("10.10.10.1", parser.getServerAddresses().get(0).getHostString()); + Assert.assertEquals("10.10.10.2", parser.getServerAddresses().get(1).getHostString()); + + Assert.assertEquals(112, parser.getServerAddresses().get(0).getPort()); + Assert.assertEquals(110, parser.getServerAddresses().get(1).getPort()); + } + + private void assertChrootPath(String expected, ConnectStringParser parser){ + Assert.assertEquals(expected, parser.getChrootPath()); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateModeTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateModeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateModeTest.java new file mode 100644 index 0000000..fc61adf --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateModeTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.util.EnumSet; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.KeeperException.Code; +import org.junit.Assert; +import org.junit.Test; + +public class CreateModeTest extends ZKTestCase { + + @Test + public void testBasicCreateMode() { + CreateMode cm = CreateMode.PERSISTENT; + Assert.assertEquals(cm.toFlag(), 0); + Assert.assertFalse(cm.isEphemeral()); + Assert.assertFalse(cm.isSequential()); + Assert.assertFalse(cm.isContainer()); + + cm = CreateMode.EPHEMERAL; + Assert.assertEquals(cm.toFlag(), 1); + Assert.assertTrue(cm.isEphemeral()); + Assert.assertFalse(cm.isSequential()); + Assert.assertFalse(cm.isContainer()); + + cm = CreateMode.PERSISTENT_SEQUENTIAL; + Assert.assertEquals(cm.toFlag(), 2); + Assert.assertFalse(cm.isEphemeral()); + Assert.assertTrue(cm.isSequential()); + Assert.assertFalse(cm.isContainer()); + + cm = CreateMode.EPHEMERAL_SEQUENTIAL; + Assert.assertEquals(cm.toFlag(), 3); + Assert.assertTrue(cm.isEphemeral()); + Assert.assertTrue(cm.isSequential()); + Assert.assertFalse(cm.isContainer()); + + cm = CreateMode.CONTAINER; + Assert.assertEquals(cm.toFlag(), 4); + Assert.assertFalse(cm.isEphemeral()); + Assert.assertFalse(cm.isSequential()); + Assert.assertTrue(cm.isContainer()); + } + + @Test + public void testFlagConversion() throws KeeperException { + // Ensure we get the same value back after round trip conversion + EnumSet<CreateMode> allModes = EnumSet.allOf(CreateMode.class); + + for(CreateMode cm : allModes) { + Assert.assertEquals(cm, CreateMode.fromFlag( cm.toFlag() ) ); + } + } + + @Test + public void testInvalidFlagConversion() throws KeeperException { + try { + CreateMode.fromFlag(99); + Assert.fail("Shouldn't be able to convert 99 to a CreateMode."); + } catch(KeeperException ke) { + Assert.assertEquals(Code.BADARGUMENTS, ke.code()); + } + + try { + CreateMode.fromFlag(-1); + Assert.fail("Shouldn't be able to convert -1 to a CreateMode."); + } catch(KeeperException ke) { + Assert.assertEquals(Code.BADARGUMENTS, ke.code()); + } + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateTest.java new file mode 100644 index 0000000..1b427a1 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.test; + +import java.io.IOException; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; +import org.junit.Assert; +import org.junit.Test; +/** + * Test suite for validating the Create API. + */ +public class CreateTest extends ClientBase { + private ZooKeeper zk; + + @Override + public void setUp() throws Exception { + super.setUp(); + zk = createClient(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + zk.close(); + } + + @Test + public void testCreate() + throws IOException, KeeperException, InterruptedException { + createNoStatVerifyResult("/foo"); + createNoStatVerifyResult("/foo/child"); + } + + @Test + public void testCreateWithStat() + throws IOException, KeeperException, InterruptedException { + String name = "/foo"; + Stat stat = createWithStatVerifyResult("/foo"); + Stat childStat = createWithStatVerifyResult("/foo/child"); + // Don't expect to get the same stats for different creates. + Assert.assertFalse(stat.equals(childStat)); + } + + @Test + public void testCreateWithNullStat() + throws IOException, KeeperException, InterruptedException { + String name = "/foo"; + Assert.assertNull(zk.exists(name, false)); + + Stat stat = null; + // If a null Stat object is passed the create should still + // succeed, but no Stat info will be returned. + String path = zk.create(name, name.getBytes(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + Assert.assertNull(stat); + Assert.assertNotNull(zk.exists(name, false)); + } + + private void createNoStatVerifyResult(String newName) + throws KeeperException, InterruptedException { + Assert.assertNull("Node existed before created", zk.exists(newName, false)); + String path = zk.create(newName, newName.getBytes(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Assert.assertEquals(path, newName); + Assert.assertNotNull("Node was not created as expected", + zk.exists(newName, false)); + } + + private Stat createWithStatVerifyResult(String newName) + throws KeeperException, InterruptedException { + Assert.assertNull("Node existed before created", zk.exists(newName, false)); + Stat stat = new Stat(); + String path = zk.create(newName, newName.getBytes(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + Assert.assertEquals(path, newName); + validateCreateStat(stat, newName); + + Stat referenceStat = zk.exists(newName, false); + Assert.assertNotNull("Node was not created as expected", referenceStat); + Assert.assertEquals(referenceStat, stat); + + return stat; + } + + private void validateCreateStat(Stat stat, String name) { + Assert.assertEquals(stat.getCzxid(), stat.getMzxid()); + Assert.assertEquals(stat.getCzxid(), stat.getPzxid()); + Assert.assertEquals(stat.getCtime(), stat.getMtime()); + Assert.assertEquals(0, stat.getCversion()); + Assert.assertEquals(0, stat.getVersion()); + Assert.assertEquals(0, stat.getAversion()); + Assert.assertEquals(0, stat.getEphemeralOwner()); + Assert.assertEquals(name.length(), stat.getDataLength()); + Assert.assertEquals(0, stat.getNumChildren()); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectableZooKeeper.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectableZooKeeper.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectableZooKeeper.java new file mode 100644 index 0000000..619bdc6 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectableZooKeeper.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.IOException; + +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + +public class DisconnectableZooKeeper extends ZooKeeper { + public DisconnectableZooKeeper(String host, int sessionTimeout, Watcher watcher) + throws IOException + { + super(host, sessionTimeout, watcher); + } + + public DisconnectableZooKeeper(String host, int sessionTimeout, Watcher watcher, + long sessionId, byte[] sessionPasswd) + throws IOException + { + super(host, sessionTimeout, watcher, sessionId, sessionPasswd); + } + + /** Testing only!!! Really!!!! This is only here to test when the client + * disconnects from the server w/o sending a session disconnect (ie + * ending the session cleanly). The server will eventually notice the + * client is no longer pinging and will timeout the session. + */ + public void disconnect() throws IOException { + cnxn.disconnect(); + } + + /** + * Prevent the client from automatically reconnecting if the connection to the + * server is lost + */ + public void dontReconnect() throws Exception { + java.lang.reflect.Field f = cnxn.getClass().getDeclaredField("closing"); + f.setAccessible(true); + f.setBoolean(cnxn, true); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectedWatcherTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectedWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectedWatcherTest.java new file mode 100644 index 0000000..aa65e21 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectedWatcherTest.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.ZooDefs.Ids; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DisconnectedWatcherTest extends ClientBase { + protected static final Logger LOG = LoggerFactory.getLogger(DisconnectedWatcherTest.class); + final int TIMEOUT = 5000; + + private class MyWatcher extends CountdownWatcher { + LinkedBlockingQueue<WatchedEvent> events = + new LinkedBlockingQueue<WatchedEvent>(); + + public void process(WatchedEvent event) { + super.process(event); + if (event.getType() != Event.EventType.None) { + try { + events.put(event); + } catch (InterruptedException e) { + LOG.warn("ignoring interrupt during event.put"); + } + } + } + } + + // @see jira issue ZOOKEEPER-961 + + @Test + public void testChildWatcherAutoResetWithChroot() throws Exception { + ZooKeeper zk1 = createClient(); + + zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + MyWatcher watcher = new MyWatcher(); + ZooKeeper zk2 = createClient(watcher, hostPort + "/ch1"); + zk2.getChildren("/", true ); + + // this call shouldn't trigger any error or watch + zk1.create("/youdontmatter1", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + // this should trigger the watch + zk1.create("/ch1/youshouldmatter1", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + WatchedEvent e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertNotNull(e); + Assert.assertEquals(EventType.NodeChildrenChanged, e.getType()); + Assert.assertEquals("/", e.getPath()); + + MyWatcher childWatcher = new MyWatcher(); + zk2.getChildren("/", childWatcher); + + stopServer(); + watcher.waitForDisconnected(3000); + startServer(); + watcher.waitForConnected(3000); + + // this should trigger the watch + zk1.create("/ch1/youshouldmatter2", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertNotNull(e); + Assert.assertEquals(EventType.NodeChildrenChanged, e.getType()); + Assert.assertEquals("/", e.getPath()); + } + + @Test + public void testDefaultWatcherAutoResetWithChroot() throws Exception { + ZooKeeper zk1 = createClient(); + + zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + MyWatcher watcher = new MyWatcher(); + ZooKeeper zk2 = createClient(watcher, hostPort + "/ch1"); + zk2.getChildren("/", true ); + + // this call shouldn't trigger any error or watch + zk1.create("/youdontmatter1", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + // this should trigger the watch + zk1.create("/ch1/youshouldmatter1", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + WatchedEvent e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertNotNull(e); + Assert.assertEquals(EventType.NodeChildrenChanged, e.getType()); + Assert.assertEquals("/", e.getPath()); + + zk2.getChildren("/", true ); + + stopServer(); + watcher.waitForDisconnected(3000); + startServer(); + watcher.waitForConnected(3000); + + // this should trigger the watch + zk1.create("/ch1/youshouldmatter2", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertNotNull(e); + Assert.assertEquals(EventType.NodeChildrenChanged, e.getType()); + Assert.assertEquals("/", e.getPath()); + } + + @Test + public void testDeepChildWatcherAutoResetWithChroot() throws Exception { + ZooKeeper zk1 = createClient(); + + zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + zk1.create("/ch1/here", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + zk1.create("/ch1/here/we", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + zk1.create("/ch1/here/we/are", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + MyWatcher watcher = new MyWatcher(); + ZooKeeper zk2 = createClient(watcher, hostPort + "/ch1/here/we"); + zk2.getChildren("/are", true ); + + // this should trigger the watch + zk1.create("/ch1/here/we/are/now", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + WatchedEvent e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertNotNull(e); + Assert.assertEquals(EventType.NodeChildrenChanged, e.getType()); + Assert.assertEquals("/are", e.getPath()); + + MyWatcher childWatcher = new MyWatcher(); + zk2.getChildren("/are", childWatcher); + + stopServer(); + watcher.waitForDisconnected(3000); + startServer(); + watcher.waitForConnected(3000); + + // this should trigger the watch + zk1.create("/ch1/here/we/are/again", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertNotNull(e); + Assert.assertEquals(EventType.NodeChildrenChanged, e.getType()); + Assert.assertEquals("/are", e.getPath()); + } + + // @see jira issue ZOOKEEPER-706. Test auto reset of a large number of + // watches which require multiple SetWatches calls. + @Test(timeout = 840000) + public void testManyChildWatchersAutoReset() throws Exception { + ZooKeeper zk1 = createClient(); + + MyWatcher watcher = new MyWatcher(); + ZooKeeper zk2 = createClient(watcher); + + // 110 character base path + String pathBase = "/long-path-000000000-111111111-222222222-333333333-444444444-" + + "555555555-666666666-777777777-888888888-999999999"; + + zk1.create(pathBase, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // Create 10,000 nodes. This should ensure the length of our + // watches set below exceeds 1MB. + List<String> paths = new ArrayList<String>(); + for (int i = 0; i < 10000; i++) { + String path = zk1.create(pathBase + "/ch-", null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); + paths.add(path); + } + LOG.info("Created 10,000 nodes."); + + MyWatcher childWatcher = new MyWatcher(); + + // Set a combination of child/exists/data watches + int i = 0; + for (String path : paths) { + if (i % 3 == 0) { + zk2.getChildren(path, childWatcher); + } else if (i % 3 == 1) { + zk2.exists(path + "/foo", childWatcher); + } else if (i % 3 == 2) { + zk2.getData(path, childWatcher, null); + } + + i++; + } + + stopServer(); + watcher.waitForDisconnected(30000); + startServer(); + watcher.waitForConnected(30000); + + // Trigger the watches and ensure they properly propagate to the client + i = 0; + for (String path : paths) { + if (i % 3 == 0) { + zk1.create(path + "/ch", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertNotNull(e); + Assert.assertEquals(EventType.NodeChildrenChanged, e.getType()); + Assert.assertEquals(path, e.getPath()); + } else if (i % 3 == 1) { + zk1.create(path + "/foo", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertNotNull(e); + Assert.assertEquals(EventType.NodeCreated, e.getType()); + Assert.assertEquals(path + "/foo", e.getPath()); + } else if (i % 3 == 2) { + zk1.setData(path, new byte[]{1, 2, 3}, -1); + + WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertNotNull(e); + Assert.assertEquals(EventType.NodeDataChanged, e.getType()); + Assert.assertEquals(path, e.getPath()); + } + + i++; + } + } + +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java new file mode 100644 index 0000000..a21acdc --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * When request are route incorrectly, both follower and the leader will perform + * local session upgrade. So we saw CreateSession twice in txnlog This doesn't + * affect the correctness but cause the ensemble to see more load than + * necessary. + */ +public class DuplicateLocalSessionUpgradeTest extends ZKTestCase { + protected static final Logger LOG = LoggerFactory + .getLogger(DuplicateLocalSessionUpgradeTest.class); + + private final QuorumBase qb = new QuorumBase(); + + private static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT; + + @Before + public void setUp() throws Exception { + LOG.info("STARTING quorum " + getClass().getName()); + qb.localSessionsEnabled = true; + qb.localSessionsUpgradingEnabled = true; + qb.setUp(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + } + + @After + public void tearDown() throws Exception { + LOG.info("STOPPING quorum " + getClass().getName()); + qb.tearDown(); + } + + @Test + public void testLocalSessionUpgradeOnFollower() throws Exception { + testLocalSessionUpgrade(false); + } + + @Test + public void testLocalSessionUpgradeOnLeader() throws Exception { + testLocalSessionUpgrade(true); + } + + private void testLocalSessionUpgrade(boolean testLeader) throws Exception { + + int leaderIdx = qb.getLeaderIndex(); + Assert.assertFalse("No leader in quorum?", leaderIdx == -1); + int followerIdx = (leaderIdx + 1) % 5; + int testPeerIdx = testLeader ? leaderIdx : followerIdx; + String hostPorts[] = qb.hostPort.split(","); + + CountdownWatcher watcher = new CountdownWatcher(); + ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx], + CONNECTION_TIMEOUT); + watcher.waitForConnected(CONNECTION_TIMEOUT); + + final String firstPath = "/first"; + final String secondPath = "/ephemeral"; + + // Just create some node so that we know the current zxid + zk.create(firstPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + // Now, try an ephemeral node. This will trigger session upgrade + // so there will be createSession request inject into the pipeline + // prior to this request + zk.create(secondPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL); + + Stat firstStat = zk.exists(firstPath, null); + Assert.assertNotNull(firstStat); + + Stat secondStat = zk.exists(secondPath, null); + Assert.assertNotNull(secondStat); + + long zxidDiff = secondStat.getCzxid() - firstStat.getCzxid(); + + // If there is only one createSession request in between, zxid diff + // will be exactly 2. The alternative way of checking is to actually + // read txnlog but this should be sufficient + Assert.assertEquals(2L, zxidDiff); + + } +}