http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java
----------------------------------------------------------------------
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java
new file mode 100644
index 0000000..6373bb3
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java
@@ -0,0 +1,870 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.InvalidACLException;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.ExistsRequest;
+import org.apache.zookeeper.proto.ExistsResponse;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.util.OSMXBean;
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClientTest extends ClientBase {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(ClientTest.class);
+    private boolean skipACL = System.getProperty("zookeeper.skipACL", 
"no").equals("yes");
+
+    /** Verify that pings are sent, keeping the "idle" client alive */
+    @Test
+    public void testPing() throws Exception {
+        ZooKeeper zkIdle = null;
+        ZooKeeper zkWatchCreator = null;
+        try {
+            CountdownWatcher watcher = new CountdownWatcher();
+            zkIdle = createClient(watcher, hostPort, 10000);
+
+            zkWatchCreator = createClient();
+
+            for (int i = 0; i < 10; i++) {
+                zkWatchCreator.create("/" + i, new byte[0], 
Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+            }
+            for (int i = 0; i < 10; i++) {
+                zkIdle.exists("/" + i, true);
+            }
+            for (int i = 0; i < 10; i++) {
+                Thread.sleep(1000);
+                zkWatchCreator.delete("/" + i, -1);
+            }
+            // The bug will manifest itself here because zkIdle will expire
+            zkIdle.exists("/0", false);
+        } finally {
+            if (zkIdle != null) {
+                zkIdle.close();
+            }
+            if (zkWatchCreator != null) {
+                zkWatchCreator.close();
+            }
+        }
+    }
+
+    @Test
+    public void testClientwithoutWatcherObj() throws IOException,
+            InterruptedException, KeeperException
+    {
+        performClientTest(false);
+    }
+
+    @Test
+    public void testClientWithWatcherObj() throws IOException,
+            InterruptedException, KeeperException
+    {
+        performClientTest(true);
+    }
+
+    /** Exercise the testable functions, verify tostring, etc... */
+    @Test
+    public void testTestability() throws Exception {
+        TestableZooKeeper zk = createClient();
+        try {
+            LOG.info("{}",zk.testableLocalSocketAddress());
+            LOG.info("{}",zk.testableRemoteSocketAddress());
+            LOG.info("{}",zk.toString());
+        } finally {
+            zk.close(CONNECTION_TIMEOUT);
+            LOG.info("{}",zk.testableLocalSocketAddress());
+            LOG.info("{}",zk.testableRemoteSocketAddress());
+            LOG.info("{}",zk.toString());
+        }
+    }
+
+    @Test
+    public void testACLs() throws Exception {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient();
+            try {
+                zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, 
CreateMode.PERSISTENT);
+                Assert.fail("Should have received an invalid acl error");
+            } catch(InvalidACLException e) {
+                LOG.info("Test successful, invalid acl received : "
+                        + e.getMessage());
+            }
+            try {
+                ArrayList<ACL> testACL = new ArrayList<ACL>();
+                testACL.add(new ACL(Perms.ALL | Perms.ADMIN, Ids.AUTH_IDS));
+                testACL.add(new ACL(Perms.ALL | Perms.ADMIN, new Id("ip", 
"127.0.0.1/8")));
+                zk.create("/acltest", new byte[0], testACL, 
CreateMode.PERSISTENT);
+                Assert.fail("Should have received an invalid acl error");
+            } catch(InvalidACLException e) {
+                LOG.info("Test successful, invalid acl received : "
+                        + e.getMessage());
+            }
+            try {
+                ArrayList<ACL> testACL = new ArrayList<ACL>();
+                testACL.add(new ACL(Perms.ALL | Perms.ADMIN, new Id()));
+                zk.create("/nullidtest", new byte[0], testACL, 
CreateMode.PERSISTENT);
+                Assert.fail("Should have received an invalid acl error");
+            } catch(InvalidACLException e) {
+                LOG.info("Test successful, invalid acl received : "
+                        + e.getMessage());
+            }
+            zk.addAuthInfo("digest", "ben:passwd".getBytes());
+            ArrayList<ACL> testACL = new ArrayList<ACL>();
+            testACL.add(new ACL(Perms.ALL, new Id("auth","")));
+            testACL.add(new ACL(Perms.WRITE, new Id("ip", "127.0.0.1")));
+            zk.create("/acltest", new byte[0], testACL, CreateMode.PERSISTENT);
+            zk.close();
+            zk = createClient();
+            zk.addAuthInfo("digest", "ben:passwd2".getBytes());
+            if (skipACL) {
+                try {
+                    zk.getData("/acltest", false, null);
+                } catch (KeeperException e) {
+                    Assert.fail("Badauth reads should succeed with skipACL.");
+                }
+            } else {
+                try {
+                    zk.getData("/acltest", false, null);
+                    Assert.fail("Should have received a permission error");
+                } catch (KeeperException e) {
+                    Assert.assertEquals(Code.NOAUTH, e.code());
+                }
+            }
+            zk.addAuthInfo("digest", "ben:passwd".getBytes());
+            zk.getData("/acltest", false, null);
+            zk.setACL("/acltest", Ids.OPEN_ACL_UNSAFE, -1);
+            zk.close();
+            zk = createClient();
+            zk.getData("/acltest", false, null);
+            List<ACL> acls = zk.getACL("/acltest", new Stat());
+            Assert.assertEquals(1, acls.size());
+            Assert.assertEquals(Ids.OPEN_ACL_UNSAFE, acls);
+
+            // The stat parameter should be optional.
+            acls = zk.getACL("/acltest", null);
+            Assert.assertEquals(1, acls.size());
+            Assert.assertEquals(Ids.OPEN_ACL_UNSAFE, acls);
+
+            zk.close();
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+    @Test
+    public void testNullAuthId() throws Exception {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient();
+            zk.addAuthInfo("digest", "ben:passwd".getBytes());
+            ArrayList<ACL> testACL = new ArrayList<ACL>();
+            testACL.add(new ACL(Perms.ALL, new Id("auth", null)));
+            zk.create("/acltest", new byte[0], testACL, CreateMode.PERSISTENT);
+            zk.close();
+            zk = createClient();
+            zk.addAuthInfo("digest", "ben:passwd2".getBytes());
+            if (skipACL) {
+                try {
+                    zk.getData("/acltest", false, null);
+                } catch (KeeperException e) {
+                    Assert.fail("Badauth reads should succeed with skipACL.");
+                }
+            } else {
+                try {
+                    zk.getData("/acltest", false, null);
+                    Assert.fail("Should have received a permission error");
+                } catch (KeeperException e) {
+                    Assert.assertEquals(Code.NOAUTH, e.code());
+                }
+            }
+            zk.addAuthInfo("digest", "ben:passwd".getBytes());
+            zk.getData("/acltest", false, null);
+            zk.setACL("/acltest", Ids.OPEN_ACL_UNSAFE, -1);
+            zk.close();
+            zk = createClient();
+            zk.getData("/acltest", false, null);
+            List<ACL> acls = zk.getACL("/acltest", new Stat());
+            Assert.assertEquals(1, acls.size());
+            Assert.assertEquals(Ids.OPEN_ACL_UNSAFE, acls);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+    private class MyWatcher extends CountdownWatcher {
+        LinkedBlockingQueue<WatchedEvent> events =
+            new LinkedBlockingQueue<WatchedEvent>();
+
+        public void process(WatchedEvent event) {
+            super.process(event);
+            if (event.getType() != EventType.None) {
+                try {
+                    events.put(event);
+                } catch (InterruptedException e) {
+                    LOG.warn("ignoring interrupt during event.put");
+                }
+            }
+        }
+    }
+
+    /**
+     * Register multiple watchers and verify that they all get notified and
+     * in the right order.
+     */
+    @Test
+    public void testMutipleWatcherObjs()
+        throws IOException, InterruptedException, KeeperException
+    {
+        ZooKeeper zk = createClient(new CountdownWatcher(), hostPort);
+        try {
+            MyWatcher watchers[] = new MyWatcher[100];
+            MyWatcher watchers2[] = new MyWatcher[watchers.length];
+            for (int i = 0; i < watchers.length; i++) {
+                watchers[i] = new MyWatcher();
+                watchers2[i] = new MyWatcher();
+                zk.create("/foo-" + i, ("foodata" + i).getBytes(),
+                        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+            Stat stat = new Stat();
+
+            //
+            // test get/exists with single set of watchers
+            //   get all, then exists all
+            //
+            for (int i = 0; i < watchers.length; i++) {
+                Assert.assertNotNull(zk.getData("/foo-" + i, watchers[i], 
stat));
+            }
+            for (int i = 0; i < watchers.length; i++) {
+                Assert.assertNotNull(zk.exists("/foo-" + i, watchers[i]));
+            }
+            // trigger the watches
+            for (int i = 0; i < watchers.length; i++) {
+                zk.setData("/foo-" + i, ("foodata2-" + i).getBytes(), -1);
+                zk.setData("/foo-" + i, ("foodata3-" + i).getBytes(), -1);
+            }
+            for (int i = 0; i < watchers.length; i++) {
+                WatchedEvent event =
+                    watchers[i].events.poll(10, TimeUnit.SECONDS);
+                Assert.assertEquals("/foo-" + i, event.getPath());
+                Assert.assertEquals(EventType.NodeDataChanged, 
event.getType());
+                Assert.assertEquals(KeeperState.SyncConnected, 
event.getState());
+
+                // small chance that an unexpected message was delivered
+                //  after this check, but we would catch that next time
+                //  we check events
+                Assert.assertEquals(0, watchers[i].events.size());
+            }
+
+            //
+            // test get/exists with single set of watchers
+            //  get/exists together
+            //
+            for (int i = 0; i < watchers.length; i++) {
+                Assert.assertNotNull(zk.getData("/foo-" + i, watchers[i], 
stat));
+                Assert.assertNotNull(zk.exists("/foo-" + i, watchers[i]));
+            }
+            // trigger the watches
+            for (int i = 0; i < watchers.length; i++) {
+                zk.setData("/foo-" + i, ("foodata4-" + i).getBytes(), -1);
+                zk.setData("/foo-" + i, ("foodata5-" + i).getBytes(), -1);
+            }
+            for (int i = 0; i < watchers.length; i++) {
+                WatchedEvent event =
+                    watchers[i].events.poll(10, TimeUnit.SECONDS);
+                Assert.assertEquals("/foo-" + i, event.getPath());
+                Assert.assertEquals(EventType.NodeDataChanged, 
event.getType());
+                Assert.assertEquals(KeeperState.SyncConnected, 
event.getState());
+
+                // small chance that an unexpected message was delivered
+                //  after this check, but we would catch that next time
+                //  we check events
+                Assert.assertEquals(0, watchers[i].events.size());
+            }
+
+            //
+            // test get/exists with two sets of watchers
+            //
+            for (int i = 0; i < watchers.length; i++) {
+                Assert.assertNotNull(zk.getData("/foo-" + i, watchers[i], 
stat));
+                Assert.assertNotNull(zk.exists("/foo-" + i, watchers2[i]));
+            }
+            // trigger the watches
+            for (int i = 0; i < watchers.length; i++) {
+                zk.setData("/foo-" + i, ("foodata6-" + i).getBytes(), -1);
+                zk.setData("/foo-" + i, ("foodata7-" + i).getBytes(), -1);
+            }
+            for (int i = 0; i < watchers.length; i++) {
+                WatchedEvent event =
+                    watchers[i].events.poll(10, TimeUnit.SECONDS);
+                Assert.assertEquals("/foo-" + i, event.getPath());
+                Assert.assertEquals(EventType.NodeDataChanged, 
event.getType());
+                Assert.assertEquals(KeeperState.SyncConnected, 
event.getState());
+
+                // small chance that an unexpected message was delivered
+                //  after this check, but we would catch that next time
+                //  we check events
+                Assert.assertEquals(0, watchers[i].events.size());
+
+                // watchers2
+                WatchedEvent event2 =
+                    watchers2[i].events.poll(10, TimeUnit.SECONDS);
+                Assert.assertEquals("/foo-" + i, event2.getPath());
+                Assert.assertEquals(EventType.NodeDataChanged, 
event2.getType());
+                Assert.assertEquals(KeeperState.SyncConnected, 
event2.getState());
+
+                // small chance that an unexpected message was delivered
+                //  after this check, but we would catch that next time
+                //  we check events
+                Assert.assertEquals(0, watchers2[i].events.size());
+            }
+
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+    private void performClientTest(boolean withWatcherObj)
+        throws IOException, InterruptedException, KeeperException
+    {
+        ZooKeeper zk = null;
+        try {
+            MyWatcher watcher = new MyWatcher();
+            zk = createClient(watcher, hostPort);
+            LOG.info("Before create /benwashere");
+            zk.create("/benwashere", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            LOG.info("After create /benwashere");
+            try {
+                zk.setData("/benwashere", "hi".getBytes(), 57);
+                Assert.fail("Should have gotten BadVersion exception");
+            } catch(KeeperException.BadVersionException e) {
+                // expected that
+            } catch (KeeperException e) {
+                Assert.fail("Should have gotten BadVersion exception");
+            }
+            LOG.info("Before delete /benwashere");
+            zk.delete("/benwashere", 0);
+            LOG.info("After delete /benwashere");
+            zk.close();
+            //LOG.info("Closed client: " + zk.describeCNXN());
+            Thread.sleep(2000);
+
+            zk = createClient(watcher, hostPort);
+            //LOG.info("Created a new client: " + zk.describeCNXN());
+            LOG.info("Before delete /");
+
+            try {
+                zk.delete("/", -1);
+                Assert.fail("deleted root!");
+            } catch(KeeperException.BadArgumentsException e) {
+                // good, expected that
+            }
+            Stat stat = new Stat();
+            // Test basic create, ls, and getData
+            zk.create("/pat", "Pat was here".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            LOG.info("Before create /ben");
+            zk.create("/pat/ben", "Ben was here".getBytes(),
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            LOG.info("Before getChildren /pat");
+            List<String> children = zk.getChildren("/pat", false);
+            Assert.assertEquals(1, children.size());
+            Assert.assertEquals("ben", children.get(0));
+            List<String> children2 = zk.getChildren("/pat", false, null);
+            Assert.assertEquals(children, children2);
+            String value = new String(zk.getData("/pat/ben", false, stat));
+            Assert.assertEquals("Ben was here", value);
+            // Test stat and watch of non existent node
+
+            try {
+                if (withWatcherObj) {
+                    Assert.assertEquals(null, zk.exists("/frog", watcher));
+                } else {
+                    Assert.assertEquals(null, zk.exists("/frog", true));
+                }
+                LOG.info("Comment: asseting passed for frog setting /");
+            } catch (KeeperException.NoNodeException e) {
+                // OK, expected that
+            }
+            zk.create("/frog", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            // the first poll is just a session delivery
+            LOG.info("Comment: checking for events length "
+                     + watcher.events.size());
+            WatchedEvent event = watcher.events.poll(10, TimeUnit.SECONDS);
+            Assert.assertEquals("/frog", event.getPath());
+            Assert.assertEquals(EventType.NodeCreated, event.getType());
+            Assert.assertEquals(KeeperState.SyncConnected, event.getState());
+            // Test child watch and create with sequence
+            zk.getChildren("/pat/ben", true);
+            for (int i = 0; i < 10; i++) {
+                zk.create("/pat/ben/" + i + "-", 
Integer.toString(i).getBytes(),
+                        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+            }
+            children = zk.getChildren("/pat/ben", false);
+            Collections.sort(children);
+            Assert.assertEquals(10, children.size());
+            for (int i = 0; i < 10; i++) {
+                final String name = children.get(i);
+                Assert.assertTrue("starts with -", name.startsWith(i + "-"));
+                byte b[];
+                if (withWatcherObj) {
+                    b = zk.getData("/pat/ben/" + name, watcher, stat);
+                } else {
+                    b = zk.getData("/pat/ben/" + name, true, stat);
+                }
+                Assert.assertEquals(Integer.toString(i), new String(b));
+                zk.setData("/pat/ben/" + name, "new".getBytes(),
+                        stat.getVersion());
+                if (withWatcherObj) {
+                    stat = zk.exists("/pat/ben/" + name, watcher);
+                } else {
+                stat = zk.exists("/pat/ben/" + name, true);
+                }
+                zk.delete("/pat/ben/" + name, stat.getVersion());
+            }
+            event = watcher.events.poll(10, TimeUnit.SECONDS);
+            Assert.assertEquals("/pat/ben", event.getPath());
+            Assert.assertEquals(EventType.NodeChildrenChanged, 
event.getType());
+            Assert.assertEquals(KeeperState.SyncConnected, event.getState());
+            for (int i = 0; i < 10; i++) {
+                event = watcher.events.poll(10, TimeUnit.SECONDS);
+                final String name = children.get(i);
+                Assert.assertEquals("/pat/ben/" + name, event.getPath());
+                Assert.assertEquals(EventType.NodeDataChanged, 
event.getType());
+                Assert.assertEquals(KeeperState.SyncConnected, 
event.getState());
+                event = watcher.events.poll(10, TimeUnit.SECONDS);
+                Assert.assertEquals("/pat/ben/" + name, event.getPath());
+                Assert.assertEquals(EventType.NodeDeleted, event.getType());
+                Assert.assertEquals(KeeperState.SyncConnected, 
event.getState());
+            }
+            zk.create("/good\u0040path", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+
+            zk.create("/duplicate", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            try {
+                zk.create("/duplicate", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+                Assert.fail("duplicate create allowed");
+            } catch(KeeperException.NodeExistsException e) {
+                // OK, expected that
+            }
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+    
+    // Test that sequential filenames are being created correctly,
+    // with 0-padding in the filename
+    @Test
+    public void testSequentialNodeNames()
+        throws IOException, InterruptedException, KeeperException
+    {
+        String path = "/SEQUENCE";
+        String file = "TEST";
+        String filepath = path + "/" + file;
+
+        ZooKeeper zk = null;
+        try {
+            zk = createClient();
+            zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+            zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT_SEQUENTIAL);
+            List<String> children = zk.getChildren(path, false);
+            Assert.assertEquals(1, children.size());
+            Assert.assertEquals(file + "0000000000", children.get(0));
+
+            zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT_SEQUENTIAL);
+            children = zk.getChildren(path, false);
+            Assert.assertEquals(2, children.size());
+            Assert.assertTrue("contains child 1",  children.contains(file + 
"0000000001"));
+
+            zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT_SEQUENTIAL);
+            children = zk.getChildren(path, false);
+            Assert.assertEquals(3, children.size());
+            Assert.assertTrue("contains child 2",
+                       children.contains(file + "0000000002"));
+
+            // The pattern is holding so far.  Let's run the counter a bit
+            // to be sure it continues to spit out the correct answer
+            for(int i = children.size(); i < 105; i++)
+               zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT_SEQUENTIAL);
+
+            children = zk.getChildren(path, false);
+            Assert.assertTrue("contains child 104",
+                       children.contains(file + "0000000104"));
+
+        }
+        finally {
+            if(zk != null)
+                zk.close();
+        }
+    }
+    
+    // Test that data provided when 
+    // creating sequential nodes is stored properly
+    @Test
+    public void testSequentialNodeData() throws Exception {
+        ZooKeeper zk= null;
+        String queue_handle = "/queue";
+        try {
+            zk = createClient();
+
+            zk.create(queue_handle, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            zk.create(queue_handle + "/element", "0".getBytes(),
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+            zk.create(queue_handle + "/element", "1".getBytes(),
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+            List<String> children = zk.getChildren(queue_handle, true);
+            Assert.assertEquals(children.size(), 2);
+            String child1 = children.get(0);
+            String child2 = children.get(1);
+            int compareResult = child1.compareTo(child2);
+            Assert.assertNotSame(compareResult, 0);
+            if (compareResult < 0) {
+            } else {
+                String temp = child1;
+                child1 = child2;
+                child2 = temp;
+            }
+            String child1data = new String(zk.getData(queue_handle
+                    + "/" + child1, false, null));
+            String child2data = new String(zk.getData(queue_handle
+                    + "/" + child2, false, null));
+            Assert.assertEquals(child1data, "0");
+            Assert.assertEquals(child2data, "1");
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+
+    }
+
+    @Test
+    public void testLargeNodeData() throws Exception {
+        ZooKeeper zk= null;
+        String queue_handle = "/large";
+        try {
+            zk = createClient();
+
+            zk.create(queue_handle, new byte[500000], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+
+    }
+
+    private void verifyCreateFails(String path, ZooKeeper zk) throws Exception 
{
+        try {
+            zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (IllegalArgumentException e) {
+            // this is good
+            return;
+        }
+        Assert.fail("bad path \"" + path + "\" not caught");
+    }
+
+    // Test that the path string is validated
+    @Test
+    public void testPathValidation() throws Exception {
+        ZooKeeper zk = createClient();
+
+        verifyCreateFails(null, zk);
+        verifyCreateFails("", zk);
+        verifyCreateFails("//", zk);
+        verifyCreateFails("///", zk);
+        verifyCreateFails("////", zk);
+        verifyCreateFails("/.", zk);
+        verifyCreateFails("/..", zk);
+        verifyCreateFails("/./", zk);
+        verifyCreateFails("/../", zk);
+        verifyCreateFails("/foo/./", zk);
+        verifyCreateFails("/foo/../", zk);
+        verifyCreateFails("/foo/.", zk);
+        verifyCreateFails("/foo/..", zk);
+        verifyCreateFails("/./.", zk);
+        verifyCreateFails("/../..", zk);
+        verifyCreateFails("/\u0001foo", zk);
+        verifyCreateFails("/foo/bar/", zk);
+        verifyCreateFails("/foo//bar", zk);
+        verifyCreateFails("/foo/bar//", zk);
+
+        verifyCreateFails("foo", zk);
+        verifyCreateFails("a", zk);
+
+        zk.create("/createseqpar", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        // next two steps - related to sequential processing
+        // 1) verify that empty child name Assert.fails if not sequential
+        try {
+            zk.create("/createseqpar/", null, Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            Assert.assertTrue(false);
+        } catch(IllegalArgumentException be) {
+            // catch this.
+        }
+
+        // 2) verify that empty child name success if sequential 
+        zk.create("/createseqpar/", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT_SEQUENTIAL);
+        zk.create("/createseqpar/.", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT_SEQUENTIAL);
+        zk.create("/createseqpar/..", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT_SEQUENTIAL);
+        try {
+            zk.create("/createseqpar//", null, Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT_SEQUENTIAL);
+            Assert.assertTrue(false);
+        } catch(IllegalArgumentException be) {
+            // catch this.
+        }
+        try {
+            zk.create("/createseqpar/./", null, Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT_SEQUENTIAL);
+            Assert.assertTrue(false);
+        } catch(IllegalArgumentException be) {
+            // catch this.
+        }
+        try {
+            zk.create("/createseqpar/../", null, Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT_SEQUENTIAL);
+            Assert.assertTrue(false);
+        } catch(IllegalArgumentException be) {
+            // catch this.
+        }
+
+        
+        //check for the code path that throws at server
+        PrepRequestProcessor.setFailCreate(true);
+        try {
+            zk.create("/m", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            Assert.assertTrue(false);
+        } catch(KeeperException.BadArgumentsException be) {
+            // catch this.
+        }
+        PrepRequestProcessor.setFailCreate(false);
+        zk.create("/.foo", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/.f.", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/..f", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/..f..", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/f.c", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/f\u0040f", null, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        zk.create("/f", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/f/.f", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/f/f.", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/f/..f", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/f/f..", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/f/.f/f", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/f/f./f", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+
+//    private void notestConnections()
+//        throws IOException, InterruptedException, KeeperException
+//    {
+//        ZooKeeper zk;
+//        for(int i = 0; i < 2000; i++) {
+//            if (i % 100 == 0) {
+//                LOG.info("Testing " + i + " connections");
+//            }
+//            // We want to make sure socket descriptors are going away
+//            zk = new ZooKeeper(hostPort, 30000, this);
+//            zk.getData("/", false, new Stat());
+//            zk.close();
+//        }
+//    }
+
+    @Test
+    public void testDeleteWithChildren() throws Exception {
+        ZooKeeper zk = createClient();
+        zk.create("/parent", new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        zk.create("/parent/child", new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        try {
+            zk.delete("/parent", -1);
+            Assert.fail("Should have received a not equals message");
+        } catch (KeeperException e) {
+            Assert.assertEquals(KeeperException.Code.NOTEMPTY, e.code());
+        }
+        zk.delete("/parent/child", -1);
+        zk.delete("/parent", -1);
+        zk.close();
+    }
+
+    private class VerifyClientCleanup extends Thread {
+        int count;
+        int current = 0;
+
+        VerifyClientCleanup(String name, int count) {
+            super(name);
+            this.count = count;
+        }
+
+        public void run() {
+            try {
+                for (; current < count; current++) {
+                    TestableZooKeeper zk = createClient();
+                    // we've asked to close, wait for it to finish closing
+                    // all the sub-threads otw the selector may not be
+                    // closed when we check (false positive on test 
Assert.failure
+                    zk.close(CONNECTION_TIMEOUT);
+                }
+            } catch (Throwable t) {
+                LOG.error("test Assert.failed", t);
+            }
+        }
+    }
+
+    /**
+     * Verify that the client is cleaning up properly. Open/close a large
+     * number of sessions. Essentially looking to see if sockets/selectors
+     * are being cleaned up properly during close.
+     *
+     * @throws Throwable
+     */
+    @Test
+    public void testClientCleanup() throws Throwable {
+        OSMXBean osMbean = new OSMXBean();
+        if (osMbean.getUnix() == false) {
+            LOG.warn("skipping testClientCleanup, only available on Unix");
+            return;
+        }
+
+        final int threadCount = 3;
+        final int clientCount = 10;
+
+        /* Log the number of fds used before and after a test is run. Verifies
+         * we are freeing resources correctly. Unfortunately this only works
+         * on unix systems (the only place sun has implemented as part of the
+         * mgmt bean api).
+         */
+        long initialFdCount = osMbean.getOpenFileDescriptorCount();
+
+        VerifyClientCleanup threads[] = new VerifyClientCleanup[threadCount];
+
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new VerifyClientCleanup("VCC" + i, clientCount);
+            threads[i].start();
+        }
+
+        for (int i = 0; i < threads.length; i++) {
+            threads[i].join(CONNECTION_TIMEOUT);
+            Assert.assertTrue(threads[i].current == threads[i].count);
+        }
+
+        // if this Assert.fails it means we are not cleaning up after the 
closed
+        // sessions.
+        long currentCount = osMbean.getOpenFileDescriptorCount();
+        final String logmsg = "open fds after test ({}) are not significantly 
higher than before ({})";
+        
+        if (currentCount > initialFdCount + 10) {
+            // consider as error
+               
LOG.error(logmsg,Long.valueOf(currentCount),Long.valueOf(initialFdCount));
+        } else {
+               
LOG.info(logmsg,Long.valueOf(currentCount),Long.valueOf(initialFdCount));
+        }
+    }
+
+
+    /**
+     * We create a perfectly valid 'exists' request, except that the opcode is 
wrong.
+     * @return
+     * @throws Exception
+     */
+    @Test
+    public void testNonExistingOpCode() throws Exception  {
+        final CountDownLatch clientDisconnected = new CountDownLatch(1);
+        Watcher watcher = new Watcher() {
+            @Override
+            public synchronized void process(WatchedEvent event) {
+                if (event.getState() == KeeperState.Disconnected) {
+                    clientDisconnected.countDown();
+                }
+            }
+        };
+        TestableZooKeeper zk = new TestableZooKeeper(hostPort, 
CONNECTION_TIMEOUT, watcher);
+
+        final String path = "/m1";
+
+        RequestHeader h = new RequestHeader();
+        h.setType(888);  // This code does not exists
+        ExistsRequest request = new ExistsRequest();
+        request.setPath(path);
+        request.setWatch(false);
+        ExistsResponse response = new ExistsResponse();
+
+        ReplyHeader r = zk.submitRequest(h, request, response, null);
+
+        Assert.assertEquals(r.getErr(), Code.UNIMPLEMENTED.intValue());
+
+        // Sending a nonexisting opcode should cause the server to disconnect
+        Assert.assertTrue("failed to disconnect",
+                clientDisconnected.await(5000, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testTryWithResources() throws Exception {
+        ZooKeeper zooKeeper;
+        try (ZooKeeper zk = createClient()) {
+            zooKeeper = zk;
+            Assert.assertTrue(zooKeeper.getState().isAlive());
+        }
+
+        Assert.assertFalse(zooKeeper.getState().isAlive());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
----------------------------------------------------------------------
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
new file mode 100644
index 0000000..a072bc0
--- /dev/null
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.net.Socket;
+
+import org.apache.zookeeper.common.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager.InitialMessage;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CnxManagerTest extends ZKTestCase {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(FLENewEpochTest.class);
+    protected static final int THRESHOLD = 4;
+
+    int count;
+    HashMap<Long,QuorumServer> peers;
+    File peerTmpdir[];
+    int peerQuorumPort[];
+    int peerClientPort[];
+    @Before
+    public void setUp() throws Exception {
+
+        this.count = 3;
+        this.peers = new HashMap<Long,QuorumServer>(count);
+        peerTmpdir = new File[count];
+        peerQuorumPort = new int[count];
+        peerClientPort = new int[count];
+
+        for(int i = 0; i < count; i++) {
+            peerQuorumPort[i] = PortAssignment.unique();
+            peerClientPort[i] = PortAssignment.unique();
+            peers.put(Long.valueOf(i),
+                new QuorumServer(i,
+                    new InetSocketAddress(
+                        "127.0.0.1", peerQuorumPort[i]),
+                    new InetSocketAddress(
+                        "127.0.0.1", PortAssignment.unique()),
+                    new InetSocketAddress(
+                        "127.0.0.1", peerClientPort[i])));
+            peerTmpdir[i] = ClientBase.createTmpDir();
+        }
+    }
+
+    ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
+        byte requestBytes[] = new byte[28];
+        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+
+        /*
+         * Building notification packet to send
+         */
+
+        requestBuffer.clear();
+        requestBuffer.putInt(state);
+        requestBuffer.putLong(leader);
+        requestBuffer.putLong(zxid);
+        requestBuffer.putLong(epoch);
+
+        return requestBuffer;
+    }
+
+    class CnxManagerThread extends Thread {
+
+        boolean failed;
+        CnxManagerThread(){
+            failed = false;
+        }
+
+        public void run(){
+            try {
+                QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], 
peerTmpdir[0], peerClientPort[0], 3, 0, 1000, 2, 2);
+                QuorumCnxManager cnxManager = peer.createCnxnManager();
+                QuorumCnxManager.Listener listener = cnxManager.listener;
+                if(listener != null){
+                    listener.start();
+                } else {
+                    LOG.error("Null listener when initializing cnx manager");
+                }
+
+                long sid = 1;
+                cnxManager.toSend(sid, 
createMsg(ServerState.LOOKING.ordinal(), 0, -1, 1));
+
+                Message m = null;
+                int numRetries = 1;
+                while((m == null) && (numRetries++ <= THRESHOLD)){
+                    m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
+                    if(m == null) cnxManager.connectAll();
+                }
+
+                if(numRetries > THRESHOLD){
+                    failed = true;
+                    return;
+                }
+
+                cnxManager.testInitiateConnection(sid);
+
+                m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
+                if(m == null){
+                    failed = true;
+                    return;
+                }
+            } catch (Exception e) {
+                LOG.error("Exception while running mock thread", e);
+                Assert.fail("Unexpected exception");
+            }
+        }
+    }
+
+    @Test
+    public void testCnxManager() throws Exception {
+        CnxManagerThread thread = new CnxManagerThread();
+
+        thread.start();
+
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], 
peerClientPort[1], 3, 1, 1000, 2, 2);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+
+        cnxManager.toSend(0L, createMsg(ServerState.LOOKING.ordinal(), 1, -1, 
1));
+
+        Message m = null;
+        int numRetries = 1;
+        while((m == null) && (numRetries++ <= THRESHOLD)){
+            m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
+            if(m == null) cnxManager.connectAll();
+        }
+
+        Assert.assertTrue("Exceeded number of retries", numRetries <= 
THRESHOLD);
+
+        thread.join(5000);
+        if (thread.isAlive()) {
+            Assert.fail("Thread didn't join");
+        } else {
+            if(thread.failed)
+                Assert.fail("Did not receive expected message");
+        }
+        cnxManager.halt();
+        Assert.assertFalse(cnxManager.listener.isAlive());
+    }
+
+    @Test
+    public void testCnxManagerTimeout() throws Exception {
+        Random rand = new Random();
+        byte b = (byte) rand.nextInt();
+        int deadPort = PortAssignment.unique();
+        String deadAddress = "10.1.1." + b;
+
+        LOG.info("This is the dead address I'm trying: " + deadAddress);
+
+        peers.put(Long.valueOf(2),
+                new QuorumServer(2,
+                        new InetSocketAddress(deadAddress, deadPort),
+                        new InetSocketAddress(deadAddress, 
PortAssignment.unique()),
+                        new InetSocketAddress(deadAddress, 
PortAssignment.unique())));
+        peerTmpdir[2] = ClientBase.createTmpDir();
+
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], 
peerClientPort[1], 3, 1, 1000, 2, 2);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+
+        long begin = Time.currentElapsedTime();
+        cnxManager.toSend(2L, createMsg(ServerState.LOOKING.ordinal(), 1, -1, 
1));
+        long end = Time.currentElapsedTime();
+
+        if((end - begin) > 6000) Assert.fail("Waited more than necessary");
+        cnxManager.halt();
+        Assert.assertFalse(cnxManager.listener.isAlive());
+    }
+
+    /**
+     * Tests a bug in QuorumCnxManager that causes a spin lock
+     * when a negative value is sent. This test checks if the
+     * connection is being closed upon a message with negative
+     * length.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testCnxManagerSpinLock() throws Exception {
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], 
peerClientPort[1], 3, 1, 1000, 2, 2);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+
+        int port = peers.get(peer.getId()).electionAddr.getPort();
+        LOG.info("Election port: " + port);
+
+        Thread.sleep(1000);
+
+        SocketChannel sc = SocketChannel.open();
+        sc.socket().connect(peers.get(1L).electionAddr, 5000);
+
+        InetSocketAddress otherAddr = peers.get(new Long(2)).electionAddr;
+        DataOutputStream dout = new 
DataOutputStream(sc.socket().getOutputStream());
+        dout.writeLong(QuorumCnxManager.PROTOCOL_VERSION);
+        dout.writeLong(new Long(2));
+        String addr = otherAddr.getHostString()+ ":" + otherAddr.getPort();
+        byte[] addr_bytes = addr.getBytes();
+        dout.writeInt(addr_bytes.length);
+        dout.write(addr_bytes);
+        dout.flush();
+        
+
+        ByteBuffer msgBuffer = ByteBuffer.wrap(new byte[4]);
+        msgBuffer.putInt(-20);
+        msgBuffer.position(0);
+        sc.write(msgBuffer);
+
+        Thread.sleep(1000);
+
+        try{
+            /*
+             * Write a number of times until it
+             * detects that the socket is broken.
+             */
+            for(int i = 0; i < 100; i++){
+                msgBuffer.position(0);
+                sc.write(msgBuffer);
+            }
+            Assert.fail("Socket has not been closed");
+        } catch (Exception e) {
+            LOG.info("Socket has been closed as expected");
+        }
+        peer.shutdown();
+        cnxManager.halt();
+        Assert.assertFalse(cnxManager.listener.isAlive());
+    }
+
+    /**
+     * Tests a bug in QuorumCnxManager that causes a NPE when a 3.4.6
+     * observer connects to a 3.5.0 server. 
+     * {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1789}
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testCnxManagerNPE() throws Exception {
+        // the connecting peer (id = 2) is a 3.4.6 observer
+        peers.get(2L).type = LearnerType.OBSERVER;
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1],
+                peerClientPort[1], 3, 1, 1000, 2, 2);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if (listener != null) {
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+        int port = peers.get(peer.getId()).electionAddr.getPort();
+        LOG.info("Election port: " + port);
+
+        Thread.sleep(1000);
+
+        SocketChannel sc = SocketChannel.open();
+        sc.socket().connect(peers.get(1L).electionAddr, 5000);
+
+        /*
+         * Write id (3.4.6 protocol). This previously caused a NPE in
+         * QuorumCnxManager.
+         */
+        byte[] msgBytes = new byte[8];
+        ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
+        msgBuffer.putLong(2L);
+        msgBuffer.position(0);
+        sc.write(msgBuffer);
+
+        msgBuffer = ByteBuffer.wrap(new byte[8]);
+        // write length of message
+        msgBuffer.putInt(4);
+        // write message
+        msgBuffer.putInt(5);
+        msgBuffer.position(0);
+        sc.write(msgBuffer);
+
+        Message m = cnxManager.pollRecvQueue(1000, TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(m);
+
+        peer.shutdown();
+        cnxManager.halt();
+        Assert.assertFalse(cnxManager.listener.isAlive());
+    }
+
+    /*
+     * Test if a receiveConnection is able to timeout on socket errors
+     */
+    @Test
+    public void testSocketTimeout() throws Exception {
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], 
peerClientPort[1], 3, 1, 2000, 2, 2);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+        int port = peers.get(peer.getId()).electionAddr.getPort();
+        LOG.info("Election port: " + port);
+        Thread.sleep(1000);
+
+        Socket sock = new Socket();
+        sock.connect(peers.get(1L).electionAddr, 5000);
+        long begin = Time.currentElapsedTime();
+        // Read without sending data. Verify timeout.
+        cnxManager.receiveConnection(sock);
+        long end = Time.currentElapsedTime();
+        if((end - begin) > ((peer.getSyncLimit() * peer.getTickTime()) + 500)) 
Assert.fail("Waited more than necessary");
+        cnxManager.halt();
+        Assert.assertFalse(cnxManager.listener.isAlive());
+    }
+
+    /*
+     * Test if Worker threads are getting killed after connection loss
+     */
+    @Test
+    public void testWorkerThreads() throws Exception {
+        ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>();
+        try {
+            for (int sid = 0; sid < 3; sid++) {
+                QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[sid],
+                        peerTmpdir[sid], peerClientPort[sid], 3, sid, 1000, 2,
+                        2);
+                LOG.info("Starting peer {}", peer.getId());
+                peer.start();
+                peerList.add(sid, peer);
+            }
+            String failure = verifyThreadCount(peerList, 4);
+            Assert.assertNull(failure, failure);
+            for (int myid = 0; myid < 3; myid++) {
+                for (int i = 0; i < 5; i++) {
+                    // halt one of the listeners and verify count
+                    QuorumPeer peer = peerList.get(myid);
+                    LOG.info("Round {}, halting peer ",
+                            new Object[] { i, peer.getId() });
+                    peer.shutdown();
+                    peerList.remove(myid);
+                    failure = verifyThreadCount(peerList, 2);
+                    Assert.assertNull(failure, failure);
+                    // Restart halted node and verify count
+                    peer = new QuorumPeer(peers, peerTmpdir[myid],
+                            peerTmpdir[myid], peerClientPort[myid], 3, myid,
+                            1000, 2, 2);
+                    LOG.info("Round {}, restarting peer ",
+                            new Object[] { i, peer.getId() });
+                    peer.start();
+                    peerList.add(myid, peer);
+                    failure = verifyThreadCount(peerList, 4);
+                    Assert.assertNull(failure, failure);
+                }
+            }
+        } finally {
+            for (QuorumPeer quorumPeer : peerList) {
+                quorumPeer.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Returns null on success, otw the message assoc with the failure
+     * @throws InterruptedException
+     */
+    public String verifyThreadCount(ArrayList<QuorumPeer> peerList, long ecnt)
+        throws InterruptedException
+    {
+        String failure = null;
+        for (int i = 0; i < 480; i++) {
+            Thread.sleep(500);
+
+            failure = _verifyThreadCount(peerList, ecnt);
+            if (failure == null) {
+                return null;
+            }
+        }
+        return failure;
+    }
+    public String _verifyThreadCount(ArrayList<QuorumPeer> peerList, long 
ecnt) {
+        for (int myid = 0; myid < peerList.size(); myid++) {
+            QuorumPeer peer = peerList.get(myid);
+            QuorumCnxManager cnxManager = peer.getQuorumCnxManager();
+            long cnt = cnxManager.getThreadCount();
+            if (cnt != ecnt) {
+                return new Date()
+                    + " Incorrect number of Worker threads for sid=" + myid
+                    + " expected " + ecnt + " found " + cnt;
+            }
+        }
+        return null;
+    }
+
+    @Test
+    public void testInitialMessage() throws Exception {
+        InitialMessage msg;
+        ByteArrayOutputStream bos;
+        DataInputStream din;
+        DataOutputStream dout;
+        String hostport;
+
+        // message with bad protocol version
+        try {
+
+            // the initial message (without the protocol version)
+            hostport = "10.0.0.2:3888";
+            bos = new ByteArrayOutputStream();
+            dout = new DataOutputStream(bos);
+            dout.writeLong(5L); // sid
+            dout.writeInt(hostport.getBytes().length);
+            dout.writeBytes(hostport);
+
+            // now parse it
+            din = new DataInputStream(new 
ByteArrayInputStream(bos.toByteArray()));
+            msg = InitialMessage.parse(-65530L, din);
+            Assert.fail("bad protocol version accepted");
+        } catch (InitialMessage.InitialMessageException ex) {}
+
+        // message too long
+        try {
+
+            hostport = createLongString(1048576);
+            bos = new ByteArrayOutputStream();
+            dout = new DataOutputStream(bos);
+            dout.writeLong(5L); // sid
+            dout.writeInt(hostport.getBytes().length);
+            dout.writeBytes(hostport);
+
+            din = new DataInputStream(new 
ByteArrayInputStream(bos.toByteArray()));
+            msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
+            Assert.fail("long message accepted");
+        } catch (InitialMessage.InitialMessageException ex) {}
+
+        // bad hostport string
+        try {
+
+            hostport = "what's going on here?";
+            bos = new ByteArrayOutputStream();
+            dout = new DataOutputStream(bos);
+            dout.writeLong(5L); // sid
+            dout.writeInt(hostport.getBytes().length);
+            dout.writeBytes(hostport);
+
+            din = new DataInputStream(new 
ByteArrayInputStream(bos.toByteArray()));
+            msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
+            Assert.fail("bad hostport accepted");
+        } catch (InitialMessage.InitialMessageException ex) {}
+
+        // good message
+        try {
+
+            hostport = "10.0.0.2:3888";
+            bos = new ByteArrayOutputStream();
+            dout = new DataOutputStream(bos);
+            dout.writeLong(5L); // sid
+            dout.writeInt(hostport.getBytes().length);
+            dout.writeBytes(hostport);
+
+            // now parse it
+            din = new DataInputStream(new 
ByteArrayInputStream(bos.toByteArray()));
+            msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
+        } catch (InitialMessage.InitialMessageException ex) {
+            Assert.fail(ex.toString());
+        }
+    }
+
+    private String createLongString(int size) {
+        StringBuilder sb = new StringBuilder(size);
+        for (int i=0; i < size; i++) {
+            sb.append('x');
+        }
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java
----------------------------------------------------------------------
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java
new file mode 100644
index 0000000..393cc03
--- /dev/null
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.client.ConnectStringParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConnectStringParserTest extends ZKTestCase{
+
+    @Test
+    public void testSingleServerChrootPath(){
+        String chrootPath = "/hallo/welt";
+        String servers = "10.10.10.1";
+        assertChrootPath(chrootPath,
+                new ConnectStringParser(servers+chrootPath));
+    }
+
+    @Test
+    public void testMultipleServersChrootPath(){
+        String chrootPath = "/hallo/welt";
+        String servers = "10.10.10.1,10.10.10.2";
+        assertChrootPath(chrootPath,
+                new ConnectStringParser(servers+chrootPath));
+    }
+
+    @Test
+    public void testParseServersWithoutPort(){
+        String servers = "10.10.10.1,10.10.10.2";
+        ConnectStringParser parser = new ConnectStringParser(servers);
+
+        Assert.assertEquals("10.10.10.1", 
parser.getServerAddresses().get(0).getHostString());
+        Assert.assertEquals("10.10.10.2", 
parser.getServerAddresses().get(1).getHostString());
+    }
+
+    @Test
+    public void testParseServersWithPort(){
+        String servers = "10.10.10.1:112,10.10.10.2:110";
+        ConnectStringParser parser = new ConnectStringParser(servers);
+
+        Assert.assertEquals("10.10.10.1", 
parser.getServerAddresses().get(0).getHostString());
+        Assert.assertEquals("10.10.10.2", 
parser.getServerAddresses().get(1).getHostString());
+
+        Assert.assertEquals(112, parser.getServerAddresses().get(0).getPort());
+        Assert.assertEquals(110, parser.getServerAddresses().get(1).getPort());
+    }
+
+    private void assertChrootPath(String expected, ConnectStringParser parser){
+        Assert.assertEquals(expected, parser.getChrootPath());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateModeTest.java
----------------------------------------------------------------------
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateModeTest.java 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateModeTest.java
new file mode 100644
index 0000000..fc61adf
--- /dev/null
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateModeTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.EnumSet;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.KeeperException.Code;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CreateModeTest extends ZKTestCase {
+    
+    @Test
+    public void testBasicCreateMode() {
+        CreateMode cm = CreateMode.PERSISTENT;
+        Assert.assertEquals(cm.toFlag(), 0);
+        Assert.assertFalse(cm.isEphemeral());
+        Assert.assertFalse(cm.isSequential());
+        Assert.assertFalse(cm.isContainer());
+
+        cm = CreateMode.EPHEMERAL;
+        Assert.assertEquals(cm.toFlag(), 1);
+        Assert.assertTrue(cm.isEphemeral());
+        Assert.assertFalse(cm.isSequential());
+        Assert.assertFalse(cm.isContainer());
+
+        cm = CreateMode.PERSISTENT_SEQUENTIAL;
+        Assert.assertEquals(cm.toFlag(), 2);
+        Assert.assertFalse(cm.isEphemeral());
+        Assert.assertTrue(cm.isSequential());
+        Assert.assertFalse(cm.isContainer());
+
+        cm = CreateMode.EPHEMERAL_SEQUENTIAL;
+        Assert.assertEquals(cm.toFlag(), 3);
+        Assert.assertTrue(cm.isEphemeral());
+        Assert.assertTrue(cm.isSequential());
+        Assert.assertFalse(cm.isContainer());
+
+        cm = CreateMode.CONTAINER;
+        Assert.assertEquals(cm.toFlag(), 4);
+        Assert.assertFalse(cm.isEphemeral());
+        Assert.assertFalse(cm.isSequential());
+        Assert.assertTrue(cm.isContainer());
+    }
+    
+    @Test
+    public void testFlagConversion() throws KeeperException {
+        // Ensure we get the same value back after round trip conversion
+        EnumSet<CreateMode> allModes = EnumSet.allOf(CreateMode.class);
+
+        for(CreateMode cm : allModes) {
+            Assert.assertEquals(cm, CreateMode.fromFlag( cm.toFlag() ) );
+        }
+    }
+
+    @Test
+    public void testInvalidFlagConversion() throws KeeperException {
+        try {
+            CreateMode.fromFlag(99);
+            Assert.fail("Shouldn't be able to convert 99 to a CreateMode.");
+        } catch(KeeperException ke) {
+            Assert.assertEquals(Code.BADARGUMENTS, ke.code());
+        }
+
+        try {
+            CreateMode.fromFlag(-1);
+            Assert.fail("Shouldn't be able to convert -1 to a CreateMode.");
+        } catch(KeeperException ke) {
+            Assert.assertEquals(Code.BADARGUMENTS, ke.code());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateTest.java
----------------------------------------------------------------------
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateTest.java 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateTest.java
new file mode 100644
index 0000000..1b427a1
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CreateTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Assert;
+import org.junit.Test;
+/**
+ * Test suite for validating the Create API.
+ */
+public class CreateTest extends ClientBase {
+  private ZooKeeper zk;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    zk = createClient();
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    zk.close();
+  }
+
+  @Test
+  public void testCreate()
+      throws IOException, KeeperException, InterruptedException {
+    createNoStatVerifyResult("/foo");
+    createNoStatVerifyResult("/foo/child");
+  }
+
+  @Test
+  public void testCreateWithStat()
+      throws IOException, KeeperException, InterruptedException {
+    String name = "/foo";
+    Stat stat = createWithStatVerifyResult("/foo");
+    Stat childStat = createWithStatVerifyResult("/foo/child");
+    // Don't expect to get the same stats for different creates.
+    Assert.assertFalse(stat.equals(childStat));
+  }
+
+  @Test
+  public void testCreateWithNullStat()
+      throws IOException, KeeperException, InterruptedException {
+    String name = "/foo";
+    Assert.assertNull(zk.exists(name, false));
+
+    Stat stat = null;
+    // If a null Stat object is passed the create should still
+    // succeed, but no Stat info will be returned.
+    String path = zk.create(name, name.getBytes(),
+        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+    Assert.assertNull(stat);
+    Assert.assertNotNull(zk.exists(name, false));
+  }
+
+  private void createNoStatVerifyResult(String newName)
+      throws KeeperException, InterruptedException {
+    Assert.assertNull("Node existed before created", zk.exists(newName, 
false));
+    String path = zk.create(newName, newName.getBytes(),
+                            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    Assert.assertEquals(path, newName);
+    Assert.assertNotNull("Node was not created as expected",
+                         zk.exists(newName, false));
+  }
+
+  private Stat createWithStatVerifyResult(String newName)
+        throws KeeperException, InterruptedException {
+    Assert.assertNull("Node existed before created", zk.exists(newName, 
false));
+    Stat stat = new Stat();
+    String path = zk.create(newName, newName.getBytes(),
+                            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+    Assert.assertEquals(path, newName);
+    validateCreateStat(stat, newName);
+
+    Stat referenceStat = zk.exists(newName, false);
+    Assert.assertNotNull("Node was not created as expected", referenceStat);
+    Assert.assertEquals(referenceStat, stat);
+
+    return stat;
+  }
+
+  private void validateCreateStat(Stat stat, String name) {
+    Assert.assertEquals(stat.getCzxid(), stat.getMzxid());
+    Assert.assertEquals(stat.getCzxid(), stat.getPzxid());
+    Assert.assertEquals(stat.getCtime(), stat.getMtime());
+    Assert.assertEquals(0, stat.getCversion());
+    Assert.assertEquals(0, stat.getVersion());
+    Assert.assertEquals(0, stat.getAversion());
+    Assert.assertEquals(0, stat.getEphemeralOwner());
+    Assert.assertEquals(name.length(), stat.getDataLength());
+    Assert.assertEquals(0, stat.getNumChildren());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectableZooKeeper.java
----------------------------------------------------------------------
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectableZooKeeper.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectableZooKeeper.java
new file mode 100644
index 0000000..619bdc6
--- /dev/null
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectableZooKeeper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+public class DisconnectableZooKeeper extends ZooKeeper {
+    public DisconnectableZooKeeper(String host, int sessionTimeout, Watcher 
watcher)
+        throws IOException
+    {
+        super(host, sessionTimeout, watcher);
+    }
+    
+    public DisconnectableZooKeeper(String host, int sessionTimeout, Watcher 
watcher,
+        long sessionId, byte[] sessionPasswd)
+        throws IOException
+    {
+        super(host, sessionTimeout, watcher, sessionId, sessionPasswd);
+    }
+
+    /** Testing only!!! Really!!!! This is only here to test when the client
+     * disconnects from the server w/o sending a session disconnect (ie
+     * ending the session cleanly). The server will eventually notice the
+     * client is no longer pinging and will timeout the session.
+     */
+    public void disconnect() throws IOException {
+        cnxn.disconnect();
+    }
+
+    /**
+     * Prevent the client from automatically reconnecting if the connection to 
the
+     * server is lost
+     */
+    public void dontReconnect() throws Exception {
+        java.lang.reflect.Field f = 
cnxn.getClass().getDeclaredField("closing");
+        f.setAccessible(true);
+        f.setBoolean(cnxn, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectedWatcherTest.java
----------------------------------------------------------------------
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectedWatcherTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectedWatcherTest.java
new file mode 100644
index 0000000..aa65e21
--- /dev/null
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectedWatcherTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DisconnectedWatcherTest extends ClientBase {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(DisconnectedWatcherTest.class);
+    final int TIMEOUT = 5000;
+
+    private class MyWatcher extends CountdownWatcher {
+        LinkedBlockingQueue<WatchedEvent> events =
+            new LinkedBlockingQueue<WatchedEvent>();
+
+        public void process(WatchedEvent event) {
+            super.process(event);
+            if (event.getType() != Event.EventType.None) {
+                try {
+                    events.put(event);
+                } catch (InterruptedException e) {
+                    LOG.warn("ignoring interrupt during event.put");
+                }
+            }
+        }
+    }
+
+    // @see jira issue ZOOKEEPER-961
+    
+    @Test
+    public void testChildWatcherAutoResetWithChroot() throws Exception {
+        ZooKeeper zk1 = createClient();
+
+        zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+
+        MyWatcher watcher = new MyWatcher();
+        ZooKeeper zk2 = createClient(watcher, hostPort + "/ch1");
+        zk2.getChildren("/", true );
+
+        // this call shouldn't trigger any error or watch
+        zk1.create("/youdontmatter1", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        // this should trigger the watch
+        zk1.create("/ch1/youshouldmatter1", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        WatchedEvent e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(e);
+        Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+        Assert.assertEquals("/", e.getPath());
+
+        MyWatcher childWatcher = new MyWatcher();
+        zk2.getChildren("/", childWatcher);
+        
+        stopServer();
+        watcher.waitForDisconnected(3000);
+        startServer();
+        watcher.waitForConnected(3000);
+
+        // this should trigger the watch
+        zk1.create("/ch1/youshouldmatter2", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(e);
+        Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+        Assert.assertEquals("/", e.getPath());
+    }
+    
+    @Test
+    public void testDefaultWatcherAutoResetWithChroot() throws Exception {
+        ZooKeeper zk1 = createClient();
+
+        zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+
+        MyWatcher watcher = new MyWatcher();
+        ZooKeeper zk2 = createClient(watcher, hostPort + "/ch1");
+        zk2.getChildren("/", true );
+
+        // this call shouldn't trigger any error or watch
+        zk1.create("/youdontmatter1", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        // this should trigger the watch
+        zk1.create("/ch1/youshouldmatter1", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        WatchedEvent e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(e);
+        Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+        Assert.assertEquals("/", e.getPath());
+
+        zk2.getChildren("/", true );
+
+        stopServer();
+        watcher.waitForDisconnected(3000);
+        startServer();
+        watcher.waitForConnected(3000);
+
+        // this should trigger the watch
+        zk1.create("/ch1/youshouldmatter2", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(e);
+        Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+        Assert.assertEquals("/", e.getPath());
+    }
+    
+    @Test
+    public void testDeepChildWatcherAutoResetWithChroot() throws Exception {
+        ZooKeeper zk1 = createClient();
+
+        zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk1.create("/ch1/here", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk1.create("/ch1/here/we", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk1.create("/ch1/here/we/are", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        MyWatcher watcher = new MyWatcher();
+        ZooKeeper zk2 = createClient(watcher, hostPort + "/ch1/here/we");
+        zk2.getChildren("/are", true );
+
+        // this should trigger the watch
+        zk1.create("/ch1/here/we/are/now", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        WatchedEvent e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(e);
+        Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+        Assert.assertEquals("/are", e.getPath());
+
+        MyWatcher childWatcher = new MyWatcher();
+        zk2.getChildren("/are", childWatcher);
+        
+        stopServer();
+        watcher.waitForDisconnected(3000);
+        startServer();
+        watcher.waitForConnected(3000);
+
+        // this should trigger the watch
+        zk1.create("/ch1/here/we/are/again", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(e);
+        Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+        Assert.assertEquals("/are", e.getPath());
+    }
+
+    // @see jira issue ZOOKEEPER-706. Test auto reset of a large number of
+    // watches which require multiple SetWatches calls.
+    @Test(timeout = 840000)
+    public void testManyChildWatchersAutoReset() throws Exception {
+        ZooKeeper zk1 = createClient();
+
+        MyWatcher watcher = new MyWatcher();
+        ZooKeeper zk2 = createClient(watcher);
+
+        // 110 character base path
+        String pathBase = 
"/long-path-000000000-111111111-222222222-333333333-444444444-"
+                          + 
"555555555-666666666-777777777-888888888-999999999";
+
+        zk1.create(pathBase, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        // Create 10,000 nodes. This should ensure the length of our
+        // watches set below exceeds 1MB.
+        List<String> paths = new ArrayList<String>();
+        for (int i = 0; i < 10000; i++) {
+            String path = zk1.create(pathBase + "/ch-", null, 
Ids.OPEN_ACL_UNSAFE,
+                                     CreateMode.PERSISTENT_SEQUENTIAL);
+            paths.add(path);
+        }
+        LOG.info("Created 10,000 nodes.");
+
+        MyWatcher childWatcher = new MyWatcher();
+
+        // Set a combination of child/exists/data watches
+        int i = 0;
+        for (String path : paths) {
+            if (i % 3 == 0) {
+                zk2.getChildren(path, childWatcher);
+            } else if (i % 3 == 1) {
+                zk2.exists(path + "/foo", childWatcher);
+            } else if (i % 3 == 2) {
+                zk2.getData(path, childWatcher, null);
+            }
+
+            i++;
+        }
+
+        stopServer();
+        watcher.waitForDisconnected(30000);
+        startServer();
+        watcher.waitForConnected(30000);
+
+        // Trigger the watches and ensure they properly propagate to the client
+        i = 0;
+        for (String path : paths) {
+            if (i % 3 == 0) {
+                zk1.create(path + "/ch", null, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+
+                WatchedEvent e = childWatcher.events.poll(TIMEOUT, 
TimeUnit.MILLISECONDS);
+                Assert.assertNotNull(e);
+                Assert.assertEquals(EventType.NodeChildrenChanged, 
e.getType());
+                Assert.assertEquals(path, e.getPath());
+            } else if (i % 3 == 1) {
+                zk1.create(path + "/foo", null, Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+
+                WatchedEvent e = childWatcher.events.poll(TIMEOUT, 
TimeUnit.MILLISECONDS);
+                Assert.assertNotNull(e);
+                Assert.assertEquals(EventType.NodeCreated, e.getType());
+                Assert.assertEquals(path + "/foo", e.getPath());
+            } else if (i % 3 == 2) {
+                zk1.setData(path, new byte[]{1, 2, 3}, -1);
+
+                WatchedEvent e = childWatcher.events.poll(TIMEOUT, 
TimeUnit.MILLISECONDS);
+                Assert.assertNotNull(e);
+                Assert.assertEquals(EventType.NodeDataChanged, e.getType());
+                Assert.assertEquals(path, e.getPath());
+            }
+
+            i++;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java
----------------------------------------------------------------------
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java
new file mode 100644
index 0000000..a21acdc
--- /dev/null
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * When request are route incorrectly, both follower and the leader will 
perform
+ * local session upgrade. So we saw CreateSession twice in txnlog This doesn't
+ * affect the correctness but cause the ensemble to see more load than
+ * necessary.
+ */
+public class DuplicateLocalSessionUpgradeTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(DuplicateLocalSessionUpgradeTest.class);
+
+    private final QuorumBase qb = new QuorumBase();
+
+    private static final int CONNECTION_TIMEOUT = 
ClientBase.CONNECTION_TIMEOUT;
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("STARTING quorum " + getClass().getName());
+        qb.localSessionsEnabled = true;
+        qb.localSessionsUpgradingEnabled = true;
+        qb.setUp();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("STOPPING quorum " + getClass().getName());
+        qb.tearDown();
+    }
+
+    @Test
+    public void testLocalSessionUpgradeOnFollower() throws Exception {
+        testLocalSessionUpgrade(false);
+    }
+
+    @Test
+    public void testLocalSessionUpgradeOnLeader() throws Exception {
+        testLocalSessionUpgrade(true);
+    }
+
+    private void testLocalSessionUpgrade(boolean testLeader) throws Exception {
+
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        int followerIdx = (leaderIdx + 1) % 5;
+        int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+        String hostPorts[] = qb.hostPort.split(",");
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx],
+                CONNECTION_TIMEOUT);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        final String firstPath = "/first";
+        final String secondPath = "/ephemeral";
+
+        // Just create some node so that we know the current zxid
+        zk.create(firstPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        // Now, try an ephemeral node. This will trigger session upgrade
+        // so there will be createSession request inject into the pipeline
+        // prior to this request
+        zk.create(secondPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL);
+
+        Stat firstStat = zk.exists(firstPath, null);
+        Assert.assertNotNull(firstStat);
+
+        Stat secondStat = zk.exists(secondPath, null);
+        Assert.assertNotNull(secondStat);
+
+        long zxidDiff = secondStat.getCzxid() - firstStat.getCzxid();
+
+        // If there is only one createSession request in between, zxid diff
+        // will be exactly 2. The alternative way of checking is to actually
+        // read txnlog but this should be sufficient
+        Assert.assertEquals(2L, zxidDiff);
+
+    }
+}

Reply via email to