commit: 40306beadf35659ce6dbe40ffe10c677e6e13918
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat Nov 16 08:52:51 2019 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sat Nov 16 08:54:49 2019 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=40306bea
SubprocessExecTestCase: test pipe between processes
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
.../util/futures/asyncio/test_subprocess_exec.py | 36 ++++++++++++++++++++--
1 file changed, 34 insertions(+), 2 deletions(-)
diff --git a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index 61646cb92..d7e94d132 100644
--- a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -1,4 +1,4 @@
-# Copyright 2018 Gentoo Foundation
+# Copyright 2018-2019 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import os
@@ -10,7 +10,6 @@ from portage.tests import TestCase
from portage.util._eventloop.global_event_loop import global_event_loop
from portage.util.futures import asyncio
from portage.util.futures._asyncio import create_subprocess_exec
-from portage.util.futures._asyncio.streams import _reader as reader
from portage.util.futures.compat_coroutine import coroutine, coroutine_return
from portage.util.futures.unix_events import DefaultEventLoopPolicy
@@ -94,6 +93,39 @@ class SubprocessExecTestCase(TestCase):
self._run_test(test)
+ def testPipe(self):
+ stdin_data = b'hello world'
+ cat_binary = find_binary("cat")
+ self.assertNotEqual(cat_binary, None)
+ cat_binary = cat_binary.encode()
+
+ echo_binary = find_binary("echo")
+ self.assertNotEqual(echo_binary, None)
+ echo_binary = echo_binary.encode()
+
+ def test(loop):
+
+ pr, pw = os.pipe()
+
+ cat_proc = loop.run_until_complete(create_subprocess_exec(
+ cat_binary, stdin=pr, stdout=subprocess.PIPE,
+ loop=loop))
+
+ echo_proc = loop.run_until_complete(create_subprocess_exec(
+ echo_binary, b'-n', stdin_data, stdout=pw,
+ loop=loop))
+
+ os.close(pr)
+ os.close(pw)
+
+ out, err = loop.run_until_complete(cat_proc.communicate())
+
+ self.assertEqual(loop.run_until_complete(cat_proc.wait()), os.EX_OK)
+ self.assertEqual(loop.run_until_complete(echo_proc.wait()), os.EX_OK)
+ self.assertEqual(out, stdin_data)
+
+ self._run_test(test)
+
def testReadTransport(self):
"""
Test asyncio.create_subprocess_exec(stdout=subprocess.PIPE)
which