Repository: incubator-slider
Updated Branches:
  refs/heads/develop 9b9c5d8ea -> 4156c1c32


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_connection.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_connection.py b/slider-agent/src/main/python/kazoo/tests/test_connection.py
deleted file mode 100644
index c8c4581..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_connection.py
+++ /dev/null
@@ -1,310 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-from collections import namedtuple
-import sys
-import os
-import errno
-import threading
-import time
-import uuid
-import struct
-
-from nose import SkipTest
-from nose.tools import eq_
-from nose.tools import raises
-import mock
-
-from kazoo.exceptions import ConnectionLoss
-from kazoo.protocol.serialization import (
-    Connect,
-    int_struct,
-    write_string,
-)
-from kazoo.protocol.states import KazooState
-from kazoo.protocol.connection import _CONNECTION_DROP
-from kazoo.testing import KazooTestCase
-from kazoo.tests.util import wait
-from kazoo.tests.util import TRAVIS_ZK_VERSION
-
-
-class Delete(namedtuple('Delete', 'path version')):
-    type = 2
-
-    def serialize(self):
-        b = bytearray()
-        b.extend(write_string(self.path))
-        b.extend(int_struct.pack(self.version))
-        return b
-
-    @classmethod
-    def deserialize(self, bytes, offset):
-        raise ValueError("oh my")
-
-
-class TestConnectionHandler(KazooTestCase):
-    def test_bad_deserialization(self):
-        async_object = self.client.handler.async_result()
-        self.client._queue.append(
-            (Delete(self.client.chroot, -1), async_object))
-        self.client._connection._write_sock.send(b'\0')
-
-        @raises(ValueError)
-        def testit():
-            async_object.get()
-        testit()
-
-    def test_with_bad_sessionid(self):
-        ev = threading.Event()
-
-        def expired(state):
-            if state == KazooState.CONNECTED:
-                ev.set()
-
-        password = os.urandom(16)
-        client = self._get_client(client_id=(82838284824, password))
-        client.add_listener(expired)
-        client.start()
-        try:
-            ev.wait(15)
-            eq_(ev.is_set(), True)
-        finally:
-            client.stop()
-
-    def test_connection_read_timeout(self):
-        client = self.client
-        ev = threading.Event()
-        path = "/" + uuid.uuid4().hex
-        handler = client.handler
-        _select = handler.select
-        _socket = client._connection._socket
-
-        def delayed_select(*args, **kwargs):
-            result = _select(*args, **kwargs)
-            if len(args[0]) == 1 and _socket in args[0]:
-                # for any socket read, simulate a timeout
-                return [], [], []
-            return result
-
-        def back(state):
-            if state == KazooState.CONNECTED:
-                ev.set()
-
-        client.add_listener(back)
-        client.create(path, b"1")
-        try:
-            handler.select = delayed_select
-            self.assertRaises(ConnectionLoss, client.get, path)
-        finally:
-            handler.select = _select
-        # the client reconnects automatically
-        ev.wait(5)
-        eq_(ev.is_set(), True)
-        eq_(client.get(path)[0], b"1")
-
-    def test_connection_write_timeout(self):
-        client = self.client
-        ev = threading.Event()
-        path = "/" + uuid.uuid4().hex
-        handler = client.handler
-        _select = handler.select
-        _socket = client._connection._socket
-
-        def delayed_select(*args, **kwargs):
-            result = _select(*args, **kwargs)
-            if _socket in args[1]:
-                # for any socket write, simulate a timeout
-                return [], [], []
-            return result
-
-        def back(state):
-            if state == KazooState.CONNECTED:
-                ev.set()
-        client.add_listener(back)
-
-        try:
-            handler.select = delayed_select
-            self.assertRaises(ConnectionLoss, client.create, path)
-        finally:
-            handler.select = _select
-        # the client reconnects automatically
-        ev.wait(5)
-        eq_(ev.is_set(), True)
-        eq_(client.exists(path), None)
-
-    def test_connection_deserialize_fail(self):
-        client = self.client
-        ev = threading.Event()
-        path = "/" + uuid.uuid4().hex
-        handler = client.handler
-        _select = handler.select
-        _socket = client._connection._socket
-
-        def delayed_select(*args, **kwargs):
-            result = _select(*args, **kwargs)
-            if _socket in args[1]:
-                # for any socket write, simulate a timeout
-                return [], [], []
-            return result
-
-        def back(state):
-            if state == KazooState.CONNECTED:
-                ev.set()
-        client.add_listener(back)
-
-        deserialize_ev = threading.Event()
-
-        def bad_deserialize(_bytes, offset):
-            deserialize_ev.set()
-            raise struct.error()
-
-        # force the connection to die but, on reconnect, cause the
-        # server response to be non-deserializable. ensure that the client
-        # continues to retry. This partially reproduces a rare bug seen
-        # in production.
-
-        with mock.patch.object(Connect, 'deserialize') as mock_deserialize:
-            mock_deserialize.side_effect = bad_deserialize
-            try:
-                handler.select = delayed_select
-                self.assertRaises(ConnectionLoss, client.create, path)
-            finally:
-                handler.select = _select
-            # the client reconnects automatically but the first attempt will
-            # hit a deserialize failure. wait for that.
-            deserialize_ev.wait(5)
-            eq_(deserialize_ev.is_set(), True)
-
-        # this time should succeed
-        ev.wait(5)
-        eq_(ev.is_set(), True)
-        eq_(client.exists(path), None)
-
-    def test_connection_close(self):
-        self.assertRaises(Exception, self.client.close)
-        self.client.stop()
-        self.client.close()
-
-        # should be able to restart
-        self.client.start()
-
-    def test_connection_sock(self):
-        client = self.client
-        read_sock = client._connection._read_sock
-        write_sock = client._connection._write_sock
-
-        assert read_sock is not None
-        assert write_sock is not None
-
-        # stop client and socket should not yet be closed
-        client.stop()
-        assert read_sock is not None
-        assert write_sock is not None
-        
-        read_sock.getsockname()
-        write_sock.getsockname()
-
-        # close client, and sockets should be closed
-        client.close()
-
-        # Todo check socket closing
-                   
-        # start client back up. should get a new, valid socket
-        client.start()
-        read_sock = client._connection._read_sock
-        write_sock = client._connection._write_sock
-
-        assert read_sock is not None
-        assert write_sock is not None
-        read_sock.getsockname()
-        write_sock.getsockname()
-            
-
-    def test_dirty_sock(self):
-        client = self.client
-        read_sock = client._connection._read_sock
-        write_sock = client._connection._write_sock
-
-        # add a stray byte to the socket and ensure that doesn't
-        # blow up client. simulates case where some error leaves
-        # a byte in the socket which doesn't correspond to the
-        # request queue.
-        write_sock.send(b'\0')
-
-        # eventually this byte should disappear from socket
-        wait(lambda: client.handler.select([read_sock], [], [], 0)[0] == [])
-
-
-class TestConnectionDrop(KazooTestCase):
-    def test_connection_dropped(self):
-        ev = threading.Event()
-
-        def back(state):
-            if state == KazooState.CONNECTED:
-                ev.set()
-
-        # create a node with a large value and stop the ZK node
-        path = "/" + uuid.uuid4().hex
-        self.client.create(path)
-        self.client.add_listener(back)
-        result = self.client.set_async(path, b'a' * 1000 * 1024)
-        self.client._call(_CONNECTION_DROP, None)
-
-        self.assertRaises(ConnectionLoss, result.get)
-        # we have a working connection to a new node
-        ev.wait(30)
-        eq_(ev.is_set(), True)
-
-
-class TestReadOnlyMode(KazooTestCase):
-
-    def setUp(self):
-        self.setup_zookeeper(read_only=True)
-        skip = False
-        if TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION < (3, 4):
-            skip = True
-        elif TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION >= (3, 4):
-            skip = False
-        else:
-            ver = self.client.server_version()
-            if ver[1] < 4:
-                skip = True
-        if skip:
-            raise SkipTest("Must use Zookeeper 3.4 or above")
-
-    def tearDown(self):
-        self.client.stop()
-
-    def test_read_only(self):
-        from kazoo.exceptions import NotReadOnlyCallError
-        from kazoo.protocol.states import KeeperState
-
-        client = self.client
-        states = []
-        ev = threading.Event()
-
-        @client.add_listener
-        def listen(state):
-            states.append(state)
-            if client.client_state == KeeperState.CONNECTED_RO:
-                ev.set()
-        try:
-            self.cluster[1].stop()
-            self.cluster[2].stop()
-            ev.wait(6)
-            eq_(ev.is_set(), True)
-            eq_(client.client_state, KeeperState.CONNECTED_RO)
-
-            # Test read only command
-            eq_(client.get_children('/'), [])
-
-            # Test error with write command
-            @raises(NotReadOnlyCallError)
-            def testit():
-                client.create('/fred')
-            testit()
-
-            # Wait for a ping
-            time.sleep(15)
-        finally:
-            client.remove_listener(listen)
-            self.cluster[1].run()
-            self.cluster[2].run()

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_counter.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_counter.py b/slider-agent/src/main/python/kazoo/tests/test_counter.py
deleted file mode 100644
index b0361d0..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_counter.py
+++ /dev/null
@@ -1,36 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-import uuid
-
-from nose.tools import eq_
-
-from kazoo.testing import KazooTestCase
-
-
-class KazooCounterTests(KazooTestCase):
-
-    def _makeOne(self, **kw):
-        path = "/" + uuid.uuid4().hex
-        return self.client.Counter(path, **kw)
-
-    def test_int_counter(self):
-        counter = self._makeOne()
-        eq_(counter.value, 0)
-        counter += 2
-        counter + 1
-        eq_(counter.value, 3)
-        counter -= 3
-        counter - 1
-        eq_(counter.value, -1)
-
-    def test_float_counter(self):
-        counter = self._makeOne(default=0.0)
-        eq_(counter.value, 0.0)
-        counter += 2.1
-        eq_(counter.value, 2.1)
-        counter -= 3.1
-        eq_(counter.value, -1.0)
-
-    def test_errors(self):
-        counter = self._makeOne()
-        self.assertRaises(TypeError, counter.__add__, 2.1)
-        self.assertRaises(TypeError, counter.__add__, b"a")

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_election.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_election.py b/slider-agent/src/main/python/kazoo/tests/test_election.py
deleted file mode 100644
index a9610bf..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_election.py
+++ /dev/null
@@ -1,140 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-import uuid
-import sys
-import threading
-
-from nose.tools import eq_
-
-from kazoo.testing import KazooTestCase
-from kazoo.tests.util import wait
-
-
-class UniqueError(Exception):
-    """Error raised only by test leader function
-    """
-
-
-class KazooElectionTests(KazooTestCase):
-    def setUp(self):
-        super(KazooElectionTests, self).setUp()
-        self.path = "/" + uuid.uuid4().hex
-
-        self.condition = threading.Condition()
-
-        # election contenders set these when elected. The exit event is set by
-        # the test to make the leader exit.
-        self.leader_id = None
-        self.exit_event = None
-
-        # tests set this before the event to make the leader raise an error
-        self.raise_exception = False
-
-        # set by a worker thread when an unexpected error is hit.
-        # better way to do this?
-        self.thread_exc_info = None
-
-    def _spawn_contender(self, contender_id, election):
-        thread = threading.Thread(target=self._election_thread,
-            args=(contender_id, election))
-        thread.daemon = True
-        thread.start()
-        return thread
-
-    def _election_thread(self, contender_id, election):
-        try:
-            election.run(self._leader_func, contender_id)
-        except UniqueError:
-            if not self.raise_exception:
-                self.thread_exc_info = sys.exc_info()
-        except Exception:
-            self.thread_exc_info = sys.exc_info()
-        else:
-            if self.raise_exception:
-                e = Exception("expected leader func to raise exception")
-                self.thread_exc_info = (Exception, e, None)
-
-    def _leader_func(self, name):
-        exit_event = threading.Event()
-        with self.condition:
-            self.exit_event = exit_event
-            self.leader_id = name
-            self.condition.notify_all()
-
-        exit_event.wait(45)
-        if self.raise_exception:
-            raise UniqueError("expected error in the leader function")
-
-    def _check_thread_error(self):
-        if self.thread_exc_info:
-            t, o, tb = self.thread_exc_info
-            raise t(o)
-
-    def test_election(self):
-        elections = {}
-        threads = {}
-        for _ in range(3):
-            contender = "c" + uuid.uuid4().hex
-            elections[contender] = self.client.Election(self.path, contender)
-            threads[contender] = self._spawn_contender(contender,
-                elections[contender])
-
-        # wait for a leader to be elected
-        times = 0
-        with self.condition:
-            while not self.leader_id:
-                self.condition.wait(5)
-                times += 1
-                if times > 5:
-                    raise Exception("Still not a leader: lid: %s",
-                                    self.leader_id)
-
-        election = self.client.Election(self.path)
-
-        # make sure all contenders are in the pool
-        wait(lambda: len(election.contenders()) == len(elections))
-        contenders = election.contenders()
-
-        eq_(set(contenders), set(elections.keys()))
-
-        # first one in list should be leader
-        first_leader = contenders[0]
-        eq_(first_leader, self.leader_id)
-
-        # tell second one to cancel election. should never get elected.
-        elections[contenders[1]].cancel()
-
-        # make leader exit. third contender should be elected.
-        self.exit_event.set()
-        with self.condition:
-            while self.leader_id == first_leader:
-                self.condition.wait(45)
-        eq_(self.leader_id, contenders[2])
-        self._check_thread_error()
-
-        # make first contender re-enter the race
-        threads[first_leader].join()
-        threads[first_leader] = self._spawn_contender(first_leader,
-            elections[first_leader])
-
-        # contender set should now be the current leader plus the first leader
-        wait(lambda: len(election.contenders()) == 2)
-        contenders = election.contenders()
-        eq_(set(contenders), set([self.leader_id, first_leader]))
-
-        # make current leader raise an exception. first should be reelected
-        self.raise_exception = True
-        self.exit_event.set()
-        with self.condition:
-            while self.leader_id != first_leader:
-                self.condition.wait(45)
-        eq_(self.leader_id, first_leader)
-        self._check_thread_error()
-
-        self.exit_event.set()
-        for thread in threads.values():
-            thread.join()
-        self._check_thread_error()
-
-    def test_bad_func(self):
-        election = self.client.Election(self.path)
-        self.assertRaises(ValueError, election.run, "not a callable")

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_exceptions.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_exceptions.py b/slider-agent/src/main/python/kazoo/tests/test_exceptions.py
deleted file mode 100644
index e469089..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_exceptions.py
+++ /dev/null
@@ -1,23 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-from unittest import TestCase
-
-
-class ExceptionsTestCase(TestCase):
-
-    def _get(self):
-        from kazoo import exceptions
-        return exceptions
-
-    def test_backwards_alias(self):
-        module = self._get()
-        self.assertTrue(getattr(module, 'NoNodeException'))
-        self.assertTrue(module.NoNodeException, module.NoNodeError)
-
-    def test_exceptions_code(self):
-        module = self._get()
-        exc_8 = module.EXCEPTIONS[-8]
-        self.assertTrue(isinstance(exc_8(), module.BadArgumentsError))
-
-    def test_invalid_code(self):
-        module = self._get()
-        self.assertRaises(RuntimeError, module.EXCEPTIONS.__getitem__, 666)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_gevent_handler.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_gevent_handler.py b/slider-agent/src/main/python/kazoo/tests/test_gevent_handler.py
deleted file mode 100644
index 71d9727..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_gevent_handler.py
+++ /dev/null
@@ -1,161 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-import unittest
-
-from nose import SkipTest
-from nose.tools import eq_
-from nose.tools import raises
-
-from kazoo.client import KazooClient
-from kazoo.exceptions import NoNodeError
-from kazoo.protocol.states import Callback
-from kazoo.testing import KazooTestCase
-from kazoo.tests import test_client
-
-
-class TestGeventHandler(unittest.TestCase):
-
-    def setUp(self):
-        try:
-            import gevent
-        except ImportError:
-            raise SkipTest('gevent not available.')
-
-    def _makeOne(self, *args):
-        from kazoo.handlers.gevent import SequentialGeventHandler
-        return SequentialGeventHandler(*args)
-
-    def _getAsync(self, *args):
-        from kazoo.handlers.gevent import AsyncResult
-        return AsyncResult
-
-    def _getEvent(self):
-        from gevent.event import Event
-        return Event
-
-    def test_proper_threading(self):
-        h = self._makeOne()
-        h.start()
-        assert isinstance(h.event_object(), self._getEvent())
-
-    def test_matching_async(self):
-        h = self._makeOne()
-        h.start()
-        async = self._getAsync()
-        assert isinstance(h.async_result(), async)
-
-    def test_exception_raising(self):
-        h = self._makeOne()
-
-        @raises(h.timeout_exception)
-        def testit():
-            raise h.timeout_exception("This is a timeout")
-        testit()
-
-    def test_exception_in_queue(self):
-        h = self._makeOne()
-        h.start()
-        ev = self._getEvent()()
-
-        def func():
-            ev.set()
-            raise ValueError('bang')
-
-        call1 = Callback('completion', func, ())
-        h.dispatch_callback(call1)
-        ev.wait()
-
-    def test_queue_empty_exception(self):
-        from gevent.queue import Empty
-        h = self._makeOne()
-        h.start()
-        ev = self._getEvent()()
-
-        def func():
-            ev.set()
-            raise Empty()
-
-        call1 = Callback('completion', func, ())
-        h.dispatch_callback(call1)
-        ev.wait()
-
-
-class TestBasicGeventClient(KazooTestCase):
-
-    def setUp(self):
-        try:
-            import gevent
-        except ImportError:
-            raise SkipTest('gevent not available.')
-        KazooTestCase.setUp(self)
-
-    def _makeOne(self, *args):
-        from kazoo.handlers.gevent import SequentialGeventHandler
-        return SequentialGeventHandler(*args)
-
-    def _getEvent(self):
-        from gevent.event import Event
-        return Event
-
-    def test_start(self):
-        client = self._get_client(handler=self._makeOne())
-        client.start()
-        self.assertEqual(client.state, 'CONNECTED')
-        client.stop()
-
-    def test_start_stop_double(self):
-        client = self._get_client(handler=self._makeOne())
-        client.start()
-        self.assertEqual(client.state, 'CONNECTED')
-        client.handler.start()
-        client.handler.stop()
-        client.stop()
-
-    def test_basic_commands(self):
-        client = self._get_client(handler=self._makeOne())
-        client.start()
-        self.assertEqual(client.state, 'CONNECTED')
-        client.create('/anode', 'fred')
-        eq_(client.get('/anode')[0], 'fred')
-        eq_(client.delete('/anode'), True)
-        eq_(client.exists('/anode'), None)
-        client.stop()
-
-    def test_failures(self):
-        client = self._get_client(handler=self._makeOne())
-        client.start()
-        self.assertRaises(NoNodeError, client.get, '/none')
-        client.stop()
-
-    def test_data_watcher(self):
-        client = self._get_client(handler=self._makeOne())
-        client.start()
-        client.ensure_path('/some/node')
-        ev = self._getEvent()()
-
-        @client.DataWatch('/some/node')
-        def changed(d, stat):
-            ev.set()
-
-        ev.wait()
-        ev.clear()
-        client.set('/some/node', 'newvalue')
-        ev.wait()
-        client.stop()
-
-
-class TestGeventClient(test_client.TestClient):
-
-    def setUp(self):
-        try:
-            import gevent
-        except ImportError:
-            raise SkipTest('gevent not available.')
-        KazooTestCase.setUp(self)
-
-    def _makeOne(self, *args):
-        from kazoo.handlers.gevent import SequentialGeventHandler
-        return SequentialGeventHandler(*args)
-
-    def _get_client(self, **kwargs):
-        kwargs["handler"] = self._makeOne()
-        return KazooClient(self.hosts, **kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_lock.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_lock.py b/slider-agent/src/main/python/kazoo/tests/test_lock.py
deleted file mode 100644
index 6dd15b0..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_lock.py
+++ /dev/null
@@ -1,518 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-import uuid
-import threading
-
-from nose.tools import eq_, ok_
-
-from kazoo.exceptions import CancelledError
-from kazoo.exceptions import LockTimeout
-from kazoo.testing import KazooTestCase
-from kazoo.tests.util import wait
-
-
-class KazooLockTests(KazooTestCase):
-    def setUp(self):
-        super(KazooLockTests, self).setUp()
-        self.lockpath = "/" + uuid.uuid4().hex
-
-        self.condition = threading.Condition()
-        self.released = threading.Event()
-        self.active_thread = None
-        self.cancelled_threads = []
-
-    def _thread_lock_acquire_til_event(self, name, lock, event):
-        try:
-            with lock:
-                with self.condition:
-                    eq_(self.active_thread, None)
-                    self.active_thread = name
-                    self.condition.notify_all()
-
-                event.wait()
-
-                with self.condition:
-                    eq_(self.active_thread, name)
-                    self.active_thread = None
-                    self.condition.notify_all()
-            self.released.set()
-        except CancelledError:
-            with self.condition:
-                self.cancelled_threads.append(name)
-                self.condition.notify_all()
-
-    def test_lock_one(self):
-        lock_name = uuid.uuid4().hex
-        lock = self.client.Lock(self.lockpath, lock_name)
-        event = threading.Event()
-
-        thread = threading.Thread(target=self._thread_lock_acquire_til_event,
-            args=(lock_name, lock, event))
-        thread.start()
-
-        lock2_name = uuid.uuid4().hex
-        anotherlock = self.client.Lock(self.lockpath, lock2_name)
-
-        # wait for any contender to show up on the lock
-        wait(anotherlock.contenders)
-        eq_(anotherlock.contenders(), [lock_name])
-
-        with self.condition:
-            while self.active_thread != lock_name:
-                self.condition.wait()
-
-        # release the lock
-        event.set()
-
-        with self.condition:
-            while self.active_thread:
-                self.condition.wait()
-        self.released.wait()
-        thread.join()
-
-    def test_lock(self):
-        threads = []
-        names = ["contender" + str(i) for i in range(5)]
-
-        contender_bits = {}
-
-        for name in names:
-            e = threading.Event()
-
-            l = self.client.Lock(self.lockpath, name)
-            t = threading.Thread(target=self._thread_lock_acquire_til_event,
-                args=(name, l, e))
-            contender_bits[name] = (t, e)
-            threads.append(t)
-
-        # acquire the lock ourselves first to make the others line up
-        lock = self.client.Lock(self.lockpath, "test")
-        lock.acquire()
-
-        for t in threads:
-            t.start()
-
-        # wait for everyone to line up on the lock
-        wait(lambda: len(lock.contenders()) == 6)
-        contenders = lock.contenders()
-
-        eq_(contenders[0], "test")
-        contenders = contenders[1:]
-        remaining = list(contenders)
-
-        # release the lock and contenders should claim it in order
-        lock.release()
-
-        for contender in contenders:
-            thread, event = contender_bits[contender]
-
-            with self.condition:
-                while not self.active_thread:
-                    self.condition.wait()
-                eq_(self.active_thread, contender)
-
-            eq_(lock.contenders(), remaining)
-            remaining = remaining[1:]
-
-            event.set()
-
-            with self.condition:
-                while self.active_thread:
-                    self.condition.wait()
-        for thread in threads:
-            thread.join()
-
-    def test_lock_reconnect(self):
-        event = threading.Event()
-        other_lock = self.client.Lock(self.lockpath, 'contender')
-        thread = threading.Thread(target=self._thread_lock_acquire_til_event,
-                                  args=('contender', other_lock, event))
-
-        # acquire the lock ourselves first to make the contender line up
-        lock = self.client.Lock(self.lockpath, "test")
-        lock.acquire()
-
-        thread.start()
-        # wait for the contender to line up on the lock
-        wait(lambda: len(lock.contenders()) == 2)
-        eq_(lock.contenders(), ['test', 'contender'])
-
-        self.expire_session()
-
-        lock.release()
-
-        with self.condition:
-            while not self.active_thread:
-                self.condition.wait()
-            eq_(self.active_thread, 'contender')
-
-        event.set()
-        thread.join()
-
-    def test_lock_non_blocking(self):
-        lock_name = uuid.uuid4().hex
-        lock = self.client.Lock(self.lockpath, lock_name)
-        event = threading.Event()
-
-        thread = threading.Thread(target=self._thread_lock_acquire_til_event,
-            args=(lock_name, lock, event))
-        thread.start()
-
-        lock1 = self.client.Lock(self.lockpath, lock_name)
-
-        # wait for the thread to acquire the lock
-        with self.condition:
-            if not self.active_thread:
-                self.condition.wait(5)
-
-        ok_(not lock1.acquire(blocking=False))
-        eq_(lock.contenders(), [lock_name])  # just one - itself
-
-        event.set()
-        thread.join()
-
-    def test_lock_fail_first_call(self):
-        event1 = threading.Event()
-        lock1 = self.client.Lock(self.lockpath, "one")
-        thread1 = threading.Thread(target=self._thread_lock_acquire_til_event,
-            args=("one", lock1, event1))
-        thread1.start()
-
-        # wait for this thread to acquire the lock
-        with self.condition:
-            if not self.active_thread:
-                self.condition.wait(5)
-                eq_(self.active_thread, "one")
-        eq_(lock1.contenders(), ["one"])
-        event1.set()
-        thread1.join()
-
-    def test_lock_cancel(self):
-        event1 = threading.Event()
-        lock1 = self.client.Lock(self.lockpath, "one")
-        thread1 = threading.Thread(target=self._thread_lock_acquire_til_event,
-            args=("one", lock1, event1))
-        thread1.start()
-
-        # wait for this thread to acquire the lock
-        with self.condition:
-            if not self.active_thread:
-                self.condition.wait(5)
-                eq_(self.active_thread, "one")
-
-        client2 = self._get_client()
-        client2.start()
-        event2 = threading.Event()
-        lock2 = client2.Lock(self.lockpath, "two")
-        thread2 = threading.Thread(target=self._thread_lock_acquire_til_event,
-            args=("two", lock2, event2))
-        thread2.start()
-
-        # this one should block in acquire. check that it is a contender
-        wait(lambda: len(lock2.contenders()) > 1)
-        eq_(lock2.contenders(), ["one", "two"])
-
-        lock2.cancel()
-        with self.condition:
-            if not "two" in self.cancelled_threads:
-                self.condition.wait()
-                assert "two" in self.cancelled_threads
-
-        eq_(lock2.contenders(), ["one"])
-
-        thread2.join()
-        event1.set()
-        thread1.join()
-        client2.stop()
-
-    def test_lock_double_calls(self):
-        lock1 = self.client.Lock(self.lockpath, "one")
-        lock1.acquire()
-        lock1.acquire()
-        lock1.release()
-        lock1.release()
-
-    def test_lock_reacquire(self):
-        lock = self.client.Lock(self.lockpath, "one")
-        lock.acquire()
-        lock.release()
-        lock.acquire()
-        lock.release()
-
-    def test_lock_timeout(self):
-        timeout = 3
-        e = threading.Event()
-        started = threading.Event()
-
-        # In the background thread, acquire the lock and wait thrice the time
-        # that the main thread is going to wait to acquire the lock.
-        lock1 = self.client.Lock(self.lockpath, "one")
-
-        def _thread(lock, event, timeout):
-            with lock:
-                started.set()
-                event.wait(timeout)
-                if not event.isSet():
-                    # Eventually fail to avoid hanging the tests
-                    self.fail("lock2 never timed out")
-
-        t = threading.Thread(target=_thread, args=(lock1, e, timeout * 3))
-        t.start()
-
-        # Start the main thread's kazoo client and try to acquire the lock
-        # but give up after `timeout` seconds
-        client2 = self._get_client()
-        client2.start()
-        started.wait(5)
-        self.assertTrue(started.isSet())
-        lock2 = client2.Lock(self.lockpath, "two")
-        try:
-            lock2.acquire(timeout=timeout)
-        except LockTimeout:
-            # A timeout is the behavior we're expecting, since the background
-            # thread should still be holding onto the lock
-            pass
-        else:
-            self.fail("Main thread unexpectedly acquired the lock")
-        finally:
-            # Cleanup
-            e.set()
-            t.join()
-            client2.stop()
-
-
-class TestSemaphore(KazooTestCase):
-    def setUp(self):
-        super(TestSemaphore, self).setUp()
-        self.lockpath = "/" + uuid.uuid4().hex
-
-        self.condition = threading.Condition()
-        self.released = threading.Event()
-        self.active_thread = None
-        self.cancelled_threads = []
-
-    def test_basic(self):
-        sem1 = self.client.Semaphore(self.lockpath)
-        sem1.acquire()
-        sem1.release()
-
-    def test_lock_one(self):
-        sem1 = self.client.Semaphore(self.lockpath, max_leases=1)
-        sem2 = self.client.Semaphore(self.lockpath, max_leases=1)
-        started = threading.Event()
-        event = threading.Event()
-
-        sem1.acquire()
-
-        def sema_one():
-            started.set()
-            with sem2:
-                event.set()
-
-        thread = threading.Thread(target=sema_one, args=())
-        thread.start()
-        started.wait(10)
-
-        self.assertFalse(event.is_set())
-
-        sem1.release()
-        event.wait(10)
-        self.assert_(event.is_set())
-        thread.join()
-
-    def test_non_blocking(self):
-        sem1 = self.client.Semaphore(
-            self.lockpath, identifier='sem1', max_leases=2)
-        sem2 = self.client.Semaphore(
-            self.lockpath, identifier='sem2', max_leases=2)
-        sem3 = self.client.Semaphore(
-            self.lockpath, identifier='sem3', max_leases=2)
-
-        sem1.acquire()
-        sem2.acquire()
-        ok_(not sem3.acquire(blocking=False))
-        eq_(set(sem1.lease_holders()), set(['sem1', 'sem2']))
-        sem2.release()
-        # the next line isn't required, but avoids timing issues in tests
-        sem3.acquire()
-        eq_(set(sem1.lease_holders()), set(['sem1', 'sem3']))
-        sem1.release()
-        sem3.release()
-
-    def test_non_blocking_release(self):
-        sem1 = self.client.Semaphore(
-            self.lockpath, identifier='sem1', max_leases=1)
-        sem2 = self.client.Semaphore(
-            self.lockpath, identifier='sem2', max_leases=1)
-        sem1.acquire()
-        sem2.acquire(blocking=False)
-
-        # make sure there's no shutdown / cleanup error
-        sem1.release()
-        sem2.release()
-
-    def test_holders(self):
-        started = threading.Event()
-        event = threading.Event()
-
-        def sema_one():
-            with self.client.Semaphore(self.lockpath, 'fred', max_leases=1):
-                started.set()
-                event.wait()
-
-        thread = threading.Thread(target=sema_one, args=())
-        thread.start()
-        started.wait()
-        sem1 = self.client.Semaphore(self.lockpath)
-        holders = sem1.lease_holders()
-        eq_(holders, ['fred'])
-        event.set()
-        thread.join()
-
-    def test_semaphore_cancel(self):
-        sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1)
-        sem2 = self.client.Semaphore(self.lockpath, 'george', max_leases=1)
-        sem1.acquire()
-        started = threading.Event()
-        event = threading.Event()
-
-        def sema_one():
-            started.set()
-            try:
-                with sem2:
-                    started.set()
-            except CancelledError:
-                event.set()
-
-        thread = threading.Thread(target=sema_one, args=())
-        thread.start()
-        started.wait()
-        eq_(sem1.lease_holders(), ['fred'])
-        eq_(event.is_set(), False)
-        sem2.cancel()
-        event.wait()
-        eq_(event.is_set(), True)
-        thread.join()
-
-    def test_multiple_acquire_and_release(self):
-        sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1)
-        sem1.acquire()
-        sem1.acquire()
-
-        eq_(True, sem1.release())
-        eq_(False, sem1.release())
-
-    def test_handle_session_loss(self):
-        expire_semaphore = self.client.Semaphore(self.lockpath, 'fred',
-                                                 max_leases=1)
-
-        client = self._get_client()
-        client.start()
-        lh_semaphore = client.Semaphore(self.lockpath, 'george', max_leases=1)
-        lh_semaphore.acquire()
-
-        started = threading.Event()
-        event = threading.Event()
-        event2 = threading.Event()
-
-        def sema_one():
-            started.set()
-            with expire_semaphore:
-                event.set()
-                event2.wait()
-
-        thread = threading.Thread(target=sema_one, args=())
-        thread.start()
-
-        started.wait()
-        eq_(lh_semaphore.lease_holders(), ['george'])
-
-        # Fired in a separate thread to make sure we can see the effect
-        expired = threading.Event()
-
-        def expire():
-            self.expire_session()
-            expired.set()
-
-        thread = threading.Thread(target=expire, args=())
-        thread.start()
-        expire_semaphore.wake_event.wait()
-        expired.wait()
-
-        lh_semaphore.release()
-        client.stop()
-
-        event.wait(5)
-        eq_(expire_semaphore.lease_holders(), ['fred'])
-        event2.set()
-        thread.join()
-
-    def test_inconsistent_max_leases(self):
-        sem1 = self.client.Semaphore(self.lockpath, max_leases=1)
-        sem2 = self.client.Semaphore(self.lockpath, max_leases=2)
-
-        sem1.acquire()
-        self.assertRaises(ValueError, sem2.acquire)
-
-    def test_inconsistent_max_leases_other_data(self):
-        sem1 = self.client.Semaphore(self.lockpath, max_leases=1)
-        sem2 = self.client.Semaphore(self.lockpath, max_leases=2)
-
-        self.client.ensure_path(self.lockpath)
-        self.client.set(self.lockpath, b'a$')
-
-        sem1.acquire()
-        # sem2 thinks it's ok to have two lease holders
-        ok_(sem2.acquire(blocking=False))
-
-    def test_reacquire(self):
-        lock = self.client.Semaphore(self.lockpath)
-        lock.acquire()
-        lock.release()
-        lock.acquire()
-        lock.release()
-
-    def test_acquire_after_cancelled(self):
-        lock = self.client.Semaphore(self.lockpath)
-        self.assertTrue(lock.acquire())
-        self.assertTrue(lock.release())
-        lock.cancel()
-        self.assertTrue(lock.cancelled)
-        self.assertTrue(lock.acquire())
-
-    def test_timeout(self):
-        timeout = 3
-        e = threading.Event()
-        started = threading.Event()
-
-        # In the background thread, acquire the lock and wait thrice the time
-        # that the main thread is going to wait to acquire the lock.
-        sem1 = self.client.Semaphore(self.lockpath, "one")
-
-        def _thread(sem, event, timeout):
-            with sem:
-                started.set()
-                event.wait(timeout)
-                if not event.isSet():
-                    # Eventually fail to avoid hanging the tests
-                    self.fail("sem2 never timed out")
-
-        t = threading.Thread(target=_thread, args=(sem1, e, timeout * 3))
-        t.start()
-
-        # Start the main thread's kazoo client and try to acquire the lock
-        # but give up after `timeout` seconds
-        client2 = self._get_client()
-        client2.start()
-        started.wait(5)
-        self.assertTrue(started.isSet())
-        sem2 = client2.Semaphore(self.lockpath, "two")
-        try:
-            sem2.acquire(timeout=timeout)
-        except LockTimeout:
-            # A timeout is the behavior we're expecting, since the background
-            # thread will still be holding onto the lock
-            e.set()
-        finally:
-            # Cleanup
-            t.join()
-            client2.stop()

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_partitioner.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_partitioner.py 
b/slider-agent/src/main/python/kazoo/tests/test_partitioner.py
deleted file mode 100644
index 1a4f205..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_partitioner.py
+++ /dev/null
@@ -1,93 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-import uuid
-import time
-
-from nose.tools import eq_
-
-from kazoo.testing import KazooTestCase
-from kazoo.recipe.partitioner import PartitionState
-
-
-class KazooPartitionerTests(KazooTestCase):
-    def setUp(self):
-        super(KazooPartitionerTests, self).setUp()
-        self.path = "/" + uuid.uuid4().hex
-
-    def test_party_of_one(self):
-        partitioner = self.client.SetPartitioner(
-            self.path, set=(1, 2, 3), time_boundary=0.2)
-        partitioner.wait_for_acquire(14)
-        eq_(partitioner.state, PartitionState.ACQUIRED)
-        eq_(list(partitioner), [1, 2, 3])
-        partitioner.finish()
-
-    def test_party_of_two(self):
-        partitioners = [self.client.SetPartitioner(self.path, (1, 2),
-                        identifier="p%s" % i, time_boundary=0.2)
-                        for i in range(2)]
-
-        partitioners[0].wait_for_acquire(14)
-        partitioners[1].wait_for_acquire(14)
-        eq_(list(partitioners[0]), [1])
-        eq_(list(partitioners[1]), [2])
-        partitioners[0].finish()
-        time.sleep(0.1)
-        eq_(partitioners[1].release, True)
-        partitioners[1].finish()
-
-    def test_party_expansion(self):
-        partitioners = [self.client.SetPartitioner(self.path, (1, 2, 3),
-                        identifier="p%s" % i, time_boundary=0.2)
-                        for i in range(2)]
-
-        partitioners[0].wait_for_acquire(14)
-        partitioners[1].wait_for_acquire(14)
-        eq_(partitioners[0].state, PartitionState.ACQUIRED)
-        eq_(partitioners[1].state, PartitionState.ACQUIRED)
-
-        eq_(list(partitioners[0]), [1, 3])
-        eq_(list(partitioners[1]), [2])
-
-        # Add another partition, wait till they settle
-        partitioners.append(self.client.SetPartitioner(self.path, (1, 2, 3),
-                            identifier="p2", time_boundary=0.2))
-        time.sleep(0.1)
-        eq_(partitioners[0].release, True)
-        for p in partitioners[:-1]:
-            p.release_set()
-
-        for p in partitioners:
-            p.wait_for_acquire(14)
-
-        eq_(list(partitioners[0]), [1])
-        eq_(list(partitioners[1]), [2])
-        eq_(list(partitioners[2]), [3])
-
-        for p in partitioners:
-            p.finish()
-
-    def test_more_members_than_set_items(self):
-        partitioners = [self.client.SetPartitioner(self.path, (1,),
-                        identifier="p%s" % i, time_boundary=0.2)
-                        for i in range(2)]
-
-        partitioners[0].wait_for_acquire(14)
-        partitioners[1].wait_for_acquire(14)
-        eq_(partitioners[0].state, PartitionState.ACQUIRED)
-        eq_(partitioners[1].state, PartitionState.ACQUIRED)
-
-        eq_(list(partitioners[0]), [1])
-        eq_(list(partitioners[1]), [])
-
-        for p in partitioners:
-            p.finish()
-
-    def test_party_session_failure(self):
-        partitioner = self.client.SetPartitioner(
-            self.path, set=(1, 2, 3), time_boundary=0.2)
-        partitioner.wait_for_acquire(14)
-        eq_(partitioner.state, PartitionState.ACQUIRED)
-        # simulate session failure
-        partitioner._fail_out()
-        partitioner.release_set()
-        self.assertTrue(partitioner.failed)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_party.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_party.py 
b/slider-agent/src/main/python/kazoo/tests/test_party.py
deleted file mode 100644
index 61400ae..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_party.py
+++ /dev/null
@@ -1,85 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-import uuid
-
-from nose.tools import eq_
-
-from kazoo.testing import KazooTestCase
-
-
-class KazooPartyTests(KazooTestCase):
-    def setUp(self):
-        super(KazooPartyTests, self).setUp()
-        self.path = "/" + uuid.uuid4().hex
-
-    def test_party(self):
-        parties = [self.client.Party(self.path, "p%s" % i)
-                   for i in range(5)]
-
-        one_party = parties[0]
-
-        eq_(list(one_party), [])
-        eq_(len(one_party), 0)
-
-        participants = set()
-        for party in parties:
-            party.join()
-            participants.add(party.data.decode('utf-8'))
-
-            eq_(set(party), participants)
-            eq_(len(party), len(participants))
-
-        for party in parties:
-            party.leave()
-            participants.remove(party.data.decode('utf-8'))
-
-            eq_(set(party), participants)
-            eq_(len(party), len(participants))
-
-    def test_party_reuse_node(self):
-        party = self.client.Party(self.path, "p1")
-        self.client.ensure_path(self.path)
-        self.client.create(party.create_path)
-        party.join()
-        self.assertTrue(party.participating)
-        party.leave()
-        self.assertFalse(party.participating)
-        self.assertEqual(len(party), 0)
-
-    def test_party_vanishing_node(self):
-        party = self.client.Party(self.path, "p1")
-        party.join()
-        self.assertTrue(party.participating)
-        self.client.delete(party.create_path)
-        party.leave()
-        self.assertFalse(party.participating)
-        self.assertEqual(len(party), 0)
-
-
-class KazooShallowPartyTests(KazooTestCase):
-    def setUp(self):
-        super(KazooShallowPartyTests, self).setUp()
-        self.path = "/" + uuid.uuid4().hex
-
-    def test_party(self):
-        parties = [self.client.ShallowParty(self.path, "p%s" % i)
-                   for i in range(5)]
-
-        one_party = parties[0]
-
-        eq_(list(one_party), [])
-        eq_(len(one_party), 0)
-
-        participants = set()
-        for party in parties:
-            party.join()
-            participants.add(party.data.decode('utf-8'))
-
-            eq_(set(party), participants)
-            eq_(len(party), len(participants))
-
-        for party in parties:
-            party.leave()
-            participants.remove(party.data.decode('utf-8'))
-
-            eq_(set(party), participants)
-            eq_(len(party), len(participants))

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_paths.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_paths.py 
b/slider-agent/src/main/python/kazoo/tests/test_paths.py
deleted file mode 100644
index c9064bb..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_paths.py
+++ /dev/null
@@ -1,99 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-import sys
-from unittest import TestCase
-
-from kazoo.protocol import paths
-
-
-if sys.version_info > (3, ):  # pragma: nocover
-    def u(s):
-        return s
-else:  # pragma: nocover
-    def u(s):
-        return unicode(s, "unicode_escape")
-
-
-class NormPathTestCase(TestCase):
-
-    def test_normpath(self):
-        self.assertEqual(paths.normpath('/a/b'), '/a/b')
-
-    def test_normpath_empty(self):
-        self.assertEqual(paths.normpath(''), '')
-
-    def test_normpath_unicode(self):
-        self.assertEqual(paths.normpath(u('/\xe4/b')), u('/\xe4/b'))
-
-    def test_normpath_dots(self):
-        self.assertEqual(paths.normpath('/a./b../c'), '/a./b../c')
-
-    def test_normpath_slash(self):
-        self.assertEqual(paths.normpath('/'), '/')
-
-    def test_normpath_multiple_slashes(self):
-        self.assertEqual(paths.normpath('//'), '/')
-        self.assertEqual(paths.normpath('//a/b'), '/a/b')
-        self.assertEqual(paths.normpath('/a//b//'), '/a/b')
-        self.assertEqual(paths.normpath('//a////b///c/'), '/a/b/c')
-
-    def test_normpath_relative(self):
-        self.assertRaises(ValueError, paths.normpath, './a/b')
-        self.assertRaises(ValueError, paths.normpath, '/a/../b')
-
-
-class JoinTestCase(TestCase):
-
-    def test_join(self):
-        self.assertEqual(paths.join('/a'), '/a')
-        self.assertEqual(paths.join('/a', 'b/'), '/a/b/')
-        self.assertEqual(paths.join('/a', 'b', 'c'), '/a/b/c')
-
-    def test_join_empty(self):
-        self.assertEqual(paths.join(''), '')
-        self.assertEqual(paths.join('', 'a', 'b'), 'a/b')
-        self.assertEqual(paths.join('/a', '', 'b/', 'c'), '/a/b/c')
-
-    def test_join_absolute(self):
-        self.assertEqual(paths.join('/a/b', '/c'), '/c')
-
-
-class IsAbsTestCase(TestCase):
-
-    def test_isabs(self):
-        self.assertTrue(paths.isabs('/'))
-        self.assertTrue(paths.isabs('/a'))
-        self.assertTrue(paths.isabs('/a//b/c'))
-        self.assertTrue(paths.isabs('//a/b'))
-
-    def test_isabs_false(self):
-        self.assertFalse(paths.isabs(''))
-        self.assertFalse(paths.isabs('a/'))
-        self.assertFalse(paths.isabs('a/../'))
-
-
-class BaseNameTestCase(TestCase):
-
-    def test_basename(self):
-        self.assertEquals(paths.basename(''), '')
-        self.assertEquals(paths.basename('/'), '')
-        self.assertEquals(paths.basename('//a'), 'a')
-        self.assertEquals(paths.basename('//a/'), '')
-        self.assertEquals(paths.basename('/a/b.//c..'), 'c..')
-
-
-class PrefixRootTestCase(TestCase):
-
-    def test_prefix_root(self):
-        self.assertEquals(paths._prefix_root('/a/', 'b/c'), '/a/b/c')
-        self.assertEquals(paths._prefix_root('/a/b', 'c/d'), '/a/b/c/d')
-        self.assertEquals(paths._prefix_root('/a', '/b/c'), '/a/b/c')
-        self.assertEquals(paths._prefix_root('/a', '//b/c.'), '/a/b/c.')
-
-
-class NormRootTestCase(TestCase):
-
-    def test_norm_root(self):
-        self.assertEquals(paths._norm_root(''), '/')
-        self.assertEquals(paths._norm_root('/'), '/')
-        self.assertEquals(paths._norm_root('//a'), '/a')
-        self.assertEquals(paths._norm_root('//a./b'), '/a./b')

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_queue.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_queue.py 
b/slider-agent/src/main/python/kazoo/tests/test_queue.py
deleted file mode 100644
index 4c13ca9..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_queue.py
+++ /dev/null
@@ -1,180 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-import uuid
-
-from nose import SkipTest
-from nose.tools import eq_, ok_
-
-from kazoo.testing import KazooTestCase
-from kazoo.tests.util import TRAVIS_ZK_VERSION
-
-
-class KazooQueueTests(KazooTestCase):
-
-    def _makeOne(self):
-        path = "/" + uuid.uuid4().hex
-        return self.client.Queue(path)
-
-    def test_queue_validation(self):
-        queue = self._makeOne()
-        self.assertRaises(TypeError, queue.put, {})
-        self.assertRaises(TypeError, queue.put, b"one", b"100")
-        self.assertRaises(TypeError, queue.put, b"one", 10.0)
-        self.assertRaises(ValueError, queue.put, b"one", -100)
-        self.assertRaises(ValueError, queue.put, b"one", 100000)
-
-    def test_empty_queue(self):
-        queue = self._makeOne()
-        eq_(len(queue), 0)
-        self.assertTrue(queue.get() is None)
-        eq_(len(queue), 0)
-
-    def test_queue(self):
-        queue = self._makeOne()
-        queue.put(b"one")
-        queue.put(b"two")
-        queue.put(b"three")
-        eq_(len(queue), 3)
-
-        eq_(queue.get(), b"one")
-        eq_(queue.get(), b"two")
-        eq_(queue.get(), b"three")
-        eq_(len(queue), 0)
-
-    def test_priority(self):
-        queue = self._makeOne()
-        queue.put(b"four", priority=101)
-        queue.put(b"one", priority=0)
-        queue.put(b"two", priority=0)
-        queue.put(b"three", priority=10)
-
-        eq_(queue.get(), b"one")
-        eq_(queue.get(), b"two")
-        eq_(queue.get(), b"three")
-        eq_(queue.get(), b"four")
-
-
-class KazooLockingQueueTests(KazooTestCase):
-
-    def setUp(self):
-        KazooTestCase.setUp(self)
-        skip = False
-        if TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION < (3, 4):
-            skip = True
-        elif TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION >= (3, 4):
-            skip = False
-        else:
-            ver = self.client.server_version()
-            if ver[1] < 4:
-                skip = True
-        if skip:
-            raise SkipTest("Must use Zookeeper 3.4 or above")
-
-    def _makeOne(self):
-        path = "/" + uuid.uuid4().hex
-        return self.client.LockingQueue(path)
-
-    def test_queue_validation(self):
-        queue = self._makeOne()
-        self.assertRaises(TypeError, queue.put, {})
-        self.assertRaises(TypeError, queue.put, b"one", b"100")
-        self.assertRaises(TypeError, queue.put, b"one", 10.0)
-        self.assertRaises(ValueError, queue.put, b"one", -100)
-        self.assertRaises(ValueError, queue.put, b"one", 100000)
-        self.assertRaises(TypeError, queue.put_all, {})
-        self.assertRaises(TypeError, queue.put_all, [{}])
-        self.assertRaises(TypeError, queue.put_all, [b"one"], b"100")
-        self.assertRaises(TypeError, queue.put_all, [b"one"], 10.0)
-        self.assertRaises(ValueError, queue.put_all, [b"one"], -100)
-        self.assertRaises(ValueError, queue.put_all, [b"one"], 100000)
-
-    def test_empty_queue(self):
-        queue = self._makeOne()
-        eq_(len(queue), 0)
-        self.assertTrue(queue.get(0) is None)
-        eq_(len(queue), 0)
-
-    def test_queue(self):
-        queue = self._makeOne()
-        queue.put(b"one")
-        queue.put_all([b"two", b"three"])
-        eq_(len(queue), 3)
-
-        ok_(not queue.consume())
-        ok_(not queue.holds_lock())
-        eq_(queue.get(1), b"one")
-        ok_(queue.holds_lock())
-        # Without consuming, should return the same element
-        eq_(queue.get(1), b"one")
-        ok_(queue.consume())
-        ok_(not queue.holds_lock())
-        eq_(queue.get(1), b"two")
-        ok_(queue.holds_lock())
-        ok_(queue.consume())
-        ok_(not queue.holds_lock())
-        eq_(queue.get(1), b"three")
-        ok_(queue.holds_lock())
-        ok_(queue.consume())
-        ok_(not queue.holds_lock())
-        ok_(not queue.consume())
-        eq_(len(queue), 0)
-
-    def test_consume(self):
-        queue = self._makeOne()
-
-        queue.put(b"one")
-        ok_(not queue.consume())
-        queue.get(.1)
-        ok_(queue.consume())
-        ok_(not queue.consume())
-
-    def test_holds_lock(self):
-        queue = self._makeOne()
-
-        ok_(not queue.holds_lock())
-        queue.put(b"one")
-        queue.get(.1)
-        ok_(queue.holds_lock())
-        queue.consume()
-        ok_(not queue.holds_lock())
-
-    def test_priority(self):
-        queue = self._makeOne()
-        queue.put(b"four", priority=101)
-        queue.put(b"one", priority=0)
-        queue.put(b"two", priority=0)
-        queue.put(b"three", priority=10)
-
-        eq_(queue.get(1), b"one")
-        ok_(queue.consume())
-        eq_(queue.get(1), b"two")
-        ok_(queue.consume())
-        eq_(queue.get(1), b"three")
-        ok_(queue.consume())
-        eq_(queue.get(1), b"four")
-        ok_(queue.consume())
-
-    def test_concurrent_execution(self):
-        queue = self._makeOne()
-        value1 = []
-        value2 = []
-        value3 = []
-        event1 = self.client.handler.event_object()
-        event2 = self.client.handler.event_object()
-        event3 = self.client.handler.event_object()
-
-        def get_concurrently(value, event):
-            q = self.client.LockingQueue(queue.path)
-            value.append(q.get(.1))
-            event.set()
-
-        self.client.handler.spawn(get_concurrently, value1, event1)
-        self.client.handler.spawn(get_concurrently, value2, event2)
-        self.client.handler.spawn(get_concurrently, value3, event3)
-        queue.put(b"one")
-        event1.wait(.2)
-        event2.wait(.2)
-        event3.wait(.2)
-
-        result = value1 + value2 + value3
-        eq_(result.count(b"one"), 1)
-        eq_(result.count(None), 2)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_retry.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_retry.py 
b/slider-agent/src/main/python/kazoo/tests/test_retry.py
deleted file mode 100644
index 84c8d41..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_retry.py
+++ /dev/null
@@ -1,78 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-import unittest
-
-from nose.tools import eq_
-
-
-class TestRetrySleeper(unittest.TestCase):
-
-    def _pass(self):
-        pass
-
-    def _fail(self, times=1):
-        from kazoo.retry import ForceRetryError
-        scope = dict(times=0)
-
-        def inner():
-            if scope['times'] >= times:
-                pass
-            else:
-                scope['times'] += 1
-                raise ForceRetryError('Failed!')
-        return inner
-
-    def _makeOne(self, *args, **kwargs):
-        from kazoo.retry import KazooRetry
-        return KazooRetry(*args, **kwargs)
-
-    def test_reset(self):
-        retry = self._makeOne(delay=0, max_tries=2)
-        retry(self._fail())
-        eq_(retry._attempts, 1)
-        retry.reset()
-        eq_(retry._attempts, 0)
-
-    def test_too_many_tries(self):
-        from kazoo.retry import RetryFailedError
-        retry = self._makeOne(delay=0)
-        self.assertRaises(RetryFailedError, retry, self._fail(times=999))
-        eq_(retry._attempts, 1)
-
-    def test_maximum_delay(self):
-        def sleep_func(_time):
-            pass
-
-        retry = self._makeOne(delay=10, max_tries=100, sleep_func=sleep_func)
-        retry(self._fail(times=10))
-        self.assertTrue(retry._cur_delay < 4000, retry._cur_delay)
-        # gevent's sleep function is picky about the type
-        eq_(type(retry._cur_delay), float)
-
-    def test_copy(self):
-        _sleep = lambda t: None
-        retry = self._makeOne(sleep_func=_sleep)
-        rcopy = retry.copy()
-        self.assertTrue(rcopy.sleep_func is _sleep)
-
-
-class TestKazooRetry(unittest.TestCase):
-
-    def _makeOne(self, **kw):
-        from kazoo.retry import KazooRetry
-        return KazooRetry(**kw)
-
-    def test_connection_closed(self):
-        from kazoo.exceptions import ConnectionClosedError
-        retry = self._makeOne()
-
-        def testit():
-            raise ConnectionClosedError()
-        self.assertRaises(ConnectionClosedError, retry, testit)
-
-    def test_session_expired(self):
-        from kazoo.exceptions import SessionExpiredError
-        retry = self._makeOne(max_tries=1)
-
-        def testit():
-            raise SessionExpiredError()
-        self.assertRaises(Exception, retry, testit)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_security.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_security.py 
b/slider-agent/src/main/python/kazoo/tests/test_security.py
deleted file mode 100644
index 587c265..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_security.py
+++ /dev/null
@@ -1,41 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-import unittest
-
-from nose.tools import eq_
-from kazoo.security import Permissions
-
-
-class TestACL(unittest.TestCase):
-    def _makeOne(self, *args, **kwargs):
-        from kazoo.security import make_acl
-        return make_acl(*args, **kwargs)
-
-    def test_read_acl(self):
-        acl = self._makeOne("digest", ":", read=True)
-        eq_(acl.perms & Permissions.READ, Permissions.READ)
-
-    def test_all_perms(self):
-        acl = self._makeOne("digest", ":", read=True, write=True,
-                            create=True, delete=True, admin=True)
-        for perm in [Permissions.READ, Permissions.CREATE, Permissions.WRITE,
-                     Permissions.DELETE, Permissions.ADMIN]:
-            eq_(acl.perms & perm, perm)
-
-    def test_perm_listing(self):
-        from kazoo.security import ACL
-        f = ACL(15, 'fred')
-        self.assert_('READ' in f.acl_list)
-        self.assert_('WRITE' in f.acl_list)
-        self.assert_('CREATE' in f.acl_list)
-        self.assert_('DELETE' in f.acl_list)
-
-        f = ACL(16, 'fred')
-        self.assert_('ADMIN' in f.acl_list)
-
-        f = ACL(31, 'george')
-        self.assert_('ALL' in f.acl_list)
-
-    def test_perm_repr(self):
-        from kazoo.security import ACL
-        f = ACL(16, 'fred')
-        self.assert_("ACL(perms=16, acl_list=['ADMIN']" in repr(f))

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py 
b/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py
deleted file mode 100644
index 4de5781..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py
+++ /dev/null
@@ -1,327 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-import threading
-import unittest
-
-import mock
-from nose.tools import assert_raises
-from nose.tools import eq_
-from nose.tools import raises
-
-
-class TestThreadingHandler(unittest.TestCase):
-    def _makeOne(self, *args):
-        from kazoo.handlers.threading import SequentialThreadingHandler
-        return SequentialThreadingHandler(*args)
-
-    def _getAsync(self, *args):
-        from kazoo.handlers.threading import AsyncResult
-        return AsyncResult
-
-    def test_proper_threading(self):
-        h = self._makeOne()
-        h.start()
-        # In Python 3.3 _Event is gone, before Event is function
-        event_class = getattr(threading, '_Event', threading.Event)
-        assert isinstance(h.event_object(), event_class)
-
-    def test_matching_async(self):
-        h = self._makeOne()
-        h.start()
-        async = self._getAsync()
-        assert isinstance(h.async_result(), async)
-
-    def test_exception_raising(self):
-        h = self._makeOne()
-
-        @raises(h.timeout_exception)
-        def testit():
-            raise h.timeout_exception("This is a timeout")
-        testit()
-
-    def test_double_start_stop(self):
-        h = self._makeOne()
-        h.start()
-        self.assertTrue(h._running)
-        h.start()
-        h.stop()
-        h.stop()
-        self.assertFalse(h._running)
-
-
-class TestThreadingAsync(unittest.TestCase):
-    def _makeOne(self, *args):
-        from kazoo.handlers.threading import AsyncResult
-        return AsyncResult(*args)
-
-    def _makeHandler(self):
-        from kazoo.handlers.threading import SequentialThreadingHandler
-        return SequentialThreadingHandler()
-
-    def test_ready(self):
-        mock_handler = mock.Mock()
-        async = self._makeOne(mock_handler)
-
-        eq_(async.ready(), False)
-        async.set('val')
-        eq_(async.ready(), True)
-        eq_(async.successful(), True)
-        eq_(async.exception, None)
-
-    def test_callback_queued(self):
-        mock_handler = mock.Mock()
-        mock_handler.completion_queue = mock.Mock()
-        async = self._makeOne(mock_handler)
-
-        async.rawlink(lambda a: a)
-        async.set('val')
-
-        assert mock_handler.completion_queue.put.called
-
-    def test_set_exception(self):
-        mock_handler = mock.Mock()
-        mock_handler.completion_queue = mock.Mock()
-        async = self._makeOne(mock_handler)
-        async.rawlink(lambda a: a)
-        async.set_exception(ImportError('Error occured'))
-
-        assert isinstance(async.exception, ImportError)
-        assert mock_handler.completion_queue.put.called
-
-    def test_get_wait_while_setting(self):
-        mock_handler = mock.Mock()
-        async = self._makeOne(mock_handler)
-
-        lst = []
-        bv = threading.Event()
-        cv = threading.Event()
-
-        def wait_for_val():
-            bv.set()
-            val = async.get()
-            lst.append(val)
-            cv.set()
-        th = threading.Thread(target=wait_for_val)
-        th.start()
-        bv.wait()
-
-        async.set('fred')
-        cv.wait()
-        eq_(lst, ['fred'])
-        th.join()
-
-    def test_get_with_nowait(self):
-        mock_handler = mock.Mock()
-        async = self._makeOne(mock_handler)
-        timeout = self._makeHandler().timeout_exception
-
-        @raises(timeout)
-        def test_it():
-            async.get(block=False)
-        test_it()
-
-        @raises(timeout)
-        def test_nowait():
-            async.get_nowait()
-        test_nowait()
-
-    def test_get_with_exception(self):
-        mock_handler = mock.Mock()
-        async = self._makeOne(mock_handler)
-
-        lst = []
-        bv = threading.Event()
-        cv = threading.Event()
-
-        def wait_for_val():
-            bv.set()
-            try:
-                val = async.get()
-            except ImportError:
-                lst.append('oops')
-            else:
-                lst.append(val)
-            cv.set()
-        th = threading.Thread(target=wait_for_val)
-        th.start()
-        bv.wait()
-
-        async.set_exception(ImportError)
-        cv.wait()
-        eq_(lst, ['oops'])
-        th.join()
-
-    def test_wait(self):
-        mock_handler = mock.Mock()
-        async = self._makeOne(mock_handler)
-
-        lst = []
-        bv = threading.Event()
-        cv = threading.Event()
-
-        def wait_for_val():
-            bv.set()
-            try:
-                val = async.wait(10)
-            except ImportError:
-                lst.append('oops')
-            else:
-                lst.append(val)
-            cv.set()
-        th = threading.Thread(target=wait_for_val)
-        th.start()
-        bv.wait(10)
-
-        async.set("fred")
-        cv.wait(15)
-        eq_(lst, [True])
-        th.join()
-
-    def test_set_before_wait(self):
-        mock_handler = mock.Mock()
-        async = self._makeOne(mock_handler)
-
-        lst = []
-        cv = threading.Event()
-        async.set('fred')
-
-        def wait_for_val():
-            val = async.get()
-            lst.append(val)
-            cv.set()
-        th = threading.Thread(target=wait_for_val)
-        th.start()
-        cv.wait()
-        eq_(lst, ['fred'])
-        th.join()
-
-    def test_set_exc_before_wait(self):
-        mock_handler = mock.Mock()
-        async = self._makeOne(mock_handler)
-
-        lst = []
-        cv = threading.Event()
-        async.set_exception(ImportError)
-
-        def wait_for_val():
-            try:
-                val = async.get()
-            except ImportError:
-                lst.append('ooops')
-            else:
-                lst.append(val)
-            cv.set()
-        th = threading.Thread(target=wait_for_val)
-        th.start()
-        cv.wait()
-        eq_(lst, ['ooops'])
-        th.join()
-
-    def test_linkage(self):
-        mock_handler = mock.Mock()
-        async = self._makeOne(mock_handler)
-        cv = threading.Event()
-
-        lst = []
-
-        def add_on():
-            lst.append(True)
-
-        def wait_for_val():
-            async.get()
-            cv.set()
-
-        th = threading.Thread(target=wait_for_val)
-        th.start()
-
-        async.rawlink(add_on)
-        async.set('fred')
-        assert mock_handler.completion_queue.put.called
-        async.unlink(add_on)
-        cv.wait()
-        eq_(async.value, 'fred')
-        th.join()
-
-    def test_linkage_not_ready(self):
-        mock_handler = mock.Mock()
-        async = self._makeOne(mock_handler)
-
-        lst = []
-
-        def add_on():
-            lst.append(True)
-
-        async.set('fred')
-        assert not mock_handler.completion_queue.called
-        async.rawlink(add_on)
-        assert mock_handler.completion_queue.put.called
-
-    def test_link_and_unlink(self):
-        mock_handler = mock.Mock()
-        async = self._makeOne(mock_handler)
-
-        lst = []
-
-        def add_on():
-            lst.append(True)
-
-        async.rawlink(add_on)
-        assert not mock_handler.completion_queue.put.called
-        async.unlink(add_on)
-        async.set('fred')
-        assert not mock_handler.completion_queue.put.called
-
-    def test_captured_exception(self):
-        from kazoo.handlers.utils import capture_exceptions
-
-        mock_handler = mock.Mock()
-        async = self._makeOne(mock_handler)
-
-        @capture_exceptions(async)
-        def exceptional_function():
-            return 1/0
-
-        exceptional_function()
-
-        assert_raises(ZeroDivisionError, async.get)
-
-    def test_no_capture_exceptions(self):
-        from kazoo.handlers.utils import capture_exceptions
-
-        mock_handler = mock.Mock()
-        async = self._makeOne(mock_handler)
-
-        lst = []
-
-        def add_on():
-            lst.append(True)
-
-        async.rawlink(add_on)
-
-        @capture_exceptions(async)
-        def regular_function():
-            return True
-
-        regular_function()
-
-        assert not mock_handler.completion_queue.put.called
-
-    def test_wraps(self):
-        from kazoo.handlers.utils import wrap
-
-        mock_handler = mock.Mock()
-        async = self._makeOne(mock_handler)
-
-        lst = []
-
-        def add_on(result):
-            lst.append(result.get())
-
-        async.rawlink(add_on)
-
-        @wrap(async)
-        def regular_function():
-            return 'hello'
-
-        assert regular_function() == 'hello'
-        assert mock_handler.completion_queue.put.called
-        assert async.get() == 'hello'

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_watchers.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_watchers.py 
b/slider-agent/src/main/python/kazoo/tests/test_watchers.py
deleted file mode 100644
index 44795c4..0000000
--- a/slider-agent/src/main/python/kazoo/tests/test_watchers.py
+++ /dev/null
@@ -1,490 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-import time
-import threading
-import uuid
-
-from nose.tools import eq_
-from nose.tools import raises
-
-from kazoo.exceptions import KazooException
-from kazoo.protocol.states import EventType
-from kazoo.testing import KazooTestCase
-
-
-class KazooDataWatcherTests(KazooTestCase):
-    def setUp(self):
-        super(KazooDataWatcherTests, self).setUp()
-        self.path = "/" + uuid.uuid4().hex
-        self.client.ensure_path(self.path)
-
-    def test_data_watcher(self):
-        update = threading.Event()
-        data = [True]
-
-        # Make it a non-existent path
-        self.path += 'f'
-
-        @self.client.DataWatch(self.path)
-        def changed(d, stat):
-            data.pop()
-            data.append(d)
-            update.set()
-
-        update.wait(10)
-        eq_(data, [None])
-        update.clear()
-
-        self.client.create(self.path, b'fred')
-        update.wait(10)
-        eq_(data[0], b'fred')
-        update.clear()
-
-    def test_data_watcher_once(self):
-        update = threading.Event()
-        data = [True]
-
-        # Make it a non-existent path
-        self.path += 'f'
-
-        dwatcher = self.client.DataWatch(self.path)
-
-        @dwatcher
-        def changed(d, stat):
-            data.pop()
-            data.append(d)
-            update.set()
-
-        update.wait(10)
-        eq_(data, [None])
-        update.clear()
-
-        @raises(KazooException)
-        def test_it():
-            @dwatcher
-            def func(d, stat):
-                data.pop()
-        test_it()
-
-    def test_data_watcher_with_event(self):
-        # Test that the data watcher gets passed the event, if it
-        # accepts three arguments
-        update = threading.Event()
-        data = [True]
-
-        # Make it a non-existent path
-        self.path += 'f'
-
-        @self.client.DataWatch(self.path)
-        def changed(d, stat, event):
-            data.pop()
-            data.append(event)
-            update.set()
-
-        update.wait(10)
-        eq_(data, [None])
-        update.clear()
-
-        self.client.create(self.path, b'fred')
-        update.wait(10)
-        eq_(data[0].type, EventType.CREATED)
-        update.clear()
-
-    def test_func_style_data_watch(self):
-        update = threading.Event()
-        data = [True]
-
-        # Make it a non-existent path
-        path = self.path + 'f'
-
-        def changed(d, stat):
-            data.pop()
-            data.append(d)
-            update.set()
-        self.client.DataWatch(path, changed)
-
-        update.wait(10)
-        eq_(data, [None])
-        update.clear()
-
-        self.client.create(path, b'fred')
-        update.wait(10)
-        eq_(data[0], b'fred')
-        update.clear()
-
-    def test_datawatch_across_session_expire(self):
-        update = threading.Event()
-        data = [True]
-
-        @self.client.DataWatch(self.path)
-        def changed(d, stat):
-            data.pop()
-            data.append(d)
-            update.set()
-
-        update.wait(10)
-        eq_(data, [b""])
-        update.clear()
-
-        self.expire_session()
-        self.client.retry(self.client.set, self.path, b'fred')
-        update.wait(25)
-        eq_(data[0], b'fred')
-
-    def test_func_stops(self):
-        update = threading.Event()
-        data = [True]
-
-        self.path += "f"
-
-        fail_through = []
-
-        @self.client.DataWatch(self.path)
-        def changed(d, stat):
-            data.pop()
-            data.append(d)
-            update.set()
-            if fail_through:
-                return False
-
-        update.wait(10)
-        eq_(data, [None])
-        update.clear()
-
-        fail_through.append(True)
-        self.client.create(self.path, b'fred')
-        update.wait(10)
-        eq_(data[0], b'fred')
-        update.clear()
-
-        self.client.set(self.path, b'asdfasdf')
-        update.wait(0.2)
-        eq_(data[0], b'fred')
-
-        d, stat = self.client.get(self.path)
-        eq_(d, b'asdfasdf')
-
-    def test_no_such_node(self):
-        args = []
-
-        @self.client.DataWatch("/some/path")
-        def changed(d, stat):
-            args.extend([d, stat])
-
-        eq_(args, [None, None])
-
-    def test_bad_watch_func2(self):
-        counter = 0
-
-        @self.client.DataWatch(self.path)
-        def changed(d, stat):
-            if counter > 0:
-                raise Exception("oops")
-
-        raises(Exception)(changed)
-
-        counter += 1
-        self.client.set(self.path, b'asdfasdf')
-
-    def test_watcher_evaluating_to_false(self):
-        class WeirdWatcher(list):
-            def __call__(self, *args):
-                self.called = True
-        watcher = WeirdWatcher()
-        self.client.DataWatch(self.path, watcher)
-        self.client.set(self.path, b'mwahaha')
-        self.assertTrue(watcher.called)
-
-    def test_watcher_repeat_delete(self):
-        a = []
-        ev = threading.Event()
-
-        self.client.delete(self.path)
-
-        @self.client.DataWatch(self.path)
-        def changed(val, stat):
-            a.append(val)
-            ev.set()
-
-        eq_(a, [None])
-        ev.wait(10)
-        ev.clear()
-        self.client.create(self.path, b'blah')
-        ev.wait(10)
-        eq_(ev.is_set(), True)
-        ev.clear()
-        eq_(a, [None, b'blah'])
-        self.client.delete(self.path)
-        ev.wait(10)
-        eq_(ev.is_set(), True)
-        ev.clear()
-        eq_(a, [None, b'blah', None])
-        self.client.create(self.path, b'blah')
-        ev.wait(10)
-        eq_(ev.is_set(), True)
-        ev.clear()
-        eq_(a, [None, b'blah', None, b'blah'])
-
-    def test_watcher_with_closing(self):
-        a = []
-        ev = threading.Event()
-
-        self.client.delete(self.path)
-
-        @self.client.DataWatch(self.path)
-        def changed(val, stat):
-            a.append(val)
-            ev.set()
-        eq_(a, [None])
-
-        b = False
-        try:
-            self.client.stop()
-        except:
-            b = True
-        eq_(b, False)
-
-
-class KazooChildrenWatcherTests(KazooTestCase):
-    def setUp(self):
-        super(KazooChildrenWatcherTests, self).setUp()
-        self.path = "/" + uuid.uuid4().hex
-        self.client.ensure_path(self.path)
-
-    def test_child_watcher(self):
-        update = threading.Event()
-        all_children = ['fred']
-
-        @self.client.ChildrenWatch(self.path)
-        def changed(children):
-            while all_children:
-                all_children.pop()
-            all_children.extend(children)
-            update.set()
-
-        update.wait(10)
-        eq_(all_children, [])
-        update.clear()
-
-        self.client.create(self.path + '/' + 'smith')
-        update.wait(10)
-        eq_(all_children, ['smith'])
-        update.clear()
-
-        self.client.create(self.path + '/' + 'george')
-        update.wait(10)
-        eq_(sorted(all_children), ['george', 'smith'])
-
-    def test_child_watcher_once(self):
-        update = threading.Event()
-        all_children = ['fred']
-
-        cwatch = self.client.ChildrenWatch(self.path)
-
-        @cwatch
-        def changed(children):
-            while all_children:
-                all_children.pop()
-            all_children.extend(children)
-            update.set()
-
-        update.wait(10)
-        eq_(all_children, [])
-        update.clear()
-
-        @raises(KazooException)
-        def test_it():
-            @cwatch
-            def changed_again(children):
-                update.set()
-        test_it()
-
-    def test_child_watcher_with_event(self):
-        update = threading.Event()
-        events = [True]
-
-        @self.client.ChildrenWatch(self.path, send_event=True)
-        def changed(children, event):
-            events.pop()
-            events.append(event)
-            update.set()
-
-        update.wait(10)
-        eq_(events, [None])
-        update.clear()
-
-        self.client.create(self.path + '/' + 'smith')
-        update.wait(10)
-        eq_(events[0].type, EventType.CHILD)
-        update.clear()
-
-    def test_func_style_child_watcher(self):
-        update = threading.Event()
-        all_children = ['fred']
-
-        def changed(children):
-            while all_children:
-                all_children.pop()
-            all_children.extend(children)
-            update.set()
-
-        self.client.ChildrenWatch(self.path, changed)
-
-        update.wait(10)
-        eq_(all_children, [])
-        update.clear()
-
-        self.client.create(self.path + '/' + 'smith')
-        update.wait(10)
-        eq_(all_children, ['smith'])
-        update.clear()
-
-        self.client.create(self.path + '/' + 'george')
-        update.wait(10)
-        eq_(sorted(all_children), ['george', 'smith'])
-
-    def test_func_stops(self):
-        update = threading.Event()
-        all_children = ['fred']
-
-        fail_through = []
-
-        @self.client.ChildrenWatch(self.path)
-        def changed(children):
-            while all_children:
-                all_children.pop()
-            all_children.extend(children)
-            update.set()
-            if fail_through:
-                return False
-
-        update.wait(10)
-        eq_(all_children, [])
-        update.clear()
-
-        fail_through.append(True)
-        self.client.create(self.path + '/' + 'smith')
-        update.wait(10)
-        eq_(all_children, ['smith'])
-        update.clear()
-
-        self.client.create(self.path + '/' + 'george')
-        update.wait(0.5)
-        eq_(all_children, ['smith'])
-
-    def test_child_watch_session_loss(self):
-        update = threading.Event()
-        all_children = ['fred']
-
-        @self.client.ChildrenWatch(self.path)
-        def changed(children):
-            while all_children:
-                all_children.pop()
-            all_children.extend(children)
-            update.set()
-
-        update.wait(10)
-        eq_(all_children, [])
-        update.clear()
-
-        self.client.create(self.path + '/' + 'smith')
-        update.wait(10)
-        eq_(all_children, ['smith'])
-        update.clear()
-        self.expire_session()
-
-        self.client.retry(self.client.create,
-                          self.path + '/' + 'george')
-        update.wait(20)
-        eq_(sorted(all_children), ['george', 'smith'])
-
-    def test_child_stop_on_session_loss(self):
-        update = threading.Event()
-        all_children = ['fred']
-
-        @self.client.ChildrenWatch(self.path, allow_session_lost=False)
-        def changed(children):
-            while all_children:
-                all_children.pop()
-            all_children.extend(children)
-            update.set()
-
-        update.wait(10)
-        eq_(all_children, [])
-        update.clear()
-
-        self.client.create(self.path + '/' + 'smith')
-        update.wait(10)
-        eq_(all_children, ['smith'])
-        update.clear()
-        self.expire_session()
-
-        self.client.retry(self.client.create,
-                          self.path + '/' + 'george')
-        update.wait(4)
-        eq_(update.is_set(), False)
-        eq_(all_children, ['smith'])
-
-        children = self.client.get_children(self.path)
-        eq_(sorted(children), ['george', 'smith'])
-
-    def test_bad_children_watch_func(self):
-        counter = 0
-
-        @self.client.ChildrenWatch(self.path)
-        def changed(children):
-            if counter > 0:
-                raise Exception("oops")
-
-        raises(Exception)(changed)
-        counter += 1
-        self.client.create(self.path + '/' + 'smith')
-
-
-class KazooPatientChildrenWatcherTests(KazooTestCase):
-    def setUp(self):
-        super(KazooPatientChildrenWatcherTests, self).setUp()
-        self.path = "/" + uuid.uuid4().hex
-
-    def _makeOne(self, *args, **kwargs):
-        from kazoo.recipe.watchers import PatientChildrenWatch
-        return PatientChildrenWatch(*args, **kwargs)
-
-    def test_watch(self):
-        self.client.ensure_path(self.path)
-        watcher = self._makeOne(self.client, self.path, 0.1)
-        result = watcher.start()
-        children, asy = result.get()
-        eq_(len(children), 0)
-        eq_(asy.ready(), False)
-
-        self.client.create(self.path + '/' + 'fred')
-        asy.get(timeout=1)
-        eq_(asy.ready(), True)
-
-    def test_exception(self):
-        from kazoo.exceptions import NoNodeError
-        watcher = self._makeOne(self.client, self.path, 0.1)
-        result = watcher.start()
-
-        @raises(NoNodeError)
-        def testit():
-            result.get()
-        testit()
-
-    def test_watch_iterations(self):
-        self.client.ensure_path(self.path)
-        watcher = self._makeOne(self.client, self.path, 0.5)
-        result = watcher.start()
-        eq_(result.ready(), False)
-
-        time.sleep(0.08)
-        self.client.create(self.path + '/' + uuid.uuid4().hex)
-        eq_(result.ready(), False)
-        time.sleep(0.08)
-        eq_(result.ready(), False)
-        self.client.create(self.path + '/' + uuid.uuid4().hex)
-        time.sleep(0.08)
-        eq_(result.ready(), False)
-
-        children, asy = result.get()
-        eq_(len(children), 2)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/util.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/util.py 
b/slider-agent/src/main/python/kazoo/tests/util.py
deleted file mode 100644
index 906cbc0..0000000
--- a/slider-agent/src/main/python/kazoo/tests/util.py
+++ /dev/null
@@ -1,127 +0,0 @@
-"""license: Apache License 2.0, see LICENSE for more details."""
-##############################################################################
-#
-# Copyright Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-
-import logging
-import os
-import time
-
-TRAVIS = os.environ.get('TRAVIS', False)
-TRAVIS_ZK_VERSION = TRAVIS and os.environ.get('ZOOKEEPER_VERSION', None)
-if TRAVIS_ZK_VERSION:
-    TRAVIS_ZK_VERSION = tuple([int(n) for n in TRAVIS_ZK_VERSION.split('.')])
-
-
-class Handler(logging.Handler):
-
-    def __init__(self, *names, **kw):
-        logging.Handler.__init__(self)
-        self.names = names
-        self.records = []
-        self.setLoggerLevel(**kw)
-
-    def setLoggerLevel(self, level=1):
-        self.level = level
-        self.oldlevels = {}
-
-    def emit(self, record):
-        self.records.append(record)
-
-    def clear(self):
-        del self.records[:]
-
-    def install(self):
-        for name in self.names:
-            logger = logging.getLogger(name)
-            self.oldlevels[name] = logger.level
-            logger.setLevel(self.level)
-            logger.addHandler(self)
-
-    def uninstall(self):
-        for name in self.names:
-            logger = logging.getLogger(name)
-            logger.setLevel(self.oldlevels[name])
-            logger.removeHandler(self)
-
-    def __str__(self):
-        return '\n'.join(
-            [("%s %s\n  %s" %
-              (record.name, record.levelname,
-               '\n'.join([line
-                          for line in record.getMessage().split('\n')
-                          if line.strip()])
-               )
-              )
-              for record in self.records]
-              )
-
-
-class InstalledHandler(Handler):
-
-    def __init__(self, *names, **kw):
-        Handler.__init__(self, *names, **kw)
-        self.install()
-
-
-class Wait(object):
-
-    class TimeOutWaitingFor(Exception):
-        "A test condition timed out"
-
-    timeout = 9
-    wait = .01
-
-    def __init__(self, timeout=None, wait=None, exception=None,
-                 getnow=(lambda: time.time), getsleep=(lambda: time.sleep)):
-
-        if timeout is not None:
-            self.timeout = timeout
-
-        if wait is not None:
-            self.wait = wait
-
-        if exception is not None:
-            self.TimeOutWaitingFor = exception
-
-        self.getnow = getnow
-        self.getsleep = getsleep
-
-    def __call__(self, func=None, timeout=None, wait=None, message=None):
-        if func is None:
-            return lambda func: self(func, timeout, wait, message)
-
-        if func():
-            return
-
-        now = self.getnow()
-        sleep = self.getsleep()
-        if timeout is None:
-            timeout = self.timeout
-        if wait is None:
-            wait = self.wait
-        wait = float(wait)
-
-        deadline = now() + timeout
-        while 1:
-            sleep(wait)
-            if func():
-                return
-            if now() > deadline:
-                raise self.TimeOutWaitingFor(
-                    message or
-                    getattr(func, '__doc__') or
-                    getattr(func, '__name__')
-                    )
-
-wait = Wait()

Reply via email to