http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_connection.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_connection.py b/slider-agent/src/main/python/kazoo/tests/test_connection.py deleted file mode 100644 index c8c4581..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_connection.py +++ /dev/null @@ -1,310 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -from collections import namedtuple -import sys -import os -import errno -import threading -import time -import uuid -import struct - -from nose import SkipTest -from nose.tools import eq_ -from nose.tools import raises -import mock - -from kazoo.exceptions import ConnectionLoss -from kazoo.protocol.serialization import ( - Connect, - int_struct, - write_string, -) -from kazoo.protocol.states import KazooState -from kazoo.protocol.connection import _CONNECTION_DROP -from kazoo.testing import KazooTestCase -from kazoo.tests.util import wait -from kazoo.tests.util import TRAVIS_ZK_VERSION - - -class Delete(namedtuple('Delete', 'path version')): - type = 2 - - def serialize(self): - b = bytearray() - b.extend(write_string(self.path)) - b.extend(int_struct.pack(self.version)) - return b - - @classmethod - def deserialize(self, bytes, offset): - raise ValueError("oh my") - - -class TestConnectionHandler(KazooTestCase): - def test_bad_deserialization(self): - async_object = self.client.handler.async_result() - self.client._queue.append( - (Delete(self.client.chroot, -1), async_object)) - self.client._connection._write_sock.send(b'\0') - - @raises(ValueError) - def testit(): - async_object.get() - testit() - - def test_with_bad_sessionid(self): - ev = threading.Event() - - def expired(state): - if state == KazooState.CONNECTED: - ev.set() - - password = os.urandom(16) - client = self._get_client(client_id=(82838284824, password)) - client.add_listener(expired) - client.start() - try: - ev.wait(15) - eq_(ev.is_set(), True) - finally: - client.stop() - - def test_connection_read_timeout(self): - client = self.client - ev = threading.Event() - path = "/" + uuid.uuid4().hex - handler = client.handler - _select = handler.select - _socket = client._connection._socket - - def delayed_select(*args, **kwargs): - result = _select(*args, **kwargs) - if len(args[0]) == 1 and _socket in args[0]: - # for any socket read, simulate a timeout - return [], [], [] - return result - - def back(state): - if state == KazooState.CONNECTED: - ev.set() - - client.add_listener(back) - client.create(path, b"1") - try: - handler.select = delayed_select - self.assertRaises(ConnectionLoss, client.get, path) - finally: - handler.select = _select - # the client reconnects automatically - ev.wait(5) - eq_(ev.is_set(), True) - eq_(client.get(path)[0], b"1") - - def test_connection_write_timeout(self): - client = self.client - ev = threading.Event() - path = "/" + uuid.uuid4().hex - handler = client.handler - _select = handler.select - _socket = client._connection._socket - - def delayed_select(*args, **kwargs): - result = _select(*args, **kwargs) - if _socket in args[1]: - # for any socket write, simulate a timeout - return [], [], [] - return result - - def back(state): - if state == KazooState.CONNECTED: - ev.set() - client.add_listener(back) - - try: - handler.select = delayed_select - self.assertRaises(ConnectionLoss, client.create, path) - finally: - handler.select = _select - # the client reconnects automatically - ev.wait(5) - eq_(ev.is_set(), True) - eq_(client.exists(path), None) - - def test_connection_deserialize_fail(self): - client = self.client - ev = threading.Event() - path = "/" + uuid.uuid4().hex - handler = client.handler - _select = handler.select - _socket = client._connection._socket - - def delayed_select(*args, **kwargs): - result = _select(*args, **kwargs) - if _socket in args[1]: - # for any socket write, simulate a timeout - return [], [], [] - return result - - def back(state): - if state == KazooState.CONNECTED: - ev.set() - client.add_listener(back) - - deserialize_ev = threading.Event() - - def bad_deserialize(_bytes, offset): - deserialize_ev.set() - raise struct.error() - - # force the connection to die but, on reconnect, cause the - # server response to be non-deserializable. ensure that the client - # continues to retry. This partially reproduces a rare bug seen - # in production. - - with mock.patch.object(Connect, 'deserialize') as mock_deserialize: - mock_deserialize.side_effect = bad_deserialize - try: - handler.select = delayed_select - self.assertRaises(ConnectionLoss, client.create, path) - finally: - handler.select = _select - # the client reconnects automatically but the first attempt will - # hit a deserialize failure. wait for that. - deserialize_ev.wait(5) - eq_(deserialize_ev.is_set(), True) - - # this time should succeed - ev.wait(5) - eq_(ev.is_set(), True) - eq_(client.exists(path), None) - - def test_connection_close(self): - self.assertRaises(Exception, self.client.close) - self.client.stop() - self.client.close() - - # should be able to restart - self.client.start() - - def test_connection_sock(self): - client = self.client - read_sock = client._connection._read_sock - write_sock = client._connection._write_sock - - assert read_sock is not None - assert write_sock is not None - - # stop client and socket should not yet be closed - client.stop() - assert read_sock is not None - assert write_sock is not None - - read_sock.getsockname() - write_sock.getsockname() - - # close client, and sockets should be closed - client.close() - - # Todo check socket closing - - # start client back up. should get a new, valid socket - client.start() - read_sock = client._connection._read_sock - write_sock = client._connection._write_sock - - assert read_sock is not None - assert write_sock is not None - read_sock.getsockname() - write_sock.getsockname() - - - def test_dirty_sock(self): - client = self.client - read_sock = client._connection._read_sock - write_sock = client._connection._write_sock - - # add a stray byte to the socket and ensure that doesn't - # blow up client. simulates case where some error leaves - # a byte in the socket which doesn't correspond to the - # request queue. - write_sock.send(b'\0') - - # eventually this byte should disappear from socket - wait(lambda: client.handler.select([read_sock], [], [], 0)[0] == []) - - -class TestConnectionDrop(KazooTestCase): - def test_connection_dropped(self): - ev = threading.Event() - - def back(state): - if state == KazooState.CONNECTED: - ev.set() - - # create a node with a large value and stop the ZK node - path = "/" + uuid.uuid4().hex - self.client.create(path) - self.client.add_listener(back) - result = self.client.set_async(path, b'a' * 1000 * 1024) - self.client._call(_CONNECTION_DROP, None) - - self.assertRaises(ConnectionLoss, result.get) - # we have a working connection to a new node - ev.wait(30) - eq_(ev.is_set(), True) - - -class TestReadOnlyMode(KazooTestCase): - - def setUp(self): - self.setup_zookeeper(read_only=True) - skip = False - if TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION < (3, 4): - skip = True - elif TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION >= (3, 4): - skip = False - else: - ver = self.client.server_version() - if ver[1] < 4: - skip = True - if skip: - raise SkipTest("Must use Zookeeper 3.4 or above") - - def tearDown(self): - self.client.stop() - - def test_read_only(self): - from kazoo.exceptions import NotReadOnlyCallError - from kazoo.protocol.states import KeeperState - - client = self.client - states = [] - ev = threading.Event() - - @client.add_listener - def listen(state): - states.append(state) - if client.client_state == KeeperState.CONNECTED_RO: - ev.set() - try: - self.cluster[1].stop() - self.cluster[2].stop() - ev.wait(6) - eq_(ev.is_set(), True) - eq_(client.client_state, KeeperState.CONNECTED_RO) - - # Test read only command - eq_(client.get_children('/'), []) - - # Test error with write command - @raises(NotReadOnlyCallError) - def testit(): - client.create('/fred') - testit() - - # Wait for a ping - time.sleep(15) - finally: - client.remove_listener(listen) - self.cluster[1].run() - self.cluster[2].run()
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_counter.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_counter.py b/slider-agent/src/main/python/kazoo/tests/test_counter.py deleted file mode 100644 index b0361d0..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_counter.py +++ /dev/null @@ -1,36 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -import uuid - -from nose.tools import eq_ - -from kazoo.testing import KazooTestCase - - -class KazooCounterTests(KazooTestCase): - - def _makeOne(self, **kw): - path = "/" + uuid.uuid4().hex - return self.client.Counter(path, **kw) - - def test_int_counter(self): - counter = self._makeOne() - eq_(counter.value, 0) - counter += 2 - counter + 1 - eq_(counter.value, 3) - counter -= 3 - counter - 1 - eq_(counter.value, -1) - - def test_float_counter(self): - counter = self._makeOne(default=0.0) - eq_(counter.value, 0.0) - counter += 2.1 - eq_(counter.value, 2.1) - counter -= 3.1 - eq_(counter.value, -1.0) - - def test_errors(self): - counter = self._makeOne() - self.assertRaises(TypeError, counter.__add__, 2.1) - self.assertRaises(TypeError, counter.__add__, b"a") http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_election.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_election.py b/slider-agent/src/main/python/kazoo/tests/test_election.py deleted file mode 100644 index a9610bf..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_election.py +++ /dev/null @@ -1,140 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -import uuid -import sys -import threading - -from nose.tools import eq_ - -from kazoo.testing import KazooTestCase -from kazoo.tests.util import wait - - -class UniqueError(Exception): - """Error raised only by test leader function - """ - - -class KazooElectionTests(KazooTestCase): - def setUp(self): - super(KazooElectionTests, self).setUp() - self.path = "/" + uuid.uuid4().hex - - self.condition = threading.Condition() - - # election contenders set these when elected. The exit event is set by - # the test to make the leader exit. - self.leader_id = None - self.exit_event = None - - # tests set this before the event to make the leader raise an error - self.raise_exception = False - - # set by a worker thread when an unexpected error is hit. - # better way to do this? - self.thread_exc_info = None - - def _spawn_contender(self, contender_id, election): - thread = threading.Thread(target=self._election_thread, - args=(contender_id, election)) - thread.daemon = True - thread.start() - return thread - - def _election_thread(self, contender_id, election): - try: - election.run(self._leader_func, contender_id) - except UniqueError: - if not self.raise_exception: - self.thread_exc_info = sys.exc_info() - except Exception: - self.thread_exc_info = sys.exc_info() - else: - if self.raise_exception: - e = Exception("expected leader func to raise exception") - self.thread_exc_info = (Exception, e, None) - - def _leader_func(self, name): - exit_event = threading.Event() - with self.condition: - self.exit_event = exit_event - self.leader_id = name - self.condition.notify_all() - - exit_event.wait(45) - if self.raise_exception: - raise UniqueError("expected error in the leader function") - - def _check_thread_error(self): - if self.thread_exc_info: - t, o, tb = self.thread_exc_info - raise t(o) - - def test_election(self): - elections = {} - threads = {} - for _ in range(3): - contender = "c" + uuid.uuid4().hex - elections[contender] = self.client.Election(self.path, contender) - threads[contender] = self._spawn_contender(contender, - elections[contender]) - - # wait for a leader to be elected - times = 0 - with self.condition: - while not self.leader_id: - self.condition.wait(5) - times += 1 - if times > 5: - raise Exception("Still not a leader: lid: %s", - self.leader_id) - - election = self.client.Election(self.path) - - # make sure all contenders are in the pool - wait(lambda: len(election.contenders()) == len(elections)) - contenders = election.contenders() - - eq_(set(contenders), set(elections.keys())) - - # first one in list should be leader - first_leader = contenders[0] - eq_(first_leader, self.leader_id) - - # tell second one to cancel election. should never get elected. - elections[contenders[1]].cancel() - - # make leader exit. third contender should be elected. - self.exit_event.set() - with self.condition: - while self.leader_id == first_leader: - self.condition.wait(45) - eq_(self.leader_id, contenders[2]) - self._check_thread_error() - - # make first contender re-enter the race - threads[first_leader].join() - threads[first_leader] = self._spawn_contender(first_leader, - elections[first_leader]) - - # contender set should now be the current leader plus the first leader - wait(lambda: len(election.contenders()) == 2) - contenders = election.contenders() - eq_(set(contenders), set([self.leader_id, first_leader])) - - # make current leader raise an exception. first should be reelected - self.raise_exception = True - self.exit_event.set() - with self.condition: - while self.leader_id != first_leader: - self.condition.wait(45) - eq_(self.leader_id, first_leader) - self._check_thread_error() - - self.exit_event.set() - for thread in threads.values(): - thread.join() - self._check_thread_error() - - def test_bad_func(self): - election = self.client.Election(self.path) - self.assertRaises(ValueError, election.run, "not a callable") http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_exceptions.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_exceptions.py b/slider-agent/src/main/python/kazoo/tests/test_exceptions.py deleted file mode 100644 index e469089..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_exceptions.py +++ /dev/null @@ -1,23 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -from unittest import TestCase - - -class ExceptionsTestCase(TestCase): - - def _get(self): - from kazoo import exceptions - return exceptions - - def test_backwards_alias(self): - module = self._get() - self.assertTrue(getattr(module, 'NoNodeException')) - self.assertTrue(module.NoNodeException, module.NoNodeError) - - def test_exceptions_code(self): - module = self._get() - exc_8 = module.EXCEPTIONS[-8] - self.assertTrue(isinstance(exc_8(), module.BadArgumentsError)) - - def test_invalid_code(self): - module = self._get() - self.assertRaises(RuntimeError, module.EXCEPTIONS.__getitem__, 666) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_gevent_handler.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_gevent_handler.py b/slider-agent/src/main/python/kazoo/tests/test_gevent_handler.py deleted file mode 100644 index 71d9727..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_gevent_handler.py +++ /dev/null @@ -1,161 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -import unittest - -from nose import SkipTest -from nose.tools import eq_ -from nose.tools import raises - -from kazoo.client import KazooClient -from kazoo.exceptions import NoNodeError -from kazoo.protocol.states import Callback -from kazoo.testing import KazooTestCase -from kazoo.tests import test_client - - -class TestGeventHandler(unittest.TestCase): - - def setUp(self): - try: - import gevent - except ImportError: - raise SkipTest('gevent not available.') - - def _makeOne(self, *args): - from kazoo.handlers.gevent import SequentialGeventHandler - return SequentialGeventHandler(*args) - - def _getAsync(self, *args): - from kazoo.handlers.gevent import AsyncResult - return AsyncResult - - def _getEvent(self): - from gevent.event import Event - return Event - - def test_proper_threading(self): - h = self._makeOne() - h.start() - assert isinstance(h.event_object(), self._getEvent()) - - def test_matching_async(self): - h = self._makeOne() - h.start() - async = self._getAsync() - assert isinstance(h.async_result(), async) - - def test_exception_raising(self): - h = self._makeOne() - - @raises(h.timeout_exception) - def testit(): - raise h.timeout_exception("This is a timeout") - testit() - - def test_exception_in_queue(self): - h = self._makeOne() - h.start() - ev = self._getEvent()() - - def func(): - ev.set() - raise ValueError('bang') - - call1 = Callback('completion', func, ()) - h.dispatch_callback(call1) - ev.wait() - - def test_queue_empty_exception(self): - from gevent.queue import Empty - h = self._makeOne() - h.start() - ev = self._getEvent()() - - def func(): - ev.set() - raise Empty() - - call1 = Callback('completion', func, ()) - h.dispatch_callback(call1) - ev.wait() - - -class TestBasicGeventClient(KazooTestCase): - - def setUp(self): - try: - import gevent - except ImportError: - raise SkipTest('gevent not available.') - KazooTestCase.setUp(self) - - def _makeOne(self, *args): - from kazoo.handlers.gevent import SequentialGeventHandler - return SequentialGeventHandler(*args) - - def _getEvent(self): - from gevent.event import Event - return Event - - def test_start(self): - client = self._get_client(handler=self._makeOne()) - client.start() - self.assertEqual(client.state, 'CONNECTED') - client.stop() - - def test_start_stop_double(self): - client = self._get_client(handler=self._makeOne()) - client.start() - self.assertEqual(client.state, 'CONNECTED') - client.handler.start() - client.handler.stop() - client.stop() - - def test_basic_commands(self): - client = self._get_client(handler=self._makeOne()) - client.start() - self.assertEqual(client.state, 'CONNECTED') - client.create('/anode', 'fred') - eq_(client.get('/anode')[0], 'fred') - eq_(client.delete('/anode'), True) - eq_(client.exists('/anode'), None) - client.stop() - - def test_failures(self): - client = self._get_client(handler=self._makeOne()) - client.start() - self.assertRaises(NoNodeError, client.get, '/none') - client.stop() - - def test_data_watcher(self): - client = self._get_client(handler=self._makeOne()) - client.start() - client.ensure_path('/some/node') - ev = self._getEvent()() - - @client.DataWatch('/some/node') - def changed(d, stat): - ev.set() - - ev.wait() - ev.clear() - client.set('/some/node', 'newvalue') - ev.wait() - client.stop() - - -class TestGeventClient(test_client.TestClient): - - def setUp(self): - try: - import gevent - except ImportError: - raise SkipTest('gevent not available.') - KazooTestCase.setUp(self) - - def _makeOne(self, *args): - from kazoo.handlers.gevent import SequentialGeventHandler - return SequentialGeventHandler(*args) - - def _get_client(self, **kwargs): - kwargs["handler"] = self._makeOne() - return KazooClient(self.hosts, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_lock.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_lock.py b/slider-agent/src/main/python/kazoo/tests/test_lock.py deleted file mode 100644 index 6dd15b0..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_lock.py +++ /dev/null @@ -1,518 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -import uuid -import threading - -from nose.tools import eq_, ok_ - -from kazoo.exceptions import CancelledError -from kazoo.exceptions import LockTimeout -from kazoo.testing import KazooTestCase -from kazoo.tests.util import wait - - -class KazooLockTests(KazooTestCase): - def setUp(self): - super(KazooLockTests, self).setUp() - self.lockpath = "/" + uuid.uuid4().hex - - self.condition = threading.Condition() - self.released = threading.Event() - self.active_thread = None - self.cancelled_threads = [] - - def _thread_lock_acquire_til_event(self, name, lock, event): - try: - with lock: - with self.condition: - eq_(self.active_thread, None) - self.active_thread = name - self.condition.notify_all() - - event.wait() - - with self.condition: - eq_(self.active_thread, name) - self.active_thread = None - self.condition.notify_all() - self.released.set() - except CancelledError: - with self.condition: - self.cancelled_threads.append(name) - self.condition.notify_all() - - def test_lock_one(self): - lock_name = uuid.uuid4().hex - lock = self.client.Lock(self.lockpath, lock_name) - event = threading.Event() - - thread = threading.Thread(target=self._thread_lock_acquire_til_event, - args=(lock_name, lock, event)) - thread.start() - - lock2_name = uuid.uuid4().hex - anotherlock = self.client.Lock(self.lockpath, lock2_name) - - # wait for any contender to show up on the lock - wait(anotherlock.contenders) - eq_(anotherlock.contenders(), [lock_name]) - - with self.condition: - while self.active_thread != lock_name: - self.condition.wait() - - # release the lock - event.set() - - with self.condition: - while self.active_thread: - self.condition.wait() - self.released.wait() - thread.join() - - def test_lock(self): - threads = [] - names = ["contender" + str(i) for i in range(5)] - - contender_bits = {} - - for name in names: - e = threading.Event() - - l = self.client.Lock(self.lockpath, name) - t = threading.Thread(target=self._thread_lock_acquire_til_event, - args=(name, l, e)) - contender_bits[name] = (t, e) - threads.append(t) - - # acquire the lock ourselves first to make the others line up - lock = self.client.Lock(self.lockpath, "test") - lock.acquire() - - for t in threads: - t.start() - - # wait for everyone to line up on the lock - wait(lambda: len(lock.contenders()) == 6) - contenders = lock.contenders() - - eq_(contenders[0], "test") - contenders = contenders[1:] - remaining = list(contenders) - - # release the lock and contenders should claim it in order - lock.release() - - for contender in contenders: - thread, event = contender_bits[contender] - - with self.condition: - while not self.active_thread: - self.condition.wait() - eq_(self.active_thread, contender) - - eq_(lock.contenders(), remaining) - remaining = remaining[1:] - - event.set() - - with self.condition: - while self.active_thread: - self.condition.wait() - for thread in threads: - thread.join() - - def test_lock_reconnect(self): - event = threading.Event() - other_lock = self.client.Lock(self.lockpath, 'contender') - thread = threading.Thread(target=self._thread_lock_acquire_til_event, - args=('contender', other_lock, event)) - - # acquire the lock ourselves first to make the contender line up - lock = self.client.Lock(self.lockpath, "test") - lock.acquire() - - thread.start() - # wait for the contender to line up on the lock - wait(lambda: len(lock.contenders()) == 2) - eq_(lock.contenders(), ['test', 'contender']) - - self.expire_session() - - lock.release() - - with self.condition: - while not self.active_thread: - self.condition.wait() - eq_(self.active_thread, 'contender') - - event.set() - thread.join() - - def test_lock_non_blocking(self): - lock_name = uuid.uuid4().hex - lock = self.client.Lock(self.lockpath, lock_name) - event = threading.Event() - - thread = threading.Thread(target=self._thread_lock_acquire_til_event, - args=(lock_name, lock, event)) - thread.start() - - lock1 = self.client.Lock(self.lockpath, lock_name) - - # wait for the thread to acquire the lock - with self.condition: - if not self.active_thread: - self.condition.wait(5) - - ok_(not lock1.acquire(blocking=False)) - eq_(lock.contenders(), [lock_name]) # just one - itself - - event.set() - thread.join() - - def test_lock_fail_first_call(self): - event1 = threading.Event() - lock1 = self.client.Lock(self.lockpath, "one") - thread1 = threading.Thread(target=self._thread_lock_acquire_til_event, - args=("one", lock1, event1)) - thread1.start() - - # wait for this thread to acquire the lock - with self.condition: - if not self.active_thread: - self.condition.wait(5) - eq_(self.active_thread, "one") - eq_(lock1.contenders(), ["one"]) - event1.set() - thread1.join() - - def test_lock_cancel(self): - event1 = threading.Event() - lock1 = self.client.Lock(self.lockpath, "one") - thread1 = threading.Thread(target=self._thread_lock_acquire_til_event, - args=("one", lock1, event1)) - thread1.start() - - # wait for this thread to acquire the lock - with self.condition: - if not self.active_thread: - self.condition.wait(5) - eq_(self.active_thread, "one") - - client2 = self._get_client() - client2.start() - event2 = threading.Event() - lock2 = client2.Lock(self.lockpath, "two") - thread2 = threading.Thread(target=self._thread_lock_acquire_til_event, - args=("two", lock2, event2)) - thread2.start() - - # this one should block in acquire. check that it is a contender - wait(lambda: len(lock2.contenders()) > 1) - eq_(lock2.contenders(), ["one", "two"]) - - lock2.cancel() - with self.condition: - if not "two" in self.cancelled_threads: - self.condition.wait() - assert "two" in self.cancelled_threads - - eq_(lock2.contenders(), ["one"]) - - thread2.join() - event1.set() - thread1.join() - client2.stop() - - def test_lock_double_calls(self): - lock1 = self.client.Lock(self.lockpath, "one") - lock1.acquire() - lock1.acquire() - lock1.release() - lock1.release() - - def test_lock_reacquire(self): - lock = self.client.Lock(self.lockpath, "one") - lock.acquire() - lock.release() - lock.acquire() - lock.release() - - def test_lock_timeout(self): - timeout = 3 - e = threading.Event() - started = threading.Event() - - # In the background thread, acquire the lock and wait thrice the time - # that the main thread is going to wait to acquire the lock. - lock1 = self.client.Lock(self.lockpath, "one") - - def _thread(lock, event, timeout): - with lock: - started.set() - event.wait(timeout) - if not event.isSet(): - # Eventually fail to avoid hanging the tests - self.fail("lock2 never timed out") - - t = threading.Thread(target=_thread, args=(lock1, e, timeout * 3)) - t.start() - - # Start the main thread's kazoo client and try to acquire the lock - # but give up after `timeout` seconds - client2 = self._get_client() - client2.start() - started.wait(5) - self.assertTrue(started.isSet()) - lock2 = client2.Lock(self.lockpath, "two") - try: - lock2.acquire(timeout=timeout) - except LockTimeout: - # A timeout is the behavior we're expecting, since the background - # thread should still be holding onto the lock - pass - else: - self.fail("Main thread unexpectedly acquired the lock") - finally: - # Cleanup - e.set() - t.join() - client2.stop() - - -class TestSemaphore(KazooTestCase): - def setUp(self): - super(TestSemaphore, self).setUp() - self.lockpath = "/" + uuid.uuid4().hex - - self.condition = threading.Condition() - self.released = threading.Event() - self.active_thread = None - self.cancelled_threads = [] - - def test_basic(self): - sem1 = self.client.Semaphore(self.lockpath) - sem1.acquire() - sem1.release() - - def test_lock_one(self): - sem1 = self.client.Semaphore(self.lockpath, max_leases=1) - sem2 = self.client.Semaphore(self.lockpath, max_leases=1) - started = threading.Event() - event = threading.Event() - - sem1.acquire() - - def sema_one(): - started.set() - with sem2: - event.set() - - thread = threading.Thread(target=sema_one, args=()) - thread.start() - started.wait(10) - - self.assertFalse(event.is_set()) - - sem1.release() - event.wait(10) - self.assert_(event.is_set()) - thread.join() - - def test_non_blocking(self): - sem1 = self.client.Semaphore( - self.lockpath, identifier='sem1', max_leases=2) - sem2 = self.client.Semaphore( - self.lockpath, identifier='sem2', max_leases=2) - sem3 = self.client.Semaphore( - self.lockpath, identifier='sem3', max_leases=2) - - sem1.acquire() - sem2.acquire() - ok_(not sem3.acquire(blocking=False)) - eq_(set(sem1.lease_holders()), set(['sem1', 'sem2'])) - sem2.release() - # the next line isn't required, but avoids timing issues in tests - sem3.acquire() - eq_(set(sem1.lease_holders()), set(['sem1', 'sem3'])) - sem1.release() - sem3.release() - - def test_non_blocking_release(self): - sem1 = self.client.Semaphore( - self.lockpath, identifier='sem1', max_leases=1) - sem2 = self.client.Semaphore( - self.lockpath, identifier='sem2', max_leases=1) - sem1.acquire() - sem2.acquire(blocking=False) - - # make sure there's no shutdown / cleanup error - sem1.release() - sem2.release() - - def test_holders(self): - started = threading.Event() - event = threading.Event() - - def sema_one(): - with self.client.Semaphore(self.lockpath, 'fred', max_leases=1): - started.set() - event.wait() - - thread = threading.Thread(target=sema_one, args=()) - thread.start() - started.wait() - sem1 = self.client.Semaphore(self.lockpath) - holders = sem1.lease_holders() - eq_(holders, ['fred']) - event.set() - thread.join() - - def test_semaphore_cancel(self): - sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1) - sem2 = self.client.Semaphore(self.lockpath, 'george', max_leases=1) - sem1.acquire() - started = threading.Event() - event = threading.Event() - - def sema_one(): - started.set() - try: - with sem2: - started.set() - except CancelledError: - event.set() - - thread = threading.Thread(target=sema_one, args=()) - thread.start() - started.wait() - eq_(sem1.lease_holders(), ['fred']) - eq_(event.is_set(), False) - sem2.cancel() - event.wait() - eq_(event.is_set(), True) - thread.join() - - def test_multiple_acquire_and_release(self): - sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1) - sem1.acquire() - sem1.acquire() - - eq_(True, sem1.release()) - eq_(False, sem1.release()) - - def test_handle_session_loss(self): - expire_semaphore = self.client.Semaphore(self.lockpath, 'fred', - max_leases=1) - - client = self._get_client() - client.start() - lh_semaphore = client.Semaphore(self.lockpath, 'george', max_leases=1) - lh_semaphore.acquire() - - started = threading.Event() - event = threading.Event() - event2 = threading.Event() - - def sema_one(): - started.set() - with expire_semaphore: - event.set() - event2.wait() - - thread = threading.Thread(target=sema_one, args=()) - thread.start() - - started.wait() - eq_(lh_semaphore.lease_holders(), ['george']) - - # Fired in a separate thread to make sure we can see the effect - expired = threading.Event() - - def expire(): - self.expire_session() - expired.set() - - thread = threading.Thread(target=expire, args=()) - thread.start() - expire_semaphore.wake_event.wait() - expired.wait() - - lh_semaphore.release() - client.stop() - - event.wait(5) - eq_(expire_semaphore.lease_holders(), ['fred']) - event2.set() - thread.join() - - def test_inconsistent_max_leases(self): - sem1 = self.client.Semaphore(self.lockpath, max_leases=1) - sem2 = self.client.Semaphore(self.lockpath, max_leases=2) - - sem1.acquire() - self.assertRaises(ValueError, sem2.acquire) - - def test_inconsistent_max_leases_other_data(self): - sem1 = self.client.Semaphore(self.lockpath, max_leases=1) - sem2 = self.client.Semaphore(self.lockpath, max_leases=2) - - self.client.ensure_path(self.lockpath) - self.client.set(self.lockpath, b'a$') - - sem1.acquire() - # sem2 thinks it's ok to have two lease holders - ok_(sem2.acquire(blocking=False)) - - def test_reacquire(self): - lock = self.client.Semaphore(self.lockpath) - lock.acquire() - lock.release() - lock.acquire() - lock.release() - - def test_acquire_after_cancelled(self): - lock = self.client.Semaphore(self.lockpath) - self.assertTrue(lock.acquire()) - self.assertTrue(lock.release()) - lock.cancel() - self.assertTrue(lock.cancelled) - self.assertTrue(lock.acquire()) - - def test_timeout(self): - timeout = 3 - e = threading.Event() - started = threading.Event() - - # In the background thread, acquire the lock and wait thrice the time - # that the main thread is going to wait to acquire the lock. - sem1 = self.client.Semaphore(self.lockpath, "one") - - def _thread(sem, event, timeout): - with sem: - started.set() - event.wait(timeout) - if not event.isSet(): - # Eventually fail to avoid hanging the tests - self.fail("sem2 never timed out") - - t = threading.Thread(target=_thread, args=(sem1, e, timeout * 3)) - t.start() - - # Start the main thread's kazoo client and try to acquire the lock - # but give up after `timeout` seconds - client2 = self._get_client() - client2.start() - started.wait(5) - self.assertTrue(started.isSet()) - sem2 = client2.Semaphore(self.lockpath, "two") - try: - sem2.acquire(timeout=timeout) - except LockTimeout: - # A timeout is the behavior we're expecting, since the background - # thread will still be holding onto the lock - e.set() - finally: - # Cleanup - t.join() - client2.stop() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_partitioner.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_partitioner.py b/slider-agent/src/main/python/kazoo/tests/test_partitioner.py deleted file mode 100644 index 1a4f205..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_partitioner.py +++ /dev/null @@ -1,93 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -import uuid -import time - -from nose.tools import eq_ - -from kazoo.testing import KazooTestCase -from kazoo.recipe.partitioner import PartitionState - - -class KazooPartitionerTests(KazooTestCase): - def setUp(self): - super(KazooPartitionerTests, self).setUp() - self.path = "/" + uuid.uuid4().hex - - def test_party_of_one(self): - partitioner = self.client.SetPartitioner( - self.path, set=(1, 2, 3), time_boundary=0.2) - partitioner.wait_for_acquire(14) - eq_(partitioner.state, PartitionState.ACQUIRED) - eq_(list(partitioner), [1, 2, 3]) - partitioner.finish() - - def test_party_of_two(self): - partitioners = [self.client.SetPartitioner(self.path, (1, 2), - identifier="p%s" % i, time_boundary=0.2) - for i in range(2)] - - partitioners[0].wait_for_acquire(14) - partitioners[1].wait_for_acquire(14) - eq_(list(partitioners[0]), [1]) - eq_(list(partitioners[1]), [2]) - partitioners[0].finish() - time.sleep(0.1) - eq_(partitioners[1].release, True) - partitioners[1].finish() - - def test_party_expansion(self): - partitioners = [self.client.SetPartitioner(self.path, (1, 2, 3), - identifier="p%s" % i, time_boundary=0.2) - for i in range(2)] - - partitioners[0].wait_for_acquire(14) - partitioners[1].wait_for_acquire(14) - eq_(partitioners[0].state, PartitionState.ACQUIRED) - eq_(partitioners[1].state, PartitionState.ACQUIRED) - - eq_(list(partitioners[0]), [1, 3]) - eq_(list(partitioners[1]), [2]) - - # Add another partition, wait till they settle - partitioners.append(self.client.SetPartitioner(self.path, (1, 2, 3), - identifier="p2", time_boundary=0.2)) - time.sleep(0.1) - eq_(partitioners[0].release, True) - for p in partitioners[:-1]: - p.release_set() - - for p in partitioners: - p.wait_for_acquire(14) - - eq_(list(partitioners[0]), [1]) - eq_(list(partitioners[1]), [2]) - eq_(list(partitioners[2]), [3]) - - for p in partitioners: - p.finish() - - def test_more_members_than_set_items(self): - partitioners = [self.client.SetPartitioner(self.path, (1,), - identifier="p%s" % i, time_boundary=0.2) - for i in range(2)] - - partitioners[0].wait_for_acquire(14) - partitioners[1].wait_for_acquire(14) - eq_(partitioners[0].state, PartitionState.ACQUIRED) - eq_(partitioners[1].state, PartitionState.ACQUIRED) - - eq_(list(partitioners[0]), [1]) - eq_(list(partitioners[1]), []) - - for p in partitioners: - p.finish() - - def test_party_session_failure(self): - partitioner = self.client.SetPartitioner( - self.path, set=(1, 2, 3), time_boundary=0.2) - partitioner.wait_for_acquire(14) - eq_(partitioner.state, PartitionState.ACQUIRED) - # simulate session failure - partitioner._fail_out() - partitioner.release_set() - self.assertTrue(partitioner.failed) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_party.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_party.py b/slider-agent/src/main/python/kazoo/tests/test_party.py deleted file mode 100644 index 61400ae..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_party.py +++ /dev/null @@ -1,85 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -import uuid - -from nose.tools import eq_ - -from kazoo.testing import KazooTestCase - - -class KazooPartyTests(KazooTestCase): - def setUp(self): - super(KazooPartyTests, self).setUp() - self.path = "/" + uuid.uuid4().hex - - def test_party(self): - parties = [self.client.Party(self.path, "p%s" % i) - for i in range(5)] - - one_party = parties[0] - - eq_(list(one_party), []) - eq_(len(one_party), 0) - - participants = set() - for party in parties: - party.join() - participants.add(party.data.decode('utf-8')) - - eq_(set(party), participants) - eq_(len(party), len(participants)) - - for party in parties: - party.leave() - participants.remove(party.data.decode('utf-8')) - - eq_(set(party), participants) - eq_(len(party), len(participants)) - - def test_party_reuse_node(self): - party = self.client.Party(self.path, "p1") - self.client.ensure_path(self.path) - self.client.create(party.create_path) - party.join() - self.assertTrue(party.participating) - party.leave() - self.assertFalse(party.participating) - self.assertEqual(len(party), 0) - - def test_party_vanishing_node(self): - party = self.client.Party(self.path, "p1") - party.join() - self.assertTrue(party.participating) - self.client.delete(party.create_path) - party.leave() - self.assertFalse(party.participating) - self.assertEqual(len(party), 0) - - -class KazooShallowPartyTests(KazooTestCase): - def setUp(self): - super(KazooShallowPartyTests, self).setUp() - self.path = "/" + uuid.uuid4().hex - - def test_party(self): - parties = [self.client.ShallowParty(self.path, "p%s" % i) - for i in range(5)] - - one_party = parties[0] - - eq_(list(one_party), []) - eq_(len(one_party), 0) - - participants = set() - for party in parties: - party.join() - participants.add(party.data.decode('utf-8')) - - eq_(set(party), participants) - eq_(len(party), len(participants)) - - for party in parties: - party.leave() - participants.remove(party.data.decode('utf-8')) - - eq_(set(party), participants) - eq_(len(party), len(participants)) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_paths.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_paths.py b/slider-agent/src/main/python/kazoo/tests/test_paths.py deleted file mode 100644 index c9064bb..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_paths.py +++ /dev/null @@ -1,99 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -import sys -from unittest import TestCase - -from kazoo.protocol import paths - - -if sys.version_info > (3, ): # pragma: nocover - def u(s): - return s -else: # pragma: nocover - def u(s): - return unicode(s, "unicode_escape") - - -class NormPathTestCase(TestCase): - - def test_normpath(self): - self.assertEqual(paths.normpath('/a/b'), '/a/b') - - def test_normpath_empty(self): - self.assertEqual(paths.normpath(''), '') - - def test_normpath_unicode(self): - self.assertEqual(paths.normpath(u('/\xe4/b')), u('/\xe4/b')) - - def test_normpath_dots(self): - self.assertEqual(paths.normpath('/a./b../c'), '/a./b../c') - - def test_normpath_slash(self): - self.assertEqual(paths.normpath('/'), '/') - - def test_normpath_multiple_slashes(self): - self.assertEqual(paths.normpath('//'), '/') - self.assertEqual(paths.normpath('//a/b'), '/a/b') - self.assertEqual(paths.normpath('/a//b//'), '/a/b') - self.assertEqual(paths.normpath('//a////b///c/'), '/a/b/c') - - def test_normpath_relative(self): - self.assertRaises(ValueError, paths.normpath, './a/b') - self.assertRaises(ValueError, paths.normpath, '/a/../b') - - -class JoinTestCase(TestCase): - - def test_join(self): - self.assertEqual(paths.join('/a'), '/a') - self.assertEqual(paths.join('/a', 'b/'), '/a/b/') - self.assertEqual(paths.join('/a', 'b', 'c'), '/a/b/c') - - def test_join_empty(self): - self.assertEqual(paths.join(''), '') - self.assertEqual(paths.join('', 'a', 'b'), 'a/b') - self.assertEqual(paths.join('/a', '', 'b/', 'c'), '/a/b/c') - - def test_join_absolute(self): - self.assertEqual(paths.join('/a/b', '/c'), '/c') - - -class IsAbsTestCase(TestCase): - - def test_isabs(self): - self.assertTrue(paths.isabs('/')) - self.assertTrue(paths.isabs('/a')) - self.assertTrue(paths.isabs('/a//b/c')) - self.assertTrue(paths.isabs('//a/b')) - - def test_isabs_false(self): - self.assertFalse(paths.isabs('')) - self.assertFalse(paths.isabs('a/')) - self.assertFalse(paths.isabs('a/../')) - - -class BaseNameTestCase(TestCase): - - def test_basename(self): - self.assertEquals(paths.basename(''), '') - self.assertEquals(paths.basename('/'), '') - self.assertEquals(paths.basename('//a'), 'a') - self.assertEquals(paths.basename('//a/'), '') - self.assertEquals(paths.basename('/a/b.//c..'), 'c..') - - -class PrefixRootTestCase(TestCase): - - def test_prefix_root(self): - self.assertEquals(paths._prefix_root('/a/', 'b/c'), '/a/b/c') - self.assertEquals(paths._prefix_root('/a/b', 'c/d'), '/a/b/c/d') - self.assertEquals(paths._prefix_root('/a', '/b/c'), '/a/b/c') - self.assertEquals(paths._prefix_root('/a', '//b/c.'), '/a/b/c.') - - -class NormRootTestCase(TestCase): - - def test_norm_root(self): - self.assertEquals(paths._norm_root(''), '/') - self.assertEquals(paths._norm_root('/'), '/') - self.assertEquals(paths._norm_root('//a'), '/a') - self.assertEquals(paths._norm_root('//a./b'), '/a./b') http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_queue.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_queue.py b/slider-agent/src/main/python/kazoo/tests/test_queue.py deleted file mode 100644 index 4c13ca9..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_queue.py +++ /dev/null @@ -1,180 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -import uuid - -from nose import SkipTest -from nose.tools import eq_, ok_ - -from kazoo.testing import KazooTestCase -from kazoo.tests.util import TRAVIS_ZK_VERSION - - -class KazooQueueTests(KazooTestCase): - - def _makeOne(self): - path = "/" + uuid.uuid4().hex - return self.client.Queue(path) - - def test_queue_validation(self): - queue = self._makeOne() - self.assertRaises(TypeError, queue.put, {}) - self.assertRaises(TypeError, queue.put, b"one", b"100") - self.assertRaises(TypeError, queue.put, b"one", 10.0) - self.assertRaises(ValueError, queue.put, b"one", -100) - self.assertRaises(ValueError, queue.put, b"one", 100000) - - def test_empty_queue(self): - queue = self._makeOne() - eq_(len(queue), 0) - self.assertTrue(queue.get() is None) - eq_(len(queue), 0) - - def test_queue(self): - queue = self._makeOne() - queue.put(b"one") - queue.put(b"two") - queue.put(b"three") - eq_(len(queue), 3) - - eq_(queue.get(), b"one") - eq_(queue.get(), b"two") - eq_(queue.get(), b"three") - eq_(len(queue), 0) - - def test_priority(self): - queue = self._makeOne() - queue.put(b"four", priority=101) - queue.put(b"one", priority=0) - queue.put(b"two", priority=0) - queue.put(b"three", priority=10) - - eq_(queue.get(), b"one") - eq_(queue.get(), b"two") - eq_(queue.get(), b"three") - eq_(queue.get(), b"four") - - -class KazooLockingQueueTests(KazooTestCase): - - def setUp(self): - KazooTestCase.setUp(self) - skip = False - if TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION < (3, 4): - skip = True - elif TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION >= (3, 4): - skip = False - else: - ver = self.client.server_version() - if ver[1] < 4: - skip = True - if skip: - raise SkipTest("Must use Zookeeper 3.4 or above") - - def _makeOne(self): - path = "/" + uuid.uuid4().hex - return self.client.LockingQueue(path) - - def test_queue_validation(self): - queue = self._makeOne() - self.assertRaises(TypeError, queue.put, {}) - self.assertRaises(TypeError, queue.put, b"one", b"100") - self.assertRaises(TypeError, queue.put, b"one", 10.0) - self.assertRaises(ValueError, queue.put, b"one", -100) - self.assertRaises(ValueError, queue.put, b"one", 100000) - self.assertRaises(TypeError, queue.put_all, {}) - self.assertRaises(TypeError, queue.put_all, [{}]) - self.assertRaises(TypeError, queue.put_all, [b"one"], b"100") - self.assertRaises(TypeError, queue.put_all, [b"one"], 10.0) - self.assertRaises(ValueError, queue.put_all, [b"one"], -100) - self.assertRaises(ValueError, queue.put_all, [b"one"], 100000) - - def test_empty_queue(self): - queue = self._makeOne() - eq_(len(queue), 0) - self.assertTrue(queue.get(0) is None) - eq_(len(queue), 0) - - def test_queue(self): - queue = self._makeOne() - queue.put(b"one") - queue.put_all([b"two", b"three"]) - eq_(len(queue), 3) - - ok_(not queue.consume()) - ok_(not queue.holds_lock()) - eq_(queue.get(1), b"one") - ok_(queue.holds_lock()) - # Without consuming, should return the same element - eq_(queue.get(1), b"one") - ok_(queue.consume()) - ok_(not queue.holds_lock()) - eq_(queue.get(1), b"two") - ok_(queue.holds_lock()) - ok_(queue.consume()) - ok_(not queue.holds_lock()) - eq_(queue.get(1), b"three") - ok_(queue.holds_lock()) - ok_(queue.consume()) - ok_(not queue.holds_lock()) - ok_(not queue.consume()) - eq_(len(queue), 0) - - def test_consume(self): - queue = self._makeOne() - - queue.put(b"one") - ok_(not queue.consume()) - queue.get(.1) - ok_(queue.consume()) - ok_(not queue.consume()) - - def test_holds_lock(self): - queue = self._makeOne() - - ok_(not queue.holds_lock()) - queue.put(b"one") - queue.get(.1) - ok_(queue.holds_lock()) - queue.consume() - ok_(not queue.holds_lock()) - - def test_priority(self): - queue = self._makeOne() - queue.put(b"four", priority=101) - queue.put(b"one", priority=0) - queue.put(b"two", priority=0) - queue.put(b"three", priority=10) - - eq_(queue.get(1), b"one") - ok_(queue.consume()) - eq_(queue.get(1), b"two") - ok_(queue.consume()) - eq_(queue.get(1), b"three") - ok_(queue.consume()) - eq_(queue.get(1), b"four") - ok_(queue.consume()) - - def test_concurrent_execution(self): - queue = self._makeOne() - value1 = [] - value2 = [] - value3 = [] - event1 = self.client.handler.event_object() - event2 = self.client.handler.event_object() - event3 = self.client.handler.event_object() - - def get_concurrently(value, event): - q = self.client.LockingQueue(queue.path) - value.append(q.get(.1)) - event.set() - - self.client.handler.spawn(get_concurrently, value1, event1) - self.client.handler.spawn(get_concurrently, value2, event2) - self.client.handler.spawn(get_concurrently, value3, event3) - queue.put(b"one") - event1.wait(.2) - event2.wait(.2) - event3.wait(.2) - - result = value1 + value2 + value3 - eq_(result.count(b"one"), 1) - eq_(result.count(None), 2) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_retry.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_retry.py b/slider-agent/src/main/python/kazoo/tests/test_retry.py deleted file mode 100644 index 84c8d41..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_retry.py +++ /dev/null @@ -1,78 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -import unittest - -from nose.tools import eq_ - - -class TestRetrySleeper(unittest.TestCase): - - def _pass(self): - pass - - def _fail(self, times=1): - from kazoo.retry import ForceRetryError - scope = dict(times=0) - - def inner(): - if scope['times'] >= times: - pass - else: - scope['times'] += 1 - raise ForceRetryError('Failed!') - return inner - - def _makeOne(self, *args, **kwargs): - from kazoo.retry import KazooRetry - return KazooRetry(*args, **kwargs) - - def test_reset(self): - retry = self._makeOne(delay=0, max_tries=2) - retry(self._fail()) - eq_(retry._attempts, 1) - retry.reset() - eq_(retry._attempts, 0) - - def test_too_many_tries(self): - from kazoo.retry import RetryFailedError - retry = self._makeOne(delay=0) - self.assertRaises(RetryFailedError, retry, self._fail(times=999)) - eq_(retry._attempts, 1) - - def test_maximum_delay(self): - def sleep_func(_time): - pass - - retry = self._makeOne(delay=10, max_tries=100, sleep_func=sleep_func) - retry(self._fail(times=10)) - self.assertTrue(retry._cur_delay < 4000, retry._cur_delay) - # gevent's sleep function is picky about the type - eq_(type(retry._cur_delay), float) - - def test_copy(self): - _sleep = lambda t: None - retry = self._makeOne(sleep_func=_sleep) - rcopy = retry.copy() - self.assertTrue(rcopy.sleep_func is _sleep) - - -class TestKazooRetry(unittest.TestCase): - - def _makeOne(self, **kw): - from kazoo.retry import KazooRetry - return KazooRetry(**kw) - - def test_connection_closed(self): - from kazoo.exceptions import ConnectionClosedError - retry = self._makeOne() - - def testit(): - raise ConnectionClosedError() - self.assertRaises(ConnectionClosedError, retry, testit) - - def test_session_expired(self): - from kazoo.exceptions import SessionExpiredError - retry = self._makeOne(max_tries=1) - - def testit(): - raise SessionExpiredError() - self.assertRaises(Exception, retry, testit) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_security.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_security.py b/slider-agent/src/main/python/kazoo/tests/test_security.py deleted file mode 100644 index 587c265..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_security.py +++ /dev/null @@ -1,41 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -import unittest - -from nose.tools import eq_ -from kazoo.security import Permissions - - -class TestACL(unittest.TestCase): - def _makeOne(self, *args, **kwargs): - from kazoo.security import make_acl - return make_acl(*args, **kwargs) - - def test_read_acl(self): - acl = self._makeOne("digest", ":", read=True) - eq_(acl.perms & Permissions.READ, Permissions.READ) - - def test_all_perms(self): - acl = self._makeOne("digest", ":", read=True, write=True, - create=True, delete=True, admin=True) - for perm in [Permissions.READ, Permissions.CREATE, Permissions.WRITE, - Permissions.DELETE, Permissions.ADMIN]: - eq_(acl.perms & perm, perm) - - def test_perm_listing(self): - from kazoo.security import ACL - f = ACL(15, 'fred') - self.assert_('READ' in f.acl_list) - self.assert_('WRITE' in f.acl_list) - self.assert_('CREATE' in f.acl_list) - self.assert_('DELETE' in f.acl_list) - - f = ACL(16, 'fred') - self.assert_('ADMIN' in f.acl_list) - - f = ACL(31, 'george') - self.assert_('ALL' in f.acl_list) - - def test_perm_repr(self): - from kazoo.security import ACL - f = ACL(16, 'fred') - self.assert_("ACL(perms=16, acl_list=['ADMIN']" in repr(f)) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py b/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py deleted file mode 100644 index 4de5781..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py +++ /dev/null @@ -1,327 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -import threading -import unittest - -import mock -from nose.tools import assert_raises -from nose.tools import eq_ -from nose.tools import raises - - -class TestThreadingHandler(unittest.TestCase): - def _makeOne(self, *args): - from kazoo.handlers.threading import SequentialThreadingHandler - return SequentialThreadingHandler(*args) - - def _getAsync(self, *args): - from kazoo.handlers.threading import AsyncResult - return AsyncResult - - def test_proper_threading(self): - h = self._makeOne() - h.start() - # In Python 3.3 _Event is gone, before Event is function - event_class = getattr(threading, '_Event', threading.Event) - assert isinstance(h.event_object(), event_class) - - def test_matching_async(self): - h = self._makeOne() - h.start() - async = self._getAsync() - assert isinstance(h.async_result(), async) - - def test_exception_raising(self): - h = self._makeOne() - - @raises(h.timeout_exception) - def testit(): - raise h.timeout_exception("This is a timeout") - testit() - - def test_double_start_stop(self): - h = self._makeOne() - h.start() - self.assertTrue(h._running) - h.start() - h.stop() - h.stop() - self.assertFalse(h._running) - - -class TestThreadingAsync(unittest.TestCase): - def _makeOne(self, *args): - from kazoo.handlers.threading import AsyncResult - return AsyncResult(*args) - - def _makeHandler(self): - from kazoo.handlers.threading import SequentialThreadingHandler - return SequentialThreadingHandler() - - def test_ready(self): - mock_handler = mock.Mock() - async = self._makeOne(mock_handler) - - eq_(async.ready(), False) - async.set('val') - eq_(async.ready(), True) - eq_(async.successful(), True) - eq_(async.exception, None) - - def test_callback_queued(self): - mock_handler = mock.Mock() - mock_handler.completion_queue = mock.Mock() - async = self._makeOne(mock_handler) - - async.rawlink(lambda a: a) - async.set('val') - - assert mock_handler.completion_queue.put.called - - def test_set_exception(self): - mock_handler = mock.Mock() - mock_handler.completion_queue = mock.Mock() - async = self._makeOne(mock_handler) - async.rawlink(lambda a: a) - async.set_exception(ImportError('Error occured')) - - assert isinstance(async.exception, ImportError) - assert mock_handler.completion_queue.put.called - - def test_get_wait_while_setting(self): - mock_handler = mock.Mock() - async = self._makeOne(mock_handler) - - lst = [] - bv = threading.Event() - cv = threading.Event() - - def wait_for_val(): - bv.set() - val = async.get() - lst.append(val) - cv.set() - th = threading.Thread(target=wait_for_val) - th.start() - bv.wait() - - async.set('fred') - cv.wait() - eq_(lst, ['fred']) - th.join() - - def test_get_with_nowait(self): - mock_handler = mock.Mock() - async = self._makeOne(mock_handler) - timeout = self._makeHandler().timeout_exception - - @raises(timeout) - def test_it(): - async.get(block=False) - test_it() - - @raises(timeout) - def test_nowait(): - async.get_nowait() - test_nowait() - - def test_get_with_exception(self): - mock_handler = mock.Mock() - async = self._makeOne(mock_handler) - - lst = [] - bv = threading.Event() - cv = threading.Event() - - def wait_for_val(): - bv.set() - try: - val = async.get() - except ImportError: - lst.append('oops') - else: - lst.append(val) - cv.set() - th = threading.Thread(target=wait_for_val) - th.start() - bv.wait() - - async.set_exception(ImportError) - cv.wait() - eq_(lst, ['oops']) - th.join() - - def test_wait(self): - mock_handler = mock.Mock() - async = self._makeOne(mock_handler) - - lst = [] - bv = threading.Event() - cv = threading.Event() - - def wait_for_val(): - bv.set() - try: - val = async.wait(10) - except ImportError: - lst.append('oops') - else: - lst.append(val) - cv.set() - th = threading.Thread(target=wait_for_val) - th.start() - bv.wait(10) - - async.set("fred") - cv.wait(15) - eq_(lst, [True]) - th.join() - - def test_set_before_wait(self): - mock_handler = mock.Mock() - async = self._makeOne(mock_handler) - - lst = [] - cv = threading.Event() - async.set('fred') - - def wait_for_val(): - val = async.get() - lst.append(val) - cv.set() - th = threading.Thread(target=wait_for_val) - th.start() - cv.wait() - eq_(lst, ['fred']) - th.join() - - def test_set_exc_before_wait(self): - mock_handler = mock.Mock() - async = self._makeOne(mock_handler) - - lst = [] - cv = threading.Event() - async.set_exception(ImportError) - - def wait_for_val(): - try: - val = async.get() - except ImportError: - lst.append('ooops') - else: - lst.append(val) - cv.set() - th = threading.Thread(target=wait_for_val) - th.start() - cv.wait() - eq_(lst, ['ooops']) - th.join() - - def test_linkage(self): - mock_handler = mock.Mock() - async = self._makeOne(mock_handler) - cv = threading.Event() - - lst = [] - - def add_on(): - lst.append(True) - - def wait_for_val(): - async.get() - cv.set() - - th = threading.Thread(target=wait_for_val) - th.start() - - async.rawlink(add_on) - async.set('fred') - assert mock_handler.completion_queue.put.called - async.unlink(add_on) - cv.wait() - eq_(async.value, 'fred') - th.join() - - def test_linkage_not_ready(self): - mock_handler = mock.Mock() - async = self._makeOne(mock_handler) - - lst = [] - - def add_on(): - lst.append(True) - - async.set('fred') - assert not mock_handler.completion_queue.called - async.rawlink(add_on) - assert mock_handler.completion_queue.put.called - - def test_link_and_unlink(self): - mock_handler = mock.Mock() - async = self._makeOne(mock_handler) - - lst = [] - - def add_on(): - lst.append(True) - - async.rawlink(add_on) - assert not mock_handler.completion_queue.put.called - async.unlink(add_on) - async.set('fred') - assert not mock_handler.completion_queue.put.called - - def test_captured_exception(self): - from kazoo.handlers.utils import capture_exceptions - - mock_handler = mock.Mock() - async = self._makeOne(mock_handler) - - @capture_exceptions(async) - def exceptional_function(): - return 1/0 - - exceptional_function() - - assert_raises(ZeroDivisionError, async.get) - - def test_no_capture_exceptions(self): - from kazoo.handlers.utils import capture_exceptions - - mock_handler = mock.Mock() - async = self._makeOne(mock_handler) - - lst = [] - - def add_on(): - lst.append(True) - - async.rawlink(add_on) - - @capture_exceptions(async) - def regular_function(): - return True - - regular_function() - - assert not mock_handler.completion_queue.put.called - - def test_wraps(self): - from kazoo.handlers.utils import wrap - - mock_handler = mock.Mock() - async = self._makeOne(mock_handler) - - lst = [] - - def add_on(result): - lst.append(result.get()) - - async.rawlink(add_on) - - @wrap(async) - def regular_function(): - return 'hello' - - assert regular_function() == 'hello' - assert mock_handler.completion_queue.put.called - assert async.get() == 'hello' http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/test_watchers.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_watchers.py b/slider-agent/src/main/python/kazoo/tests/test_watchers.py deleted file mode 100644 index 44795c4..0000000 --- a/slider-agent/src/main/python/kazoo/tests/test_watchers.py +++ /dev/null @@ -1,490 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -import time -import threading -import uuid - -from nose.tools import eq_ -from nose.tools import raises - -from kazoo.exceptions import KazooException -from kazoo.protocol.states import EventType -from kazoo.testing import KazooTestCase - - -class KazooDataWatcherTests(KazooTestCase): - def setUp(self): - super(KazooDataWatcherTests, self).setUp() - self.path = "/" + uuid.uuid4().hex - self.client.ensure_path(self.path) - - def test_data_watcher(self): - update = threading.Event() - data = [True] - - # Make it a non-existent path - self.path += 'f' - - @self.client.DataWatch(self.path) - def changed(d, stat): - data.pop() - data.append(d) - update.set() - - update.wait(10) - eq_(data, [None]) - update.clear() - - self.client.create(self.path, b'fred') - update.wait(10) - eq_(data[0], b'fred') - update.clear() - - def test_data_watcher_once(self): - update = threading.Event() - data = [True] - - # Make it a non-existent path - self.path += 'f' - - dwatcher = self.client.DataWatch(self.path) - - @dwatcher - def changed(d, stat): - data.pop() - data.append(d) - update.set() - - update.wait(10) - eq_(data, [None]) - update.clear() - - @raises(KazooException) - def test_it(): - @dwatcher - def func(d, stat): - data.pop() - test_it() - - def test_data_watcher_with_event(self): - # Test that the data watcher gets passed the event, if it - # accepts three arguments - update = threading.Event() - data = [True] - - # Make it a non-existent path - self.path += 'f' - - @self.client.DataWatch(self.path) - def changed(d, stat, event): - data.pop() - data.append(event) - update.set() - - update.wait(10) - eq_(data, [None]) - update.clear() - - self.client.create(self.path, b'fred') - update.wait(10) - eq_(data[0].type, EventType.CREATED) - update.clear() - - def test_func_style_data_watch(self): - update = threading.Event() - data = [True] - - # Make it a non-existent path - path = self.path + 'f' - - def changed(d, stat): - data.pop() - data.append(d) - update.set() - self.client.DataWatch(path, changed) - - update.wait(10) - eq_(data, [None]) - update.clear() - - self.client.create(path, b'fred') - update.wait(10) - eq_(data[0], b'fred') - update.clear() - - def test_datawatch_across_session_expire(self): - update = threading.Event() - data = [True] - - @self.client.DataWatch(self.path) - def changed(d, stat): - data.pop() - data.append(d) - update.set() - - update.wait(10) - eq_(data, [b""]) - update.clear() - - self.expire_session() - self.client.retry(self.client.set, self.path, b'fred') - update.wait(25) - eq_(data[0], b'fred') - - def test_func_stops(self): - update = threading.Event() - data = [True] - - self.path += "f" - - fail_through = [] - - @self.client.DataWatch(self.path) - def changed(d, stat): - data.pop() - data.append(d) - update.set() - if fail_through: - return False - - update.wait(10) - eq_(data, [None]) - update.clear() - - fail_through.append(True) - self.client.create(self.path, b'fred') - update.wait(10) - eq_(data[0], b'fred') - update.clear() - - self.client.set(self.path, b'asdfasdf') - update.wait(0.2) - eq_(data[0], b'fred') - - d, stat = self.client.get(self.path) - eq_(d, b'asdfasdf') - - def test_no_such_node(self): - args = [] - - @self.client.DataWatch("/some/path") - def changed(d, stat): - args.extend([d, stat]) - - eq_(args, [None, None]) - - def test_bad_watch_func2(self): - counter = 0 - - @self.client.DataWatch(self.path) - def changed(d, stat): - if counter > 0: - raise Exception("oops") - - raises(Exception)(changed) - - counter += 1 - self.client.set(self.path, b'asdfasdf') - - def test_watcher_evaluating_to_false(self): - class WeirdWatcher(list): - def __call__(self, *args): - self.called = True - watcher = WeirdWatcher() - self.client.DataWatch(self.path, watcher) - self.client.set(self.path, b'mwahaha') - self.assertTrue(watcher.called) - - def test_watcher_repeat_delete(self): - a = [] - ev = threading.Event() - - self.client.delete(self.path) - - @self.client.DataWatch(self.path) - def changed(val, stat): - a.append(val) - ev.set() - - eq_(a, [None]) - ev.wait(10) - ev.clear() - self.client.create(self.path, b'blah') - ev.wait(10) - eq_(ev.is_set(), True) - ev.clear() - eq_(a, [None, b'blah']) - self.client.delete(self.path) - ev.wait(10) - eq_(ev.is_set(), True) - ev.clear() - eq_(a, [None, b'blah', None]) - self.client.create(self.path, b'blah') - ev.wait(10) - eq_(ev.is_set(), True) - ev.clear() - eq_(a, [None, b'blah', None, b'blah']) - - def test_watcher_with_closing(self): - a = [] - ev = threading.Event() - - self.client.delete(self.path) - - @self.client.DataWatch(self.path) - def changed(val, stat): - a.append(val) - ev.set() - eq_(a, [None]) - - b = False - try: - self.client.stop() - except: - b = True - eq_(b, False) - - -class KazooChildrenWatcherTests(KazooTestCase): - def setUp(self): - super(KazooChildrenWatcherTests, self).setUp() - self.path = "/" + uuid.uuid4().hex - self.client.ensure_path(self.path) - - def test_child_watcher(self): - update = threading.Event() - all_children = ['fred'] - - @self.client.ChildrenWatch(self.path) - def changed(children): - while all_children: - all_children.pop() - all_children.extend(children) - update.set() - - update.wait(10) - eq_(all_children, []) - update.clear() - - self.client.create(self.path + '/' + 'smith') - update.wait(10) - eq_(all_children, ['smith']) - update.clear() - - self.client.create(self.path + '/' + 'george') - update.wait(10) - eq_(sorted(all_children), ['george', 'smith']) - - def test_child_watcher_once(self): - update = threading.Event() - all_children = ['fred'] - - cwatch = self.client.ChildrenWatch(self.path) - - @cwatch - def changed(children): - while all_children: - all_children.pop() - all_children.extend(children) - update.set() - - update.wait(10) - eq_(all_children, []) - update.clear() - - @raises(KazooException) - def test_it(): - @cwatch - def changed_again(children): - update.set() - test_it() - - def test_child_watcher_with_event(self): - update = threading.Event() - events = [True] - - @self.client.ChildrenWatch(self.path, send_event=True) - def changed(children, event): - events.pop() - events.append(event) - update.set() - - update.wait(10) - eq_(events, [None]) - update.clear() - - self.client.create(self.path + '/' + 'smith') - update.wait(10) - eq_(events[0].type, EventType.CHILD) - update.clear() - - def test_func_style_child_watcher(self): - update = threading.Event() - all_children = ['fred'] - - def changed(children): - while all_children: - all_children.pop() - all_children.extend(children) - update.set() - - self.client.ChildrenWatch(self.path, changed) - - update.wait(10) - eq_(all_children, []) - update.clear() - - self.client.create(self.path + '/' + 'smith') - update.wait(10) - eq_(all_children, ['smith']) - update.clear() - - self.client.create(self.path + '/' + 'george') - update.wait(10) - eq_(sorted(all_children), ['george', 'smith']) - - def test_func_stops(self): - update = threading.Event() - all_children = ['fred'] - - fail_through = [] - - @self.client.ChildrenWatch(self.path) - def changed(children): - while all_children: - all_children.pop() - all_children.extend(children) - update.set() - if fail_through: - return False - - update.wait(10) - eq_(all_children, []) - update.clear() - - fail_through.append(True) - self.client.create(self.path + '/' + 'smith') - update.wait(10) - eq_(all_children, ['smith']) - update.clear() - - self.client.create(self.path + '/' + 'george') - update.wait(0.5) - eq_(all_children, ['smith']) - - def test_child_watch_session_loss(self): - update = threading.Event() - all_children = ['fred'] - - @self.client.ChildrenWatch(self.path) - def changed(children): - while all_children: - all_children.pop() - all_children.extend(children) - update.set() - - update.wait(10) - eq_(all_children, []) - update.clear() - - self.client.create(self.path + '/' + 'smith') - update.wait(10) - eq_(all_children, ['smith']) - update.clear() - self.expire_session() - - self.client.retry(self.client.create, - self.path + '/' + 'george') - update.wait(20) - eq_(sorted(all_children), ['george', 'smith']) - - def test_child_stop_on_session_loss(self): - update = threading.Event() - all_children = ['fred'] - - @self.client.ChildrenWatch(self.path, allow_session_lost=False) - def changed(children): - while all_children: - all_children.pop() - all_children.extend(children) - update.set() - - update.wait(10) - eq_(all_children, []) - update.clear() - - self.client.create(self.path + '/' + 'smith') - update.wait(10) - eq_(all_children, ['smith']) - update.clear() - self.expire_session() - - self.client.retry(self.client.create, - self.path + '/' + 'george') - update.wait(4) - eq_(update.is_set(), False) - eq_(all_children, ['smith']) - - children = self.client.get_children(self.path) - eq_(sorted(children), ['george', 'smith']) - - def test_bad_children_watch_func(self): - counter = 0 - - @self.client.ChildrenWatch(self.path) - def changed(children): - if counter > 0: - raise Exception("oops") - - raises(Exception)(changed) - counter += 1 - self.client.create(self.path + '/' + 'smith') - - -class KazooPatientChildrenWatcherTests(KazooTestCase): - def setUp(self): - super(KazooPatientChildrenWatcherTests, self).setUp() - self.path = "/" + uuid.uuid4().hex - - def _makeOne(self, *args, **kwargs): - from kazoo.recipe.watchers import PatientChildrenWatch - return PatientChildrenWatch(*args, **kwargs) - - def test_watch(self): - self.client.ensure_path(self.path) - watcher = self._makeOne(self.client, self.path, 0.1) - result = watcher.start() - children, asy = result.get() - eq_(len(children), 0) - eq_(asy.ready(), False) - - self.client.create(self.path + '/' + 'fred') - asy.get(timeout=1) - eq_(asy.ready(), True) - - def test_exception(self): - from kazoo.exceptions import NoNodeError - watcher = self._makeOne(self.client, self.path, 0.1) - result = watcher.start() - - @raises(NoNodeError) - def testit(): - result.get() - testit() - - def test_watch_iterations(self): - self.client.ensure_path(self.path) - watcher = self._makeOne(self.client, self.path, 0.5) - result = watcher.start() - eq_(result.ready(), False) - - time.sleep(0.08) - self.client.create(self.path + '/' + uuid.uuid4().hex) - eq_(result.ready(), False) - time.sleep(0.08) - eq_(result.ready(), False) - self.client.create(self.path + '/' + uuid.uuid4().hex) - time.sleep(0.08) - eq_(result.ready(), False) - - children, asy = result.get() - eq_(len(children), 2) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4156c1c3/slider-agent/src/main/python/kazoo/tests/util.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/util.py b/slider-agent/src/main/python/kazoo/tests/util.py deleted file mode 100644 index 906cbc0..0000000 --- a/slider-agent/src/main/python/kazoo/tests/util.py +++ /dev/null @@ -1,127 +0,0 @@ -"""license: Apache License 2.0, see LICENSE for more details.""" -############################################################################## -# -# Copyright Zope Foundation and Contributors. -# All Rights Reserved. -# -# This software is subject to the provisions of the Zope Public License, -# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. -# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED -# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS -# FOR A PARTICULAR PURPOSE. -# -############################################################################## - -import logging -import os -import time - -TRAVIS = os.environ.get('TRAVIS', False) -TRAVIS_ZK_VERSION = TRAVIS and os.environ.get('ZOOKEEPER_VERSION', None) -if TRAVIS_ZK_VERSION: - TRAVIS_ZK_VERSION = tuple([int(n) for n in TRAVIS_ZK_VERSION.split('.')]) - - -class Handler(logging.Handler): - - def __init__(self, *names, **kw): - logging.Handler.__init__(self) - self.names = names - self.records = [] - self.setLoggerLevel(**kw) - - def setLoggerLevel(self, level=1): - self.level = level - self.oldlevels = {} - - def emit(self, record): - self.records.append(record) - - def clear(self): - del self.records[:] - - def install(self): - for name in self.names: - logger = logging.getLogger(name) - self.oldlevels[name] = logger.level - logger.setLevel(self.level) - logger.addHandler(self) - - def uninstall(self): - for name in self.names: - logger = logging.getLogger(name) - logger.setLevel(self.oldlevels[name]) - logger.removeHandler(self) - - def __str__(self): - return '\n'.join( - [("%s %s\n %s" % - (record.name, record.levelname, - '\n'.join([line - for line in record.getMessage().split('\n') - if line.strip()]) - ) - ) - for record in self.records] - ) - - -class InstalledHandler(Handler): - - def __init__(self, *names, **kw): - Handler.__init__(self, *names, **kw) - self.install() - - -class Wait(object): - - class TimeOutWaitingFor(Exception): - "A test condition timed out" - - timeout = 9 - wait = .01 - - def __init__(self, timeout=None, wait=None, exception=None, - getnow=(lambda: time.time), getsleep=(lambda: time.sleep)): - - if timeout is not None: - self.timeout = timeout - - if wait is not None: - self.wait = wait - - if exception is not None: - self.TimeOutWaitingFor = exception - - self.getnow = getnow - self.getsleep = getsleep - - def __call__(self, func=None, timeout=None, wait=None, message=None): - if func is None: - return lambda func: self(func, timeout, wait, message) - - if func(): - return - - now = self.getnow() - sleep = self.getsleep() - if timeout is None: - timeout = self.timeout - if wait is None: - wait = self.wait - wait = float(wait) - - deadline = now() + timeout - while 1: - sleep(wait) - if func(): - return - if now() > deadline: - raise self.TimeOutWaitingFor( - message or - getattr(func, '__doc__') or - getattr(func, '__name__') - ) - -wait = Wait()
