SLIDER-389. Upgrade kazoo code base to the latest version
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/ce647d39 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/ce647d39 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/ce647d39 Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry Commit: ce647d399acf5b2d90e51a8509633183438e11d3 Parents: 732569d Author: Sumit Mohanty <[email protected]> Authored: Fri Sep 26 14:20:49 2014 -0700 Committer: Sumit Mohanty <[email protected]> Committed: Fri Sep 26 14:20:49 2014 -0700 ---------------------------------------------------------------------- slider-agent/src/main/python/kazoo/client.py | 56 +++++++----- .../src/main/python/kazoo/handlers/utils.py | 42 +-------- .../main/python/kazoo/protocol/connection.py | 74 ++++++++-------- .../src/main/python/kazoo/tests/test_client.py | 55 ++++++------ .../main/python/kazoo/tests/test_connection.py | 89 ++++++++------------ 5 files changed, 137 insertions(+), 179 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ce647d39/slider-agent/src/main/python/kazoo/client.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/client.py b/slider-agent/src/main/python/kazoo/client.py index 11d9008..47545ee 100644 --- a/slider-agent/src/main/python/kazoo/client.py +++ b/slider-agent/src/main/python/kazoo/client.py @@ -20,7 +20,7 @@ from kazoo.exceptions import ( WriterNotClosedException, ) from kazoo.handlers.threading import SequentialThreadingHandler -from kazoo.handlers.utils import capture_exceptions, wrap, pipe_or_sock_write +from kazoo.handlers.utils import capture_exceptions, wrap from kazoo.hosts import collect_hosts from kazoo.loggingsupport import BLATHER from kazoo.protocol.connection import ConnectionHandler @@ -220,7 +220,6 @@ class KazooClient(object): elif type(command_retry) is KazooRetry: self.retry = command_retry - if type(self._conn_retry) is KazooRetry: if self.handler.sleep_func != self._conn_retry.sleep_func: raise ConfigurationError("Retry handler and event handler " @@ -228,19 +227,21 @@ class KazooClient(object): if type(self.retry) is KazooRetry: if self.handler.sleep_func != self.retry.sleep_func: - raise ConfigurationError("Command retry handler and event " - "handler must use the same sleep func") + raise ConfigurationError( + "Command retry handler and event handler " + "must use the same sleep func") if self.retry is None or self._conn_retry is None: old_retry_keys = dict(_RETRY_COMPAT_DEFAULTS) for key in old_retry_keys: try: old_retry_keys[key] = kwargs.pop(key) - warnings.warn('Passing retry configuration param %s to the' - ' client directly is deprecated, please pass a' - ' configured retry object (using param %s)' % ( - key, _RETRY_COMPAT_MAPPING[key]), - DeprecationWarning, stacklevel=2) + warnings.warn( + 'Passing retry configuration param %s to the ' + 'client directly is deprecated, please pass a ' + 'configured retry object (using param %s)' % ( + key, _RETRY_COMPAT_MAPPING[key]), + DeprecationWarning, stacklevel=2) except KeyError: pass @@ -258,12 +259,13 @@ class KazooClient(object): **retry_keys) self._conn_retry.interrupt = lambda: self._stopped.is_set() - self._connection = ConnectionHandler(self, self._conn_retry.copy(), - logger=self.logger) + self._connection = ConnectionHandler( + self, self._conn_retry.copy(), logger=self.logger) # Every retry call should have its own copy of the retry helper # to avoid shared retry counts self._retry = self.retry + def _retry(*args, **kwargs): return self._retry.copy()(*args, **kwargs) self.retry = _retry @@ -282,7 +284,7 @@ class KazooClient(object): self.Semaphore = partial(Semaphore, self) self.ShallowParty = partial(ShallowParty, self) - # If we got any unhandled keywords, complain like python would + # If we got any unhandled keywords, complain like Python would if kwargs: raise TypeError('__init__() got unexpected keyword arguments: %s' % (kwargs.keys(),)) @@ -433,7 +435,8 @@ class KazooClient(object): return if state in (KeeperState.CONNECTED, KeeperState.CONNECTED_RO): - self.logger.info("Zookeeper connection established, state: %s", state) + self.logger.info("Zookeeper connection established, " + "state: %s", state) self._live.set() self._make_state_change(KazooState.CONNECTED) elif state in LOST_STATES: @@ -510,12 +513,12 @@ class KazooClient(object): self._queue.append((request, async_object)) # wake the connection, guarding against a race with close() - write_pipe = self._connection._write_pipe - if write_pipe is None: + write_sock = self._connection._write_sock + if write_sock is None: async_object.set_exception(ConnectionClosedError( "Connection has been closed")) try: - pipe_or_sock_write(write_pipe, b'\0') + write_sock.send(b'\0') except: async_object.set_exception(ConnectionClosedError( "Connection has been closed")) @@ -585,7 +588,7 @@ class KazooClient(object): self._stopped.set() self._queue.append((CloseInstance, None)) - pipe_or_sock_write(self._connection._write_pipe, b'\0') + self._connection._write_sock.send(b'\0') self._safe_close() def restart(self): @@ -622,7 +625,7 @@ class KazooClient(object): if not self._live.is_set(): raise ConnectionLoss("No connection to server") - peer = self._connection._socket.getpeername() + peer = self._connection._socket.getpeername()[:2] sock = self.handler.create_connection( peer, timeout=self._session_timeout / 1000.0) sock.sendall(cmd) @@ -786,7 +789,7 @@ class KazooClient(object): """ acl = acl or self.default_acl return self.create_async(path, value, acl=acl, ephemeral=ephemeral, - sequence=sequence, makepath=makepath).get() + sequence=sequence, makepath=makepath).get() def create_async(self, path, value=b"", acl=None, ephemeral=False, sequence=False, makepath=False): @@ -828,7 +831,8 @@ class KazooClient(object): @capture_exceptions(async_result) def do_create(): - result = self._create_async_inner(path, value, acl, flags, trailing=sequence) + result = self._create_async_inner( + path, value, acl, flags, trailing=sequence) result.rawlink(create_completion) @capture_exceptions(async_result) @@ -867,10 +871,13 @@ class KazooClient(object): return async_result def ensure_path(self, path, acl=None): - """Recursively create a path if it doesn't exist. + """Recursively create a path if it doesn't exist. Also return value indicates + if path already existed or had to be created. :param path: Path of node. :param acl: Permissions for node. + :returns `True` if path existed, `False` otherwise. + :rtype: bool """ return self.ensure_path_async(path, acl).get() @@ -1291,6 +1298,13 @@ class TransactionRequest(object): Transactions are not thread-safe and should not be accessed from multiple threads at once. + .. note:: + + The ``committed`` attribute only indicates whether this + transaction has been sent to Zookeeper and is used to prevent + duplicate commits of the same transaction. The result should be + checked to determine if the transaction executed as desired. + .. versionadded:: 0.6 Requires Zookeeper 3.4+ http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ce647d39/slider-agent/src/main/python/kazoo/handlers/utils.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/handlers/utils.py b/slider-agent/src/main/python/kazoo/handlers/utils.py index 385495e..93cfdb5 100644 --- a/slider-agent/src/main/python/kazoo/handlers/utils.py +++ b/slider-agent/src/main/python/kazoo/handlers/utils.py @@ -23,47 +23,7 @@ def _set_default_tcpsock_options(module, sock): _set_fd_cloexec(sock) return sock -def pipe_or_sock_read(p_or_s, n): - ''' Use a socket or a pipe to read something''' - if isinstance(p_or_s, int): - # This is a pipe - return os.read(p_or_s, n) - else: - return p_or_s.recv(n) - -def pipe_or_sock_close(p_or_s): - ''' Closes either a socket or a pipe''' - if isinstance(p_or_s, int): - os.close(p_or_s) - else: - p_or_s.close() - -def pipe_or_sock_write(p_or_s, b): - ''' Read from a socket or a pipe depending on what is passed''' - if isinstance(p_or_s, int): - # This is a pipe - os.write(p_or_s,b) - else: - p_or_s.send(b) - -def create_pipe_or_sock(): - """ Create a non-blocking read/write pipe. - On Windows create a pair of sockets - """ - if sys.platform == "win32": - r, w = create_sock_pair() - else: - r, w = os.pipe() - if HAS_FNCTL: - fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK) - fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK) - _set_fd_cloexec(r) - _set_fd_cloexec(w) - return r, w - - - -def create_sock_pair(port=0): +def create_socket_pair(port=0): """Create socket pair. If socket.socketpair isn't available, we emulate it. http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ce647d39/slider-agent/src/main/python/kazoo/protocol/connection.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/protocol/connection.py b/slider-agent/src/main/python/kazoo/protocol/connection.py index 7872928..6b89c18 100644 --- a/slider-agent/src/main/python/kazoo/protocol/connection.py +++ b/slider-agent/src/main/python/kazoo/protocol/connection.py @@ -17,7 +17,7 @@ from kazoo.exceptions import ( SessionExpiredError, NoNodeError ) -from kazoo.handlers.utils import create_pipe_or_sock, pipe_or_sock_read, pipe_or_sock_write, pipe_or_sock_close +from kazoo.handlers.utils import create_socket_pair from kazoo.loggingsupport import BLATHER from kazoo.protocol.serialization import ( Auth, @@ -146,8 +146,8 @@ class ConnectionHandler(object): self.connection_stopped.set() self.ping_outstanding = client.handler.event_object() - self._read_pipe = None - self._write_pipe = None + self._read_sock = None + self._write_sock = None self._socket = None self._xid = None @@ -169,7 +169,7 @@ class ConnectionHandler(object): def start(self): """Start the connection up""" if self.connection_closed.is_set(): - self._read_pipe, self._write_pipe = create_pipe_or_sock() + self._read_sock, self._write_sock = create_socket_pair() self.connection_closed.clear() if self._connection_routine: raise Exception("Unable to start, connection routine already " @@ -192,12 +192,12 @@ class ConnectionHandler(object): if not self.connection_stopped.is_set(): raise Exception("Cannot close connection until it is stopped") self.connection_closed.set() - wp, rp = self._write_pipe, self._read_pipe - self._write_pipe = self._read_pipe = None - if wp is not None: - pipe_or_sock_close(wp) - if rp is not None: - pipe_or_sock_close(rp) + ws, rs = self._write_sock, self._read_sock + self._write_sock = self._read_sock = None + if ws is not None: + ws.close() + if rs is not None: + rs.close() def _server_pinger(self): """Returns a server pinger iterable, that will ping the next @@ -238,8 +238,8 @@ class ConnectionHandler(object): if xid: header, buffer, offset = self._read_header(timeout) if header.xid != xid: - raise RuntimeError('xids do not match, expected %r received %r', - xid, header.xid) + raise RuntimeError('xids do not match, expected %r ' + 'received %r', xid, header.xid) if header.zxid > 0: zxid = header.zxid if header.err: @@ -257,8 +257,9 @@ class ConnectionHandler(object): try: obj, _ = request.deserialize(msg, 0) except Exception: - self.logger.exception("Exception raised during deserialization" - " of request: %s", request) + self.logger.exception( + "Exception raised during deserialization " + "of request: %s", request) # raise ConnectionDropped so connect loop will retry raise ConnectionDropped('invalid server response') @@ -276,8 +277,9 @@ class ConnectionHandler(object): if request.type: b.extend(int_struct.pack(request.type)) b += request.serialize() - self.logger.log((BLATHER if isinstance(request, Ping) else logging.DEBUG), - "Sending request(xid=%s): %s", xid, request) + self.logger.log( + (BLATHER if isinstance(request, Ping) else logging.DEBUG), + "Sending request(xid=%s): %s", xid, request) self._write(int_struct.pack(len(b)) + b, timeout) def _write(self, msg, timeout): @@ -358,8 +360,9 @@ class ConnectionHandler(object): try: response = request.deserialize(buffer, offset) except Exception as exc: - self.logger.exception("Exception raised during deserialization" - " of request: %s", request) + self.logger.exception( + "Exception raised during deserialization " + "of request: %s", request) async_object.set_exception(exc) return self.logger.debug( @@ -415,11 +418,11 @@ class ConnectionHandler(object): except IndexError: # Not actually something on the queue, this can occur if # something happens to cancel the request such that we - # don't clear the pipe below after sending + # don't clear the socket below after sending try: # Clear possible inconsistence (no request in the queue - # but have data in the read pipe), which causes cpu to spin. - pipe_or_sock_read(self._read_pipe, 1) + # but have data in the read socket), which causes cpu to spin. + self._read_sock.recv(1) except OSError: pass return @@ -440,7 +443,7 @@ class ConnectionHandler(object): self._submit(request, connect_timeout, xid) client._queue.popleft() - pipe_or_sock_read(self._read_pipe, 1) + self._read_sock.recv(1) client._pending.append((request, async_object, xid)) def _send_ping(self, connect_timeout): @@ -519,7 +522,7 @@ class ConnectionHandler(object): jitter_time = random.randint(0, 40) / 100.0 # Ensure our timeout is positive timeout = max([read_timeout / 2.0 - jitter_time, jitter_time]) - s = self.handler.select([self._socket, self._read_pipe], + s = self.handler.select([self._socket, self._read_sock], [], [], timeout)[0] if not s: @@ -570,9 +573,9 @@ class ConnectionHandler(object): self.logger.info('Connecting to %s:%s', host, port) self.logger.log(BLATHER, - ' Using session_id: %r session_passwd: %s', - client._session_id, - hexlify(client._session_passwd)) + ' Using session_id: %r session_passwd: %s', + client._session_id, + hexlify(client._session_passwd)) with self._socket_error_handling(): self._socket = self.handler.create_connection( @@ -584,7 +587,8 @@ class ConnectionHandler(object): client._session_id or 0, client._session_passwd, client.read_only) - connect_result, zxid = self._invoke(client._session_timeout, connect) + connect_result, zxid = self._invoke( + client._session_timeout / 1000.0, connect) if connect_result.time_out <= 0: raise SessionExpiredError("Session has expired") @@ -601,13 +605,13 @@ class ConnectionHandler(object): client._session_passwd = connect_result.passwd self.logger.log(BLATHER, - 'Session created, session_id: %r session_passwd: %s\n' - ' negotiated session timeout: %s\n' - ' connect timeout: %s\n' - ' read timeout: %s', client._session_id, - hexlify(client._session_passwd), - negotiated_session_timeout, connect_timeout, - read_timeout) + 'Session created, session_id: %r session_passwd: %s\n' + ' negotiated session timeout: %s\n' + ' connect timeout: %s\n' + ' read timeout: %s', client._session_id, + hexlify(client._session_passwd), + negotiated_session_timeout, connect_timeout, + read_timeout) if connect_result.read_only: client._session_callback(KeeperState.CONNECTED_RO) @@ -618,7 +622,7 @@ class ConnectionHandler(object): for scheme, auth in client.auth_data: ap = Auth(0, scheme, auth) - zxid = self._invoke(connect_timeout, ap, xid=AUTH_XID) + zxid = self._invoke(connect_timeout / 1000.0, ap, xid=AUTH_XID) if zxid: client.last_zxid = zxid return read_timeout, connect_timeout http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ce647d39/slider-agent/src/main/python/kazoo/tests/test_client.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_client.py b/slider-agent/src/main/python/kazoo/tests/test_client.py index 5807292..f851b63 100644 --- a/slider-agent/src/main/python/kazoo/tests/test_client.py +++ b/slider-agent/src/main/python/kazoo/tests/test_client.py @@ -72,7 +72,8 @@ class TestClientConstructor(unittest.TestCase): def test_invalid_handler(self): from kazoo.handlers.threading import SequentialThreadingHandler - self.assertRaises(ConfigurationError, + self.assertRaises( + ConfigurationError, self._makeOne, handler=SequentialThreadingHandler) def test_chroot(self): @@ -91,7 +92,7 @@ class TestClientConstructor(unittest.TestCase): def test_ordered_host_selection(self): client = self._makeOne(hosts='127.0.0.1:9,127.0.0.2:9/a', - randomize_hosts=False) + randomize_hosts=False) hosts = [h for h in client.hosts] eq_(hosts, [('127.0.0.1', 9), ('127.0.0.2', 9)]) @@ -371,29 +372,29 @@ class TestConnection(KazooTestCase): client = self.client client.stop() - write_pipe = client._connection._write_pipe + write_sock = client._connection._write_sock - # close the connection to free the pipe + # close the connection to free the socket client.close() - eq_(client._connection._write_pipe, None) + eq_(client._connection._write_sock, None) # sneak in and patch client to simulate race between a thread # calling stop(); close() and one running a command oldstate = client._state client._state = KeeperState.CONNECTED - client._connection._write_pipe = write_pipe + client._connection._write_sock = write_sock try: - # simulate call made after write pipe is closed + # simulate call made after write socket is closed self.assertRaises(ConnectionClosedError, client.exists, '/') - # simualte call made after write pipe is set to None - client._connection._write_pipe = None + # simulate call made after write socket is set to None + client._connection._write_sock = None self.assertRaises(ConnectionClosedError, client.exists, '/') finally: # reset for teardown client._state = oldstate - client._connection._write_pipe = None + client._connection._write_sock = None class TestClient(KazooTestCase): @@ -544,9 +545,9 @@ class TestClient(KazooTestCase): client = self.client client.create("/1", b"ephemeral", ephemeral=True) self.assertRaises(NoChildrenForEphemeralsError, - client.create, "/1/2", b"val1") + client.create, "/1/2", b"val1") self.assertRaises(NoChildrenForEphemeralsError, - client.create, "/1/2", b"val1", ephemeral=True) + client.create, "/1/2", b"val1", ephemeral=True) def test_create_sequence(self): client = self.client @@ -560,8 +561,8 @@ class TestClient(KazooTestCase): def test_create_ephemeral_sequence(self): basepath = "/" + uuid.uuid4().hex - realpath = self.client.create(basepath, b"sandwich", sequence=True, - ephemeral=True) + realpath = self.client.create(basepath, b"sandwich", + sequence=True, ephemeral=True) self.assertTrue(basepath != realpath and realpath.startswith(basepath)) data, stat = self.client.get(realpath) eq_(data, b"sandwich") @@ -575,33 +576,35 @@ class TestClient(KazooTestCase): data, stat = self.client.get("/1/2/3/4/5") eq_(data, b"val2") - self.assertRaises(NodeExistsError, self.client.create, "/1/2/3/4/5", - b"val2", makepath=True) + self.assertRaises(NodeExistsError, self.client.create, + "/1/2/3/4/5", b"val2", makepath=True) def test_create_makepath_incompatible_acls(self): from kazoo.client import KazooClient from kazoo.security import make_digest_acl_credential, CREATOR_ALL_ACL credential = make_digest_acl_credential("username", "password") - alt_client = KazooClient(self.cluster[0].address + self.client.chroot, + alt_client = KazooClient( + self.cluster[0].address + self.client.chroot, max_retries=5, auth_data=[("digest", credential)]) alt_client.start() alt_client.create("/1/2", b"val2", makepath=True, acl=CREATOR_ALL_ACL) try: - self.assertRaises(NoAuthError, self.client.create, "/1/2/3/4/5", - b"val2", makepath=True) + self.assertRaises(NoAuthError, self.client.create, + "/1/2/3/4/5", b"val2", makepath=True) finally: alt_client.delete('/', recursive=True) alt_client.stop() def test_create_no_makepath(self): - self.assertRaises(NoNodeError, self.client.create, "/1/2", b"val1") - self.assertRaises(NoNodeError, self.client.create, "/1/2", b"val1", - makepath=False) + self.assertRaises(NoNodeError, self.client.create, + "/1/2", b"val1") + self.assertRaises(NoNodeError, self.client.create, + "/1/2", b"val1", makepath=False) self.client.create("/1/2", b"val1", makepath=True) - self.assertRaises(NoNodeError, self.client.create, "/1/2/3/4", b"val1", - makepath=False) + self.assertRaises(NoNodeError, self.client.create, + "/1/2/3/4", b"val1", makepath=False) def test_create_exists(self): from kazoo.exceptions import NodeExistsError @@ -844,7 +847,7 @@ class TestClient(KazooTestCase): client = self.client self.assertRaises(NoNodeError, client.get_children, '/none') self.assertRaises(NoNodeError, client.get_children, - '/none', include_data=True) + '/none', include_data=True) def test_get_children_invalid_path(self): client = self.client @@ -855,7 +858,7 @@ class TestClient(KazooTestCase): self.assertRaises(TypeError, client.get_children, ('a', 'b')) self.assertRaises(TypeError, client.get_children, 'a', watch=True) self.assertRaises(TypeError, client.get_children, - 'a', include_data='yes') + 'a', include_data='yes') def test_invalid_auth(self): from kazoo.exceptions import AuthFailedError http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ce647d39/slider-agent/src/main/python/kazoo/tests/test_connection.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_connection.py b/slider-agent/src/main/python/kazoo/tests/test_connection.py index d89a2c6..c8c4581 100644 --- a/slider-agent/src/main/python/kazoo/tests/test_connection.py +++ b/slider-agent/src/main/python/kazoo/tests/test_connection.py @@ -43,8 +43,9 @@ class Delete(namedtuple('Delete', 'path version')): class TestConnectionHandler(KazooTestCase): def test_bad_deserialization(self): async_object = self.client.handler.async_result() - self.client._queue.append((Delete(self.client.chroot, -1), async_object)) - os.write(self.client._connection._write_pipe, b'\0') + self.client._queue.append( + (Delete(self.client.chroot, -1), async_object)) + self.client._connection._write_sock.send(b'\0') @raises(ValueError) def testit(): @@ -185,75 +186,51 @@ class TestConnectionHandler(KazooTestCase): # should be able to restart self.client.start() - def test_connection_pipe(self): + def test_connection_sock(self): client = self.client - read_pipe = client._connection._read_pipe - write_pipe = client._connection._write_pipe + read_sock = client._connection._read_sock + write_sock = client._connection._write_sock - assert read_pipe is not None - assert write_pipe is not None + assert read_sock is not None + assert write_sock is not None - # stop client and pipe should not yet be closed + # stop client and socket should not yet be closed client.stop() - assert read_pipe is not None - assert write_pipe is not None - if sys.platform != "win32" - os.fstat(read_pipe) - os.fstat(write_pipe) - else: - read_pipe.getsockname() - write_pipe.getsockname() + assert read_sock is not None + assert write_sock is not None + + read_sock.getsockname() + write_sock.getsockname() - # close client, and pipes should be + # close client, and sockets should be closed client.close() - if sys.platform != "win32" - try: - os.fstat(read_pipe) - except OSError as e: - if not e.errno == errno.EBADF: - raise - else: - self.fail("Expected read_pipe to be closed") - - try: - os.fstat(write_pipe) - except OSError as e: - if not e.errno == errno.EBADF: - raise - else: - self.fail("Expected write_pipe to be closed") - else: - pass # Not sure what to do here - - # start client back up. should get a new, valid pipe + # Todo check socket closing + + # start client back up. should get a new, valid socket client.start() - read_pipe = client._connection._read_pipe - write_pipe = client._connection._write_pipe - - assert read_pipe is not None - assert write_pipe is not None - if sys.platform != "win32" - os.fstat(read_pipe) - os.fstat(write_pipe) - else: - read_pipe.getsockname() - write_pipe.getsockname() + read_sock = client._connection._read_sock + write_sock = client._connection._write_sock + + assert read_sock is not None + assert write_sock is not None + read_sock.getsockname() + write_sock.getsockname() - def test_dirty_pipe(self): + def test_dirty_sock(self): client = self.client - read_pipe = client._connection._read_pipe - write_pipe = client._connection._write_pipe + read_sock = client._connection._read_sock + write_sock = client._connection._write_sock - # add a stray byte to the pipe and ensure that doesn't + # add a stray byte to the socket and ensure that doesn't # blow up client. simulates case where some error leaves - # a byte in the pipe which doesn't correspond to the + # a byte in the socket which doesn't correspond to the # request queue. - pipe_or_sock_write(write_pipe, b'\0') + write_sock.send(b'\0') - # eventually this byte should disappear from pipe - wait(lambda: client.handler.select([read_pipe], [], [], 0)[0] == []) + # eventually this byte should disappear from socket + wait(lambda: client.handler.select([read_sock], [], [], 0)[0] == []) class TestConnectionDrop(KazooTestCase):
