commit:     3b0c52b9ef364dc8e69208ec5341255ac94d41d8
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat May 26 10:44:38 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sat May 26 10:44:38 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=3b0c52b9

PipeReaderTestCase: cover sockets and named pipes

 pym/portage/tests/process/test_poll.py | 74 ++++++++++++++++++++++------------
 1 file changed, 49 insertions(+), 25 deletions(-)

diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py
index 596ea3088..d71c9b59c 100644
--- a/pym/portage/tests/process/test_poll.py
+++ b/pym/portage/tests/process/test_poll.py
@@ -1,11 +1,16 @@
-# Copyright 1998-2013 Gentoo Foundation
+# Copyright 1998-2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
+import functools
+import pty
+import shutil
+import socket
+import sys
 import subprocess
+import tempfile
 
 from portage import os
 from portage.tests import TestCase
-from portage.util._pty import _create_pty_or_pipe
 from portage.util._async.PopenProcess import PopenProcess
 from portage.util._eventloop.global_event_loop import global_event_loop
 from _emerge.PipeReader import PipeReader
@@ -13,28 +18,47 @@ from _emerge.PipeReader import PipeReader
 class PipeReaderTestCase(TestCase):
 
        _use_array = False
-       _use_pty = False
        _echo_cmd = "echo -n '%s'"
 
-       def _testPipeReader(self, test_string):
+       def test_pipe(self):
+               def make_pipes():
+                       return os.pipe(), None
+               self._do_test(make_pipes)
+
+       def test_pty_device(self):
+               def make_pipes():
+                       try:
+                               return pty.openpty(), None
+                       except EnvironmentError:
+                               self.skipTest('pty not available')
+               self._do_test(make_pipes)
+
+       def test_domain_socket(self):
+               def make_pipes():
+                       if sys.version_info >= (3, 2):
+                               read_end, write_end = socket.socketpair()
+                               return (read_end.detach(), write_end.detach()), None
+                       else:
+                               self.skipTest('socket detach not supported')
+               self._do_test(make_pipes)
+
+       def test_named_pipe(self):
+               def make_pipes():
+                       tempdir = tempfile.mkdtemp()
+                       fifo_path = os.path.join(tempdir, 'fifo')
+                       os.mkfifo(fifo_path)
+                       return ((os.open(fifo_path, os.O_NONBLOCK|os.O_RDONLY),
+                               os.open(fifo_path, os.O_NONBLOCK|os.O_WRONLY)),
+                               functools.partial(shutil.rmtree, tempdir))
+               self._do_test(make_pipes)
+
+       def _testPipeReader(self, master_fd, slave_fd, test_string):
                """
                Use a poll loop to read data from a pipe and assert that
                the data written to the pipe is identical to the data
                read from the pipe.
                """
 
-               if self._use_pty:
-                       got_pty, master_fd, slave_fd = _create_pty_or_pipe()
-                       if not got_pty:
-                               os.close(slave_fd)
-                               os.close(master_fd)
-                               skip_reason = "pty not acquired"
-                               self.portage_skip = skip_reason
-                               self.fail(skip_reason)
-                               return
-               else:
-                       master_fd, slave_fd = os.pipe()
-
                # WARNING: It is very important to use unbuffered mode here,
                # in order to avoid issue 5380 with python3.
                master_file = os.fdopen(master_fd, 'rb', 0)
@@ -60,15 +84,18 @@ class PipeReaderTestCase(TestCase):
 
                return consumer.getvalue().decode('ascii', 'replace')
 
-       def testPipeReader(self):
+       def _do_test(self, make_pipes):
                for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
                        test_string = x * "a"
-                       output = self._testPipeReader(test_string)
-                       self.assertEqual(test_string, output,
-                               "x = %s, len(output) = %s" % (x, len(output)))
+                       (read_end, write_end), cleanup = make_pipes()
+                       try:
+                               output = self._testPipeReader(read_end, write_end, test_string)
+                               self.assertEqual(test_string, output,
+                                       "x = %s, len(output) = %s" % (x, len(output)))
+                       finally:
+                               if cleanup is not None:
+                                       cleanup()
 
-class PipeReaderPtyTestCase(PipeReaderTestCase):
-       _use_pty = True
 
 class PipeReaderArrayTestCase(PipeReaderTestCase):
 
@@ -81,6 +108,3 @@ class PipeReaderArrayTestCase(PipeReaderTestCase):
                # https://bugs.python.org/issue5380
                # https://bugs.pypy.org/issue956
                self.todo = True
-
-class PipeReaderPtyArrayTestCase(PipeReaderArrayTestCase):
-       _use_pty = True

Reply via email to