SLIDER-389. Upgrade kazoo code base to the latest version

Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/ce647d39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/ce647d39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/ce647d39

Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry
Commit: ce647d399acf5b2d90e51a8509633183438e11d3
Parents: 732569d
Author: Sumit Mohanty <[email protected]>
Authored: Fri Sep 26 14:20:49 2014 -0700
Committer: Sumit Mohanty <[email protected]>
Committed: Fri Sep 26 14:20:49 2014 -0700

----------------------------------------------------------------------
 slider-agent/src/main/python/kazoo/client.py    | 56 +++++++-----
 .../src/main/python/kazoo/handlers/utils.py     | 42 +--------
 .../main/python/kazoo/protocol/connection.py    | 74 ++++++++--------
 .../src/main/python/kazoo/tests/test_client.py  | 55 ++++++------
 .../main/python/kazoo/tests/test_connection.py  | 89 ++++++++------------
 5 files changed, 137 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ce647d39/slider-agent/src/main/python/kazoo/client.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/client.py 
b/slider-agent/src/main/python/kazoo/client.py
index 11d9008..47545ee 100644
--- a/slider-agent/src/main/python/kazoo/client.py
+++ b/slider-agent/src/main/python/kazoo/client.py
@@ -20,7 +20,7 @@ from kazoo.exceptions import (
     WriterNotClosedException,
 )
 from kazoo.handlers.threading import SequentialThreadingHandler
-from kazoo.handlers.utils import capture_exceptions, wrap, pipe_or_sock_write
+from kazoo.handlers.utils import capture_exceptions, wrap
 from kazoo.hosts import collect_hosts
 from kazoo.loggingsupport import BLATHER
 from kazoo.protocol.connection import ConnectionHandler
@@ -220,7 +220,6 @@ class KazooClient(object):
         elif type(command_retry) is KazooRetry:
             self.retry = command_retry
 
-
         if type(self._conn_retry) is KazooRetry:
             if self.handler.sleep_func != self._conn_retry.sleep_func:
                 raise ConfigurationError("Retry handler and event handler "
@@ -228,19 +227,21 @@ class KazooClient(object):
 
         if type(self.retry) is KazooRetry:
             if self.handler.sleep_func != self.retry.sleep_func:
-                raise ConfigurationError("Command retry handler and event "
-                                         "handler must use the same sleep 
func")
+                raise ConfigurationError(
+                    "Command retry handler and event handler "
+                    "must use the same sleep func")
 
         if self.retry is None or self._conn_retry is None:
             old_retry_keys = dict(_RETRY_COMPAT_DEFAULTS)
             for key in old_retry_keys:
                 try:
                     old_retry_keys[key] = kwargs.pop(key)
-                    warnings.warn('Passing retry configuration param %s to the'
-                            ' client directly is deprecated, please pass a'
-                            ' configured retry object (using param %s)' % (
-                                key, _RETRY_COMPAT_MAPPING[key]),
-                            DeprecationWarning, stacklevel=2)
+                    warnings.warn(
+                        'Passing retry configuration param %s to the '
+                        'client directly is deprecated, please pass a '
+                        'configured retry object (using param %s)' % (
+                            key, _RETRY_COMPAT_MAPPING[key]),
+                        DeprecationWarning, stacklevel=2)
                 except KeyError:
                     pass
 
@@ -258,12 +259,13 @@ class KazooClient(object):
                     **retry_keys)
 
         self._conn_retry.interrupt = lambda: self._stopped.is_set()
-        self._connection = ConnectionHandler(self, self._conn_retry.copy(),
-            logger=self.logger)
+        self._connection = ConnectionHandler(
+            self, self._conn_retry.copy(), logger=self.logger)
 
         # Every retry call should have its own copy of the retry helper
         # to avoid shared retry counts
         self._retry = self.retry
+
         def _retry(*args, **kwargs):
             return self._retry.copy()(*args, **kwargs)
         self.retry = _retry
@@ -282,7 +284,7 @@ class KazooClient(object):
         self.Semaphore = partial(Semaphore, self)
         self.ShallowParty = partial(ShallowParty, self)
 
-         # If we got any unhandled keywords, complain like python would
+        # If we got any unhandled keywords, complain like Python would
         if kwargs:
             raise TypeError('__init__() got unexpected keyword arguments: %s'
                             % (kwargs.keys(),))
@@ -433,7 +435,8 @@ class KazooClient(object):
             return
 
         if state in (KeeperState.CONNECTED, KeeperState.CONNECTED_RO):
-            self.logger.info("Zookeeper connection established, state: %s", 
state)
+            self.logger.info("Zookeeper connection established, "
+                             "state: %s", state)
             self._live.set()
             self._make_state_change(KazooState.CONNECTED)
         elif state in LOST_STATES:
@@ -510,12 +513,12 @@ class KazooClient(object):
         self._queue.append((request, async_object))
 
         # wake the connection, guarding against a race with close()
-        write_pipe = self._connection._write_pipe
-        if write_pipe is None:
+        write_sock = self._connection._write_sock
+        if write_sock is None:
             async_object.set_exception(ConnectionClosedError(
                 "Connection has been closed"))
         try:
-            pipe_or_sock_write(write_pipe, b'\0')
+            write_sock.send(b'\0')
         except:
             async_object.set_exception(ConnectionClosedError(
                 "Connection has been closed"))
@@ -585,7 +588,7 @@ class KazooClient(object):
 
         self._stopped.set()
         self._queue.append((CloseInstance, None))
-        pipe_or_sock_write(self._connection._write_pipe, b'\0')
+        self._connection._write_sock.send(b'\0')
         self._safe_close()
 
     def restart(self):
@@ -622,7 +625,7 @@ class KazooClient(object):
         if not self._live.is_set():
             raise ConnectionLoss("No connection to server")
 
-        peer = self._connection._socket.getpeername()
+        peer = self._connection._socket.getpeername()[:2]
         sock = self.handler.create_connection(
             peer, timeout=self._session_timeout / 1000.0)
         sock.sendall(cmd)
@@ -786,7 +789,7 @@ class KazooClient(object):
         """
         acl = acl or self.default_acl
         return self.create_async(path, value, acl=acl, ephemeral=ephemeral,
-            sequence=sequence, makepath=makepath).get()
+                                 sequence=sequence, makepath=makepath).get()
 
     def create_async(self, path, value=b"", acl=None, ephemeral=False,
                      sequence=False, makepath=False):
@@ -828,7 +831,8 @@ class KazooClient(object):
 
         @capture_exceptions(async_result)
         def do_create():
-            result = self._create_async_inner(path, value, acl, flags, 
trailing=sequence)
+            result = self._create_async_inner(
+                path, value, acl, flags, trailing=sequence)
             result.rawlink(create_completion)
 
         @capture_exceptions(async_result)
@@ -867,10 +871,13 @@ class KazooClient(object):
         return async_result
 
     def ensure_path(self, path, acl=None):
-        """Recursively create a path if it doesn't exist.
+        """Recursively create a path if it doesn't exist. Also return value 
indicates
+        if path already existed or had to be created.
 
         :param path: Path of node.
         :param acl: Permissions for node.
+        :returns `True` if path existed, `False` otherwise.
+        :rtype: bool
 
         """
         return self.ensure_path_async(path, acl).get()
@@ -1291,6 +1298,13 @@ class TransactionRequest(object):
     Transactions are not thread-safe and should not be accessed from
     multiple threads at once.
 
+    .. note::
+
+        The ``committed`` attribute only indicates whether this
+        transaction has been sent to Zookeeper and is used to prevent
+        duplicate commits of the same transaction. The result should be
+        checked to determine if the transaction executed as desired.
+
     .. versionadded:: 0.6
         Requires Zookeeper 3.4+
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ce647d39/slider-agent/src/main/python/kazoo/handlers/utils.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/handlers/utils.py 
b/slider-agent/src/main/python/kazoo/handlers/utils.py
index 385495e..93cfdb5 100644
--- a/slider-agent/src/main/python/kazoo/handlers/utils.py
+++ b/slider-agent/src/main/python/kazoo/handlers/utils.py
@@ -23,47 +23,7 @@ def _set_default_tcpsock_options(module, sock):
         _set_fd_cloexec(sock)
     return sock
 
-def pipe_or_sock_read(p_or_s, n):
-    ''' Use a socket or a pipe to read something'''
-    if isinstance(p_or_s, int):
-        # This is a pipe
-        return os.read(p_or_s, n)
-    else:
-        return p_or_s.recv(n)
-
-def pipe_or_sock_close(p_or_s):
-    ''' Closes either a socket or a pipe'''
-    if isinstance(p_or_s, int):
-        os.close(p_or_s)
-    else:
-        p_or_s.close()
-        
-def pipe_or_sock_write(p_or_s, b):
-    ''' Read from a socket or a pipe depending on what is passed'''
-    if isinstance(p_or_s, int):
-        # This is a pipe
-        os.write(p_or_s,b)
-    else:
-        p_or_s.send(b)
-        
-def create_pipe_or_sock():
-    """ Create a non-blocking read/write pipe.
-        On Windows create a pair of sockets
-    """
-    if sys.platform == "win32":
-        r, w = create_sock_pair()
-    else:
-        r, w = os.pipe()
-        if HAS_FNCTL:
-            fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK)
-            fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
-            _set_fd_cloexec(r)
-            _set_fd_cloexec(w)
-    return r, w
-
-
-
-def create_sock_pair(port=0):
+def create_socket_pair(port=0):
     """Create socket pair.
 
     If socket.socketpair isn't available, we emulate it.

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ce647d39/slider-agent/src/main/python/kazoo/protocol/connection.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/protocol/connection.py 
b/slider-agent/src/main/python/kazoo/protocol/connection.py
index 7872928..6b89c18 100644
--- a/slider-agent/src/main/python/kazoo/protocol/connection.py
+++ b/slider-agent/src/main/python/kazoo/protocol/connection.py
@@ -17,7 +17,7 @@ from kazoo.exceptions import (
     SessionExpiredError,
     NoNodeError
 )
-from kazoo.handlers.utils import create_pipe_or_sock, pipe_or_sock_read, 
pipe_or_sock_write, pipe_or_sock_close
+from kazoo.handlers.utils import create_socket_pair
 from kazoo.loggingsupport import BLATHER
 from kazoo.protocol.serialization import (
     Auth,
@@ -146,8 +146,8 @@ class ConnectionHandler(object):
         self.connection_stopped.set()
         self.ping_outstanding = client.handler.event_object()
 
-        self._read_pipe = None
-        self._write_pipe = None
+        self._read_sock = None
+        self._write_sock = None
 
         self._socket = None
         self._xid = None
@@ -169,7 +169,7 @@ class ConnectionHandler(object):
     def start(self):
         """Start the connection up"""
         if self.connection_closed.is_set():
-            self._read_pipe, self._write_pipe = create_pipe_or_sock()
+            self._read_sock, self._write_sock = create_socket_pair()
             self.connection_closed.clear()
         if self._connection_routine:
             raise Exception("Unable to start, connection routine already "
@@ -192,12 +192,12 @@ class ConnectionHandler(object):
         if not self.connection_stopped.is_set():
             raise Exception("Cannot close connection until it is stopped")
         self.connection_closed.set()
-        wp, rp = self._write_pipe, self._read_pipe
-        self._write_pipe = self._read_pipe = None
-        if wp is not None:
-            pipe_or_sock_close(wp)
-        if rp is not None:
-            pipe_or_sock_close(rp)
+        ws, rs = self._write_sock, self._read_sock
+        self._write_sock = self._read_sock = None
+        if ws is not None:
+            ws.close()
+        if rs is not None:
+            rs.close()
 
     def _server_pinger(self):
         """Returns a server pinger iterable, that will ping the next
@@ -238,8 +238,8 @@ class ConnectionHandler(object):
         if xid:
             header, buffer, offset = self._read_header(timeout)
             if header.xid != xid:
-                raise RuntimeError('xids do not match, expected %r received 
%r',
-                                   xid, header.xid)
+                raise RuntimeError('xids do not match, expected %r '
+                                   'received %r', xid, header.xid)
             if header.zxid > 0:
                 zxid = header.zxid
             if header.err:
@@ -257,8 +257,9 @@ class ConnectionHandler(object):
             try:
                 obj, _ = request.deserialize(msg, 0)
             except Exception:
-                self.logger.exception("Exception raised during deserialization"
-                                      " of request: %s", request)
+                self.logger.exception(
+                    "Exception raised during deserialization "
+                    "of request: %s", request)
 
                 # raise ConnectionDropped so connect loop will retry
                 raise ConnectionDropped('invalid server response')
@@ -276,8 +277,9 @@ class ConnectionHandler(object):
         if request.type:
             b.extend(int_struct.pack(request.type))
         b += request.serialize()
-        self.logger.log((BLATHER if isinstance(request, Ping) else 
logging.DEBUG),
-                        "Sending request(xid=%s): %s", xid, request)
+        self.logger.log(
+            (BLATHER if isinstance(request, Ping) else logging.DEBUG),
+            "Sending request(xid=%s): %s", xid, request)
         self._write(int_struct.pack(len(b)) + b, timeout)
 
     def _write(self, msg, timeout):
@@ -358,8 +360,9 @@ class ConnectionHandler(object):
                 try:
                     response = request.deserialize(buffer, offset)
                 except Exception as exc:
-                    self.logger.exception("Exception raised during 
deserialization"
-                                          " of request: %s", request)
+                    self.logger.exception(
+                        "Exception raised during deserialization "
+                        "of request: %s", request)
                     async_object.set_exception(exc)
                     return
                 self.logger.debug(
@@ -415,11 +418,11 @@ class ConnectionHandler(object):
         except IndexError:
             # Not actually something on the queue, this can occur if
             # something happens to cancel the request such that we
-            # don't clear the pipe below after sending
+            # don't clear the socket below after sending
             try:
                 # Clear possible inconsistence (no request in the queue
-                # but have data in the read pipe), which causes cpu to spin.
-                pipe_or_sock_read(self._read_pipe, 1)
+                # but have data in the read socket), which causes cpu to spin.
+                self._read_sock.recv(1)
             except OSError:
                 pass
             return
@@ -440,7 +443,7 @@ class ConnectionHandler(object):
 
         self._submit(request, connect_timeout, xid)
         client._queue.popleft()
-        pipe_or_sock_read(self._read_pipe, 1)
+        self._read_sock.recv(1)
         client._pending.append((request, async_object, xid))
 
     def _send_ping(self, connect_timeout):
@@ -519,7 +522,7 @@ class ConnectionHandler(object):
                 jitter_time = random.randint(0, 40) / 100.0
                 # Ensure our timeout is positive
                 timeout = max([read_timeout / 2.0 - jitter_time, jitter_time])
-                s = self.handler.select([self._socket, self._read_pipe],
+                s = self.handler.select([self._socket, self._read_sock],
                                         [], [], timeout)[0]
 
                 if not s:
@@ -570,9 +573,9 @@ class ConnectionHandler(object):
         self.logger.info('Connecting to %s:%s', host, port)
 
         self.logger.log(BLATHER,
-                          '    Using session_id: %r session_passwd: %s',
-                          client._session_id,
-                          hexlify(client._session_passwd))
+                        '    Using session_id: %r session_passwd: %s',
+                        client._session_id,
+                        hexlify(client._session_passwd))
 
         with self._socket_error_handling():
             self._socket = self.handler.create_connection(
@@ -584,7 +587,8 @@ class ConnectionHandler(object):
                           client._session_id or 0, client._session_passwd,
                           client.read_only)
 
-        connect_result, zxid = self._invoke(client._session_timeout, connect)
+        connect_result, zxid = self._invoke(
+            client._session_timeout / 1000.0, connect)
 
         if connect_result.time_out <= 0:
             raise SessionExpiredError("Session has expired")
@@ -601,13 +605,13 @@ class ConnectionHandler(object):
         client._session_passwd = connect_result.passwd
 
         self.logger.log(BLATHER,
-                          'Session created, session_id: %r session_passwd: 
%s\n'
-                          '    negotiated session timeout: %s\n'
-                          '    connect timeout: %s\n'
-                          '    read timeout: %s', client._session_id,
-                          hexlify(client._session_passwd),
-                          negotiated_session_timeout, connect_timeout,
-                          read_timeout)
+                        'Session created, session_id: %r session_passwd: %s\n'
+                        '    negotiated session timeout: %s\n'
+                        '    connect timeout: %s\n'
+                        '    read timeout: %s', client._session_id,
+                        hexlify(client._session_passwd),
+                        negotiated_session_timeout, connect_timeout,
+                        read_timeout)
 
         if connect_result.read_only:
             client._session_callback(KeeperState.CONNECTED_RO)
@@ -618,7 +622,7 @@ class ConnectionHandler(object):
 
         for scheme, auth in client.auth_data:
             ap = Auth(0, scheme, auth)
-            zxid = self._invoke(connect_timeout, ap, xid=AUTH_XID)
+            zxid = self._invoke(connect_timeout / 1000.0, ap, xid=AUTH_XID)
             if zxid:
                 client.last_zxid = zxid
         return read_timeout, connect_timeout

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ce647d39/slider-agent/src/main/python/kazoo/tests/test_client.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_client.py 
b/slider-agent/src/main/python/kazoo/tests/test_client.py
index 5807292..f851b63 100644
--- a/slider-agent/src/main/python/kazoo/tests/test_client.py
+++ b/slider-agent/src/main/python/kazoo/tests/test_client.py
@@ -72,7 +72,8 @@ class TestClientConstructor(unittest.TestCase):
 
     def test_invalid_handler(self):
         from kazoo.handlers.threading import SequentialThreadingHandler
-        self.assertRaises(ConfigurationError,
+        self.assertRaises(
+            ConfigurationError,
             self._makeOne, handler=SequentialThreadingHandler)
 
     def test_chroot(self):
@@ -91,7 +92,7 @@ class TestClientConstructor(unittest.TestCase):
 
     def test_ordered_host_selection(self):
         client = self._makeOne(hosts='127.0.0.1:9,127.0.0.2:9/a',
-            randomize_hosts=False)
+                               randomize_hosts=False)
         hosts = [h for h in client.hosts]
         eq_(hosts, [('127.0.0.1', 9), ('127.0.0.2', 9)])
 
@@ -371,29 +372,29 @@ class TestConnection(KazooTestCase):
         client = self.client
         client.stop()
 
-        write_pipe = client._connection._write_pipe
+        write_sock = client._connection._write_sock
 
-        # close the connection to free the pipe
+        # close the connection to free the socket
         client.close()
-        eq_(client._connection._write_pipe, None)
+        eq_(client._connection._write_sock, None)
 
         # sneak in and patch client to simulate race between a thread
         # calling stop(); close() and one running a command
         oldstate = client._state
         client._state = KeeperState.CONNECTED
-        client._connection._write_pipe = write_pipe
+        client._connection._write_sock = write_sock
         try:
-            # simulate call made after write pipe is closed
+            # simulate call made after write socket is closed
             self.assertRaises(ConnectionClosedError, client.exists, '/')
 
-            # simualte call made after write pipe is set to None
-            client._connection._write_pipe = None
+            # simulate call made after write socket is set to None
+            client._connection._write_sock = None
             self.assertRaises(ConnectionClosedError, client.exists, '/')
 
         finally:
             # reset for teardown
             client._state = oldstate
-            client._connection._write_pipe = None
+            client._connection._write_sock = None
 
 
 class TestClient(KazooTestCase):
@@ -544,9 +545,9 @@ class TestClient(KazooTestCase):
         client = self.client
         client.create("/1", b"ephemeral", ephemeral=True)
         self.assertRaises(NoChildrenForEphemeralsError,
-            client.create, "/1/2", b"val1")
+                          client.create, "/1/2", b"val1")
         self.assertRaises(NoChildrenForEphemeralsError,
-            client.create, "/1/2", b"val1", ephemeral=True)
+                          client.create, "/1/2", b"val1", ephemeral=True)
 
     def test_create_sequence(self):
         client = self.client
@@ -560,8 +561,8 @@ class TestClient(KazooTestCase):
 
     def test_create_ephemeral_sequence(self):
         basepath = "/" + uuid.uuid4().hex
-        realpath = self.client.create(basepath, b"sandwich", sequence=True,
-            ephemeral=True)
+        realpath = self.client.create(basepath, b"sandwich",
+                                      sequence=True, ephemeral=True)
         self.assertTrue(basepath != realpath and realpath.startswith(basepath))
         data, stat = self.client.get(realpath)
         eq_(data, b"sandwich")
@@ -575,33 +576,35 @@ class TestClient(KazooTestCase):
         data, stat = self.client.get("/1/2/3/4/5")
         eq_(data, b"val2")
 
-        self.assertRaises(NodeExistsError, self.client.create, "/1/2/3/4/5",
-            b"val2", makepath=True)
+        self.assertRaises(NodeExistsError, self.client.create,
+                          "/1/2/3/4/5", b"val2", makepath=True)
 
     def test_create_makepath_incompatible_acls(self):
         from kazoo.client import KazooClient
         from kazoo.security import make_digest_acl_credential, CREATOR_ALL_ACL
         credential = make_digest_acl_credential("username", "password")
-        alt_client = KazooClient(self.cluster[0].address + self.client.chroot,
+        alt_client = KazooClient(
+            self.cluster[0].address + self.client.chroot,
             max_retries=5, auth_data=[("digest", credential)])
         alt_client.start()
         alt_client.create("/1/2", b"val2", makepath=True, acl=CREATOR_ALL_ACL)
 
         try:
-            self.assertRaises(NoAuthError, self.client.create, "/1/2/3/4/5",
-                b"val2", makepath=True)
+            self.assertRaises(NoAuthError, self.client.create,
+                              "/1/2/3/4/5", b"val2", makepath=True)
         finally:
             alt_client.delete('/', recursive=True)
             alt_client.stop()
 
     def test_create_no_makepath(self):
-        self.assertRaises(NoNodeError, self.client.create, "/1/2", b"val1")
-        self.assertRaises(NoNodeError, self.client.create, "/1/2", b"val1",
-            makepath=False)
+        self.assertRaises(NoNodeError, self.client.create,
+                          "/1/2", b"val1")
+        self.assertRaises(NoNodeError, self.client.create,
+                          "/1/2", b"val1", makepath=False)
 
         self.client.create("/1/2", b"val1", makepath=True)
-        self.assertRaises(NoNodeError, self.client.create, "/1/2/3/4", b"val1",
-            makepath=False)
+        self.assertRaises(NoNodeError, self.client.create,
+                          "/1/2/3/4", b"val1", makepath=False)
 
     def test_create_exists(self):
         from kazoo.exceptions import NodeExistsError
@@ -844,7 +847,7 @@ class TestClient(KazooTestCase):
         client = self.client
         self.assertRaises(NoNodeError, client.get_children, '/none')
         self.assertRaises(NoNodeError, client.get_children,
-            '/none', include_data=True)
+                          '/none', include_data=True)
 
     def test_get_children_invalid_path(self):
         client = self.client
@@ -855,7 +858,7 @@ class TestClient(KazooTestCase):
         self.assertRaises(TypeError, client.get_children, ('a', 'b'))
         self.assertRaises(TypeError, client.get_children, 'a', watch=True)
         self.assertRaises(TypeError, client.get_children,
-            'a', include_data='yes')
+                          'a', include_data='yes')
 
     def test_invalid_auth(self):
         from kazoo.exceptions import AuthFailedError

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ce647d39/slider-agent/src/main/python/kazoo/tests/test_connection.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_connection.py 
b/slider-agent/src/main/python/kazoo/tests/test_connection.py
index d89a2c6..c8c4581 100644
--- a/slider-agent/src/main/python/kazoo/tests/test_connection.py
+++ b/slider-agent/src/main/python/kazoo/tests/test_connection.py
@@ -43,8 +43,9 @@ class Delete(namedtuple('Delete', 'path version')):
 class TestConnectionHandler(KazooTestCase):
     def test_bad_deserialization(self):
         async_object = self.client.handler.async_result()
-        self.client._queue.append((Delete(self.client.chroot, -1), 
async_object))
-        os.write(self.client._connection._write_pipe, b'\0')
+        self.client._queue.append(
+            (Delete(self.client.chroot, -1), async_object))
+        self.client._connection._write_sock.send(b'\0')
 
         @raises(ValueError)
         def testit():
@@ -185,75 +186,51 @@ class TestConnectionHandler(KazooTestCase):
         # should be able to restart
         self.client.start()
 
-    def test_connection_pipe(self):
+    def test_connection_sock(self):
         client = self.client
-        read_pipe = client._connection._read_pipe
-        write_pipe = client._connection._write_pipe
+        read_sock = client._connection._read_sock
+        write_sock = client._connection._write_sock
 
-        assert read_pipe is not None
-        assert write_pipe is not None
+        assert read_sock is not None
+        assert write_sock is not None
 
-        # stop client and pipe should not yet be closed
+        # stop client and socket should not yet be closed
         client.stop()
-        assert read_pipe is not None
-        assert write_pipe is not None
-        if sys.platform != "win32"
-            os.fstat(read_pipe)
-            os.fstat(write_pipe)
-        else:
-            read_pipe.getsockname()
-            write_pipe.getsockname()
+        assert read_sock is not None
+        assert write_sock is not None
+        
+        read_sock.getsockname()
+        write_sock.getsockname()
 
-        # close client, and pipes should be
+        # close client, and sockets should be closed
         client.close()
 
-        if sys.platform != "win32"
-            try:
-                os.fstat(read_pipe)
-            except OSError as e:
-                if not e.errno == errno.EBADF:
-                    raise
-            else:
-                self.fail("Expected read_pipe to be closed")
-    
-            try:
-                os.fstat(write_pipe)
-            except OSError as e:
-                if not e.errno == errno.EBADF:
-                    raise
-            else:
-                self.fail("Expected write_pipe to be closed")
-        else:
-            pass # Not sure what to do here
-            
-        # start client back up. should get a new, valid pipe
+        # Todo check socket closing
+                   
+        # start client back up. should get a new, valid socket
         client.start()
-        read_pipe = client._connection._read_pipe
-        write_pipe = client._connection._write_pipe
-
-        assert read_pipe is not None
-        assert write_pipe is not None
-        if sys.platform != "win32"
-            os.fstat(read_pipe)
-            os.fstat(write_pipe)
-        else:
-            read_pipe.getsockname()
-            write_pipe.getsockname()
+        read_sock = client._connection._read_sock
+        write_sock = client._connection._write_sock
+
+        assert read_sock is not None
+        assert write_sock is not None
+        read_sock.getsockname()
+        write_sock.getsockname()
             
 
-    def test_dirty_pipe(self):
+    def test_dirty_sock(self):
         client = self.client
-        read_pipe = client._connection._read_pipe
-        write_pipe = client._connection._write_pipe
+        read_sock = client._connection._read_sock
+        write_sock = client._connection._write_sock
 
-        # add a stray byte to the pipe and ensure that doesn't
+        # add a stray byte to the socket and ensure that doesn't
         # blow up client. simulates case where some error leaves
-        # a byte in the pipe which doesn't correspond to the
+        # a byte in the socket which doesn't correspond to the
         # request queue.
-        pipe_or_sock_write(write_pipe, b'\0')
+        write_sock.send(b'\0')
 
-        # eventually this byte should disappear from pipe
-        wait(lambda: client.handler.select([read_pipe], [], [], 0)[0] == [])
+        # eventually this byte should disappear from socket
+        wait(lambda: client.handler.select([read_sock], [], [], 0)[0] == [])
 
 
 class TestConnectionDrop(KazooTestCase):

Reply via email to