Hello community,

here is the log from the commit of package python-sarge for openSUSE:Factory checked in at 2022-10-15 16:37:39
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-sarge (Old)
 and      /work/SRC/openSUSE:Factory/.python-sarge.new.2275 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-sarge"

Sat Oct 15 16:37:39 2022 rev:2 rq:1010966 version:0.1.7.post1

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-sarge/python-sarge.changes 2019-02-26 22:18:45.738142846 +0100
+++ /work/SRC/openSUSE:Factory/.python-sarge.new.2275/python-sarge.changes 2022-10-15 16:40:30.482593265 +0200
@@ -1,0 +2,8 @@
+Fri Oct 7 15:19:47 UTC 2022 - Yogalakshmi Arunachalam <[email protected]>
+
+- Update to version 0.1.7.post1
+  * Fixed #50: Initialized commands attribute in a constructor.
+  * Fixed #52: Updated documentation to show improved command line parsing under Windows.
+  * Fixed #53: Added waiter.py to the manifest so that the test suite can be run.
+
+-------------------------------------------------------------------

Old:
----
  sarge-0.1.5.post0.tar.gz

New:
----
  sarge-0.1.7.post1.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-sarge.spec ++++++
--- /var/tmp/diff_new_pack.2WDLDW/_old 2022-10-15 16:40:31.054594640 +0200
+++ /var/tmp/diff_new_pack.2WDLDW/_new 2022-10-15 16:40:31.058594650 +0200
@@ -1,7 +1,7 @@
 #
 # spec file for package python-sarge
 #
-# Copyright (c) 2019 SUSE LINUX GmbH, Nuernberg, Germany.
+# Copyright (c) 2022 SUSE LLC
 #
 # All modifications and additions to the file contributed by third parties
 # remain the property of their copyright owners, unless otherwise agreed
@@ -13,20 +13,21 @@
 # published by the Open Source Initiative.
# Please submit bugfixes or comments via https://bugs.opensuse.org/ +# %{?!python_module:%define python_module() python-%{**} python3-%{**}} Name: python-sarge -Version: 0.1.5.post0 +Version: 0.1.7.post1 Release: 0 License: BSD-3-Clause Summary: Command pipelines for python -Url: https://sarge.readthedocs.org/ +URL: https://sarge.readthedocs.org/ Group: Development/Languages/Python Source: https://files.pythonhosted.org/packages/source/s/sarge/sarge-%{version}.tar.gz -BuildRequires: python-rpm-macros BuildRequires: %{python_module setuptools} BuildRequires: fdupes +BuildRequires: python-rpm-macros BuildArch: noarch %python_subpackages ++++++ sarge-0.1.5.post0.tar.gz -> sarge-0.1.7.post1.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/sarge-0.1.5.post0/LICENSE new/sarge-0.1.7.post1/LICENSE --- old/sarge-0.1.5.post0/LICENSE 2016-09-08 02:48:52.000000000 +0200 +++ new/sarge-0.1.7.post1/LICENSE 2021-12-10 20:59:16.000000000 +0100 @@ -1,4 +1,4 @@ -Copyright (c) 2012-2014 by Vinay Sajip. +Copyright (c) 2012-2021 by Vinay Sajip. All rights reserved. Redistribution and use in source and binary forms, with or without diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/sarge-0.1.5.post0/PKG-INFO new/sarge-0.1.7.post1/PKG-INFO --- old/sarge-0.1.5.post0/PKG-INFO 2018-06-19 13:18:12.000000000 +0200 +++ new/sarge-0.1.7.post1/PKG-INFO 2021-12-11 23:40:32.000000000 +0100 @@ -1,12 +1,12 @@ Metadata-Version: 1.1 Name: sarge -Version: 0.1.5.post0 +Version: 0.1.7.post1 Summary: A wrapper for subprocess which provides command pipeline functionality. 
Home-page: http://sarge.readthedocs.org/ Author: Vinay Sajip Author-email: [email protected] License: BSD -Download-URL: http://pypi.python.org/packages/source/s/sarge/sarge-0.1.5.post0.tar.gz +Download-URL: https://github.com/vsajip/sarge/releases/download/0.1.7.post1/sarge-0.1.7.post1.tar.gz Description: The sarge package provides a wrapper for subprocess which provides command pipeline functionality. @@ -14,22 +14,18 @@ pipelines with a Posix flavour: you can have chains of commands using ``;``, ``&``, pipes using ``|`` and ``|&``, and redirection. - The latest version of sarge can be found on `BitBucket <https://bitbucket.org/vinay.sajip/sarge/>`_:: + The latest version of sarge can be found on `GitHub <https://github.com/vsajip/sarge/>`_. - https://bitbucket.org/vinay.sajip/sarge/ - - The latest documentation (kept updated between releases) is on `Read The Docs <http://sarge.readthedocs.org/>`_:: - - http://sarge.readthedocs.org/ + The latest documentation (kept updated between releases) is on `Read The Docs <http://sarge.readthedocs.org/>`_: Please report any problems or suggestions for improvement either via the `mailing list <http://groups.google.com/group/python-sarge/>`_ or the `issue - tracker <https://bitbucket.org/vinay.sajip/sarge/issues/new>`_. + tracker <https://github.com/vsajip/sarge/issues/new/choose>`_. 
Keywords: subprocess,wrapper,external,command Platform: Any -Classifier: Development Status :: 4 - Beta +Classifier: Development Status :: 5 - Production/Stable Classifier: Environment :: Console Classifier: Environment :: MacOS X Classifier: Environment :: Win32 (MS Windows) @@ -44,11 +40,10 @@ Classifier: Programming Language :: Python :: 2 Classifier: Programming Language :: Python :: 2.7 Classifier: Programming Language :: Python :: 3 -Classifier: Programming Language :: Python :: 3.3 -Classifier: Programming Language :: Python :: 3.4 -Classifier: Programming Language :: Python :: 3.5 Classifier: Programming Language :: Python :: 3.6 Classifier: Programming Language :: Python :: 3.7 +Classifier: Programming Language :: Python :: 3.8 +Classifier: Programming Language :: Python :: 3.9 Classifier: Programming Language :: Python :: Implementation Classifier: Topic :: Software Development :: Libraries Classifier: Topic :: Software Development :: Libraries :: Python Modules diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/sarge-0.1.5.post0/README.rst new/sarge-0.1.7.post1/README.rst --- old/sarge-0.1.5.post0/README.rst 2016-09-08 02:48:52.000000000 +0200 +++ new/sarge-0.1.7.post1/README.rst 2021-12-10 19:37:03.000000000 +0100 @@ -28,15 +28,11 @@ Availability & Documentation ---------------------------- -The latest version of sarge can be found on `BitBucket <https://bitbucket.org/vinay.sajip/sarge/>`_:: +The latest version of sarge can be found on `GitHub <https://github.com/vsajip/sarge/>`_. 
- https://bitbucket.org/vinay.sajip/sarge/ - -The latest documentation (kept updated between releases) is on `Read The Docs <http://sarge.readthedocs.org/>`_:: - - http://sarge.readthedocs.org/ +The latest documentation (kept updated between releases) is on `Read The Docs <http://sarge.readthedocs.org/>`_: Please report any problems or suggestions for improvement either via the `mailing list <http://groups.google.com/group/python-sarge/>`_ or the `issue -tracker <https://bitbucket.org/vinay.sajip/sarge/issues/new>`_. +tracker <https://github.com/vsajip/sarge/issues/new/choose>`_. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/sarge-0.1.5.post0/echoer.py new/sarge-0.1.7.post1/echoer.py --- old/sarge-0.1.5.post0/echoer.py 1970-01-01 01:00:00.000000000 +0100 +++ new/sarge-0.1.7.post1/echoer.py 2019-04-09 09:04:12.000000000 +0200 @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Vinay M. Sajip. See LICENSE for licensing information. +# +# Part of the test harness for sarge: Subprocess Allegedly Rewards Good Encapsulation :-) +# +import sys + +def main(args=None): + while True: + data = sys.stdin.readline() + if not data: + break + data = data.strip() + data = '%s %s\n' % (data, data) + sys.stdout.write(data) + sys.stdout.flush() + +if __name__ == '__main__': + try: + rc = main() + except Exception as e: + print(e) + rc = 9 + sys.exit(rc) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/sarge-0.1.5.post0/lister.py new/sarge-0.1.7.post1/lister.py --- old/sarge-0.1.5.post0/lister.py 1970-01-01 01:00:00.000000000 +0100 +++ new/sarge-0.1.7.post1/lister.py 2019-04-09 09:04:12.000000000 +0200 @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Vinay M. Sajip. See LICENSE for licensing information. 
+# +# Part of the test harness for sarge: Subprocess Allegedly Rewards Good Encapsulation :-) +# +import logging +import optparse +import os +import re +import sys +import time + +logger = logging.getLogger(__name__) + +def _file_lines(fn): + with open(fn) as f: + for line in f: + yield line + +def _auto_lines(): + i = 1 + while True: + line = 'line %d\n' % i + i += 1 + yield line + +def main(args=None): + parser = optparse.OptionParser(usage='usage: %prog [options] [filename]', + description='Print lines optionally from ' + 'a file, with a delay between ' + 'lines. If no filename is ' + 'specified, lines of the form ' + '"line N" are generated ' + 'internally and printed.') + parser.add_option('-d', '--delay', default=None, type=float, + help='Delay between lines (seconds)') + parser.add_option('-c', '--count', default=0, type=int, + help='Maximum number of lines to output') + parser.add_option('-i', '--interest', default=None, + help='Indicate patterns of interest for logging') + if args is None: + args = sys.argv[1:] + options, args = parser.parse_args(args) + if not args: + liner = _auto_lines() + else: + fn = args[0] + if not os.path.isfile(fn): + sys.stderr.write('not a file: %r\n' % fn) + return 2 + liner = _file_lines(fn) + bytes_written = 0 + pattern = options.interest + if pattern: + pattern = re.compile(pattern) + nlines = 0 + for line in liner: + sys.stdout.write(line) + sys.stdout.flush() + nlines += 1 + bytes_written += len(line) + if pattern and pattern.search(line): + s = ': %r' % line + level = logging.INFO + else: + s = '' + level = logging.DEBUG + logger.log(level, 'Wrote out %d bytes%s', bytes_written, s) + if options.count and nlines >= options.count: + break + if options.delay: + time.sleep(options.delay) + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO, filename='lister.log', + filemode='w', format='%(asctime)s %(levelname)s %(message)s') + try: + rc = main() + except Exception as e: + print(e) + rc = 9 + sys.exit(rc) 
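Note on the sarge/__init__.py changes that follow: the diff adds a ``replace_env`` keyword to ``Command``. By default ``env`` is still merged into ``os.environ``; when ``replace_env`` is truthy, ``env`` is used in its place. A minimal standalone sketch of that decision is given here. The helper name ``build_env`` and its ``base`` parameter are assumptions made for illustration only and are not part of sarge; ``base`` stands in for ``os.environ`` so the result does not depend on the caller's environment.

```python
import os


def build_env(extra, replace_env=False, base=None):
    """Hypothetical helper mirroring the env handling shown in the diff.

    `base` is an assumption of this sketch: it stands in for os.environ.
    """
    if base is None:
        base = os.environ
    if not extra:
        # No env supplied: the child simply inherits the base environment.
        return dict(base)
    if replace_env:
        # replace_env truthy: env is used in place of the base environment.
        return dict(extra)
    # Default: env values are added on top of the base environment.
    merged = dict(base)
    merged.update(extra)
    return merged


base = {'PATH': '/usr/bin', 'HOME': '/home/user'}
merged = build_env({'FOO': 'BAR'}, base=base)
replaced = build_env({'FOO': 'BAR'}, replace_env=True, base=base)
print(sorted(merged))    # ['FOO', 'HOME', 'PATH']
print(sorted(replaced))  # ['FOO']
```

With the real library, the equivalent call would presumably be ``Command('echo foo', env={'FOO': 'BAR'}, replace_env=True)``, as exercised by ``test_env`` further down in this diff.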
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/sarge-0.1.5.post0/sarge/__init__.py new/sarge-0.1.7.post1/sarge/__init__.py --- old/sarge-0.1.5.post0/sarge/__init__.py 2018-06-19 13:17:46.000000000 +0200 +++ new/sarge-0.1.7.post1/sarge/__init__.py 2021-12-11 23:40:20.000000000 +0100 @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright (C) 2012-2014 Vinay M. Sajip. See LICENSE for licensing information. +# Copyright (C) 2012-2021 Vinay M. Sajip. See LICENSE for licensing information. # # sarge: Subprocess Allegedly Rewards Good Encapsulation :-) # @@ -37,7 +37,8 @@ 'shell_format', 'run', 'parse_command_line', 'capture_stdout', 'capture_stderr', 'capture_both') -__version__ = '0.1.5.post0' +__version__ = '0.1.7.post1' +__date__ = '2021-12-11' logger = logging.getLogger(__name__) logger.addHandler(NullHandler()) @@ -54,12 +55,14 @@ text_type = unicode binary_type = str string_types = basestring, + _wait_has_timeout = False else: # pragma: no cover PY3 = True text_type = str binary_type = bytes string_types = str, basestring = str + _wait_has_timeout = sys.version_info[:2] >= (3, 3) # This regex determines which shell input needs quoting # because it may be unsafe @@ -587,11 +590,18 @@ :class:`subprocess.Popen'. :param kwargs: The same as you would pass to :class:`subprocess.Popen'. However, the ``env`` parameter is handled differently: it - is treated as a set of *additional* environment variables to - be added to the values in ``os.environ``. + is treated as a set of *additional* environment variables + to be added to the values in ``os.environ``, unless the + ``replace_env`` keyword argument is present and truthy, in + which case the env value is used *in place of* + ``os.environ``. + + .. versionadded:: 0.1.6 + The ``replace_env`` keyword argument was added. 
""" def __init__(self, args, **kwargs): + replace_env = kwargs.pop('replace_env', False) shell = kwargs.get('shell') if not shell and isinstance(args, string_types): args = list(shell_shlex(args, control='();>|&')) @@ -609,8 +619,11 @@ # special handling: env is added to os.environ e = kwargs.get('env') if e: - env = dict(os.environ) - env.update(e) + if replace_env: + env = e + else: + env = dict(os.environ) + env.update(e) kwargs['env'] = env self.process_ready = threading.Event() self.process = None @@ -653,10 +666,9 @@ logger.debug('About to call Popen: %s, %s', self.args, self.kwargs) try: self.process = p = Popen(self.args, **self.kwargs) - except OSError as e: - if e.errno == errno.ENOENT: + except (OSError, Exception) as e: #pragma: no cover + if isinstance(e, OSError) and e.errno == errno.ENOENT: raise ValueError('Command not found: %s' % self.args[0]) - except Exception as e: #pragma: no cover logger.exception('Popen call failed: %s: %s', type(e), e) raise self.stdin = p.stdin @@ -686,9 +698,10 @@ logger.debug('returning %s (%s)', self, self.process) return self - def wait(self): + def wait(self, timeout=None): """ - Wait for a command's underlying sub-process to complete. + Wait for a command's underlying sub-process to complete. The timeout + parameter only applies for Python >= 3.3 and has no effect otherwise. """ self.process_ready.wait() p = self.process @@ -696,7 +709,10 @@ logger.warning('No process found for %s', self) result = None else: - result = p.wait() + if _wait_has_timeout: + result = p.wait(timeout) + else: + result = p.wait() return result def terminate(self): @@ -1013,6 +1029,7 @@ self.stdout = kwargs.pop('stdout', None) self.stderr = kwargs.pop('stderr', None) self.lock = threading.RLock() + self.commands = [] def find_last_command(self, node): """ @@ -1113,15 +1130,17 @@ for e in self.events: e.wait() - def wait(self): + def wait(self, timeout=None): """ - Wait for all the commands in the pipeline to complete. 
+ Wait for all the commands in the pipeline to complete. The timeout + parameter only applies for Python >= 3.3 and has no effect otherwise. It will + be applied to each command in turn, so the effect could be cumulative. """ logger.debug('pipeline waiting') self.wait_events() for cmd in self.commands: logger.debug('waiting for command %s', cmd) - cmd.wait() + cmd.wait(timeout) def close(self): """ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/sarge-0.1.5.post0/sarge/shlext.py new/sarge-0.1.7.post1/sarge/shlext.py --- old/sarge-0.1.5.post0/sarge/shlext.py 2016-09-08 02:48:52.000000000 +0200 +++ new/sarge-0.1.7.post1/sarge/shlext.py 2019-10-06 09:18:49.000000000 +0200 @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright (C) 2012-2014 Vinay M. Sajip. See LICENSE for licensing information. +# Copyright (C) 2012-2019 Vinay M. Sajip. See LICENSE for licensing information. # # Enhancements in shlex to tokenize closer to the way real shells do # diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/sarge-0.1.5.post0/sarge/utils.py new/sarge-0.1.7.post1/sarge/utils.py --- old/sarge-0.1.5.post0/sarge/utils.py 2018-06-18 12:17:46.000000000 +0200 +++ new/sarge-0.1.7.post1/sarge/utils.py 2019-10-06 09:18:54.000000000 +0200 @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright (C) 2012-2014 Vinay M. Sajip. See LICENSE for licensing information. +# Copyright (C) 2012-2019 Vinay M. Sajip. See LICENSE for licensing information. # # sarge: Subprocess Allegedly Rewards Good Encapsulation :-) # diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/sarge-0.1.5.post0/setup.py new/sarge-0.1.7.post1/setup.py --- old/sarge-0.1.5.post0/setup.py 2018-06-18 11:18:56.000000000 +0200 +++ new/sarge-0.1.7.post1/setup.py 2021-12-11 23:38:30.000000000 +0100 @@ -1,15 +1,17 @@ # -*- coding: utf-8 -*- # -# Copyright (C) 2012-2014 Vinay M. Sajip. 
See LICENSE for licensing information. +# Copyright (C) 2012-2021 Vinay M. Sajip. See LICENSE for licensing information. # # sarge: Subprocess Allegedly Rewards Good Encapsulation :-) # from distutils.core import setup, Command +import os from os.path import join, dirname, abspath import re import sarge + class TestCommand(Command): user_options = [] @@ -50,13 +52,13 @@ maintainer='Vinay Sajip', maintainer_email='[email protected]', url='http://sarge.readthedocs.org/', - download_url=('http://pypi.python.org/packages/source/s/sarge/' - 'sarge-%s.tar.gz' % sarge.__version__), + download_url=('https://github.com/vsajip/sarge/releases/download/' + sarge.__version__ + + '/sarge-%s.tar.gz' % sarge.__version__), packages=['sarge'], keywords=['subprocess', 'wrapper', 'external', 'command'], platforms=['Any'], classifiers=[ - 'Development Status :: 4 - Beta', + 'Development Status :: 5 - Production/Stable', 'Environment :: Console', 'Environment :: MacOS X', 'Environment :: Win32 (MS Windows)', @@ -71,11 +73,10 @@ 'Programming Language :: Python :: 2', 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.3', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: Implementation', 'Topic :: Software Development :: Libraries', 'Topic :: Software Development :: Libraries :: Python Modules', diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/sarge-0.1.5.post0/stack_tracer.py new/sarge-0.1.7.post1/stack_tracer.py --- old/sarge-0.1.5.post0/stack_tracer.py 1970-01-01 01:00:00.000000000 +0100 +++ new/sarge-0.1.7.post1/stack_tracer.py 2019-04-09 09:04:12.000000000 +0200 @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +""" +Adapted from 
http://code.activestate.com/recipes/577334/ + +by László Nagy, released under the MIT license. +""" + +import os +import sys +import threading +import time +import traceback + +def _get_stack_traces(): + code = [] + threads = dict((t.ident, t.name) for t in threading.enumerate()) + for threadId, stack in sys._current_frames().items(): + if threadId == threading.current_thread().ident: + continue + threadName = threads.get(threadId, 'Unknown') + code.append('\n# Thread: %s (%s)' % (threadId, threadName)) + for filename, lineno, name, line in traceback.extract_stack(stack): + code.append('File: %r, line %d, in %s' % (filename, lineno, name)) + if line: + code.append(' %s' % (line.strip())) + + return '\n'.join(code) + + +class TraceDumper(threading.Thread): + """Dump stack traces into a given file periodically.""" + + def __init__(self, path, interval): + """ + @param path: File path to output stack trace info. + @param interval: in seconds - how often to update the trace file. + """ + assert(interval > 0.1) + self.interval = interval + self.path = os.path.abspath(path) + self.stop_requested = threading.Event() + threading.Thread.__init__(self) + + def run(self): + while not self.stop_requested.isSet(): + time.sleep(self.interval) + self.write_stack_traces() + + def stop(self): + self.stop_requested.set() + self.join() + + def write_stack_traces(self): + with open(self.path, 'w') as out: + out.write(_get_stack_traces()) + + +_tracer = None + +def start_trace(path, interval=5): + """Start tracing into the given file.""" + global _tracer + if _tracer is None: + _tracer = TraceDumper(path, interval) + _tracer.daemon = True + _tracer.start() + else: + raise Exception('Already tracing to %s' % _tracer.path) + + +def stop_trace(): + """Stop tracing.""" + global _tracer + if _tracer is not None: + _tracer.stop() + _tracer = None diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/sarge-0.1.5.post0/test_sarge.py 
new/sarge-0.1.7.post1/test_sarge.py --- old/sarge-0.1.5.post0/test_sarge.py 1970-01-01 01:00:00.000000000 +0100 +++ new/sarge-0.1.7.post1/test_sarge.py 2021-12-10 20:59:34.000000000 +0100 @@ -0,0 +1,788 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2012-2021 Vinay M. Sajip. See LICENSE for licensing information. +# +# Test harness for sarge: Subprocess Allegedly Rewards Good Encapsulation :-) +# + +from __future__ import unicode_literals + +from io import TextIOWrapper +import logging +import os +import re +import shutil +import subprocess +import sys +import tempfile +import time +import unittest + +from sarge import (shell_quote, Capture, Command, CommandLineParser, Pipeline, + shell_format, run, parse_command_line, capture_stdout, + get_stdout, capture_stderr, get_stderr, capture_both, + get_both, Popen, Feeder) +from sarge.shlext import shell_shlex +from stack_tracer import start_trace, stop_trace + +if sys.platform == 'win32': #pragma: no cover + from sarge.utils import find_command + +TRACE_THREADS = sys.platform not in ('cli',) # debugging only + +PY3 = sys.version_info[0] >= 3 + +logger = logging.getLogger(__name__) + +EMITTER = '''#!/usr/bin/env python +import sys + +sys.stdout.write('foo\\n') +sys.stderr.write('bar\\n') +''' + +SEP = '=' * 60 + +def missing_files(): + result = [] # on POSIX, nothing missing + if os.name == 'nt': #pragma: no cover + + def found_file(fn): + if os.path.exists(fn): + return True + for d in os.environ['PATH'].split(os.pathsep): + p = os.path.join(d, fn) + if os.path.exists(p): + return True + return False + + files = ('cat.exe', 'echo.exe', 'tee.exe', 'false.exe', 'true.exe', + 'sleep.exe', 'touch.exe') + + # Looking for the DLLs used by the above - perhaps this check isn't + # needed, as if the .exes were installed properly, we should be OK. The + # DLL check is relevant for GnuWin32 but may not be for MSYS, MSYS2 etc. 
+ if not os.environ.get('USE_MSYS', ''): + files = ('libiconv2.dll', 'libintl3.dll') + files + + path_dirs = os.environ['PATH'].split(os.pathsep) + + for fn in files: + if os.path.exists(fn): + found = True # absolute, or in current directory + else: + found = False + for d in path_dirs: + p = os.path.join(d, fn) + if os.path.exists(p): + found = True + break + if not found: + result.append(fn) + + return result + +ERROR_MESSAGE = ''' +Can't find one or more of the files needed for testing: + +%s + +You may need to install the GnuWin32 coreutils package, MSYS, or an equivalent. +'''.strip() + +missing = missing_files() +if missing: + missing = ', '.join(missing) + print(ERROR_MESSAGE % missing) + sys.exit(1) +del missing + +class SargeTest(unittest.TestCase): + def setUp(self): + logger.debug(SEP) + logger.debug(self.id().rsplit('.', 1)[-1]) + logger.debug(SEP) + + def test_quote(self): + self.assertEqual(shell_quote(''), "''") + self.assertEqual(shell_quote('a'), 'a') + self.assertEqual(shell_quote('*'), "'*'") + self.assertEqual(shell_quote('foo'), 'foo') + self.assertEqual(shell_quote("'*.py'"), "''\\''*.py'\\'''") + self.assertEqual(shell_quote("'a'; rm -f b; true 'c'"), + "''\\''a'\\''; rm -f b; true '\\''c'\\'''") + self.assertEqual(shell_quote("*.py"), "'*.py'") + self.assertEqual(shell_quote("'*.py"), "''\\''*.py'") + + def test_quote_with_shell(self): + from subprocess import PIPE, Popen + + if os.name != 'posix': #pragma: no cover + raise unittest.SkipTest('This test works only on POSIX') + + workdir = tempfile.mkdtemp() + try: + s = "'\\\"; touch %s/foo #'" % workdir + cmd = 'echo %s' % shell_quote(s) + p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE) + p.communicate() + self.assertEqual(p.returncode, 0) + files = os.listdir(workdir) + self.assertEqual(files, []) + fn = "'ab?'" + cmd = 'touch %s/%s' % (workdir, shell_quote(fn)) + p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE) + p.communicate() + self.assertEqual(p.returncode, 0) + files = 
os.listdir(workdir) + self.assertEqual(files, ["'ab?'"]) + finally: + shutil.rmtree(workdir) + + def test_formatter(self): + self.assertEqual(shell_format('ls {0}', '*.py'), "ls '*.py'") + self.assertEqual(shell_format('ls {0!s}', '*.py'), "ls *.py") + + def send_to_capture(self, c, s): + rd, wr = os.pipe() + c.add_stream(os.fdopen(rd, 'rb')) + os.write(wr, s) + os.close(wr) + + def test_capture(self): + logger.debug('test_capture started') + with Capture() as c: + self.send_to_capture(c, b'foofoo') + self.assertEqual(c.read(3), b'foo') + self.assertEqual(c.read(3), b'foo') + self.assertEqual(c.read(), b'') + logger.debug('test_capture finished') + + def test_command_splitting(self): + logger.debug('test_command started') + cmd = 'echo foo' + c = Command(cmd) + self.assertEqual(c.args, cmd.split()) + c = Command(cmd, shell=True) + self.assertEqual(c.args, cmd) + + def test_command_no_stdin(self): + self.assertRaises(ValueError, Command, 'cat', stdin='xyz') + + def test_literal_input(self): + with Capture() as out: + self.assertEqual(run('cat', stdout=out, input='foo').returncode, 0) + self.assertEqual(out.read(), b'foo') + + def test_read_extra(self): + with Capture() as out: + self.assertEqual(run('cat', stdout=out, input='bar').returncode, 0) + self.assertEqual(out.read(5), b'bar') + + def test_shell_redirection(self): + with Capture() as err: + self.assertEqual(run('cat >&2', stderr=err, shell=True, + input='bar').returncode, 0) + self.assertEqual(err.read(), b'bar') + + def test_capture_bytes(self): + with Capture() as err: + self.assertEqual(run('cat >&2', stderr=err, shell=True, + input='bar').returncode, 0) + self.assertEqual(err.bytes, b'bar') + with Capture() as err: + self.assertEqual(run('cat >&2', stderr=err, shell=True, + input='bar').returncode, 0) + self.assertEqual(err.text, 'bar') + + def ensure_testfile(self): + if not os.path.exists('testfile.txt'): #pragma: no cover + with open('testfile.txt', 'w') as f: + for i in range(10000): + f.write('Line 
%d\n' % (i + 1)) + + def test_run_sync(self): + self.ensure_testfile() + with open('testfile.txt') as f: + content = f.readlines() + with Capture() as out: + self.assertEqual( + run('cat testfile.txt testfile.txt', stdout=out).returncode, 0) + lines = out.readlines() + self.assertEqual(len(lines), len(content) * 2) + # run with a list (see Issue #3) + with Capture() as out: + self.assertEqual( + run(['cat', 'testfile.txt', 'testfile.txt'], + stdout=out).returncode, 0) + lines = out.readlines() + self.assertEqual(len(lines), len(content) * 2) + + def test_run_async(self): + self.ensure_testfile() + with open('testfile.txt', 'rb') as f: + content = f.read().splitlines(True) + with Capture(timeout=1) as out: + p = run('cat testfile.txt testfile.txt', stdout=out, + async_=True) + # Do some other work in parallel, including reading from the + # concurrently running child process + read_count = 0 + if out.readline(): + read_count += 1 + if out.readline(): + read_count += 1 + # kill some time ... 
+ for i in range(10): + with open('testfile.txt') as f: + f.read() + p.wait() + self.assertEqual(p.returncode, 0) + lines = out.readlines() + self.assertEqual(len(lines), len(content) * 2 - read_count) + + def test_env(self): + e = os.environ + if PY3: + env = {'FOO': 'BAR'} + else: + # Python 2.x wants native strings, at least on Windows + # (and literals are Unicode in this module) + env = { b'FOO': b'BAR' } + c = Command('echo foo', env=env) + d = c.kwargs['env'] + ek = set(e) + dk = set(d) + ek.add('FOO') + self.assertEqual(dk, ek) + self.assertEqual(d['FOO'], 'BAR') + c = Command('echo foo', env=env, replace_env=True) + ek = set(env) + dk = set(c.kwargs['env']) + self.assertEqual(dk, ek) + self.assertEqual(dk, {'FOO'}) + + def test_env_usage(self): + if os.name == 'nt': + cmd = 'echo %FOO%' + else: + cmd = 'echo $FOO' + if PY3: + env = {'FOO': 'BAR'} + else: + # Python 2.x wants native strings, at least on Windows + # (and literals are Unicode in this module) + env = { b'FOO': b'BAR' } + c = Command(cmd, env=env, stdout=Capture(), shell=True) + c.run() + self.assertEqual(c.stdout.text.strip(), 'BAR') + + def test_shlex(self): + TESTS = ( + ('', + []), + ('a', + [('a', 'a')]), + ('a && b\n', + [('a', 'a'), ('&&', 'c'), ('b', 'a')]), + ('a | b; c>/fred/jim-sheila.txt|&d;e&', + [('a', 'a'), ('|', 'c'), ('b', 'a'), (';', 'c'), ('c', 'a'), + ('>', 'c'), ('/fred/jim-sheila.txt', 'a'), ('|&', 'c'), + ('d', 'a'), + (';', 'c'), ('e', 'a'), ('&', 'c')]) + ) + for posix in False, True: + for s, expected in TESTS: + s = shell_shlex(s, posix=posix, control=True) + actual = [] + while True: + t, tt = s.get_token(), s.token_type + if not t: + break + actual.append((t, tt)) + self.assertEqual(actual, expected) + + def test_shlex_without_control(self): + TESTS = ( + ('', + []), + ('a', + [('a', 'a')]), + ('a && b\n', + [('a', 'a'), ('&', 'a'), ('&', 'a'), ('b', 'a')]), + ('a | b; c>/fred/jim-sheila.txt|&d;e&', + [('a', 'a'), ('|', 'a'), ('b', 'a'), (';', 'a'), ('c', 'a'), + 
('>', 'a'), ('/fred/jim-sheila.txt', 'a'), ('|', 'a'), + ('&', 'a'), + ('d', 'a'), (';', 'a'), ('e', 'a'), ('&', 'a')]) + ) + for posix in False, True: + for s, expected in TESTS: + s = shell_shlex(s, posix=posix) + actual = [] + while True: + t, tt = s.get_token(), s.token_type + if not t: + break + actual.append((t, tt)) + self.assertEqual(actual, expected) + + def test_shlex_with_quoting(self): + TESTS = ( + ('"a b"', False, [('"a b"', '"')]), + ('"a b"', True, [('a b', 'a')]), + ('"a b" c# comment', False, [('"a b"', '"'), ('c', 'a')]), + ('"a b" c# comment', True, [('a b', 'a'), ('c', 'a')]), + ) + for s, posix, expected in TESTS: + s = shell_shlex(s, posix=posix) + actual = [] + while True: + t, tt = s.get_token(), s.token_type + if not t: + break + actual.append((t, tt)) + self.assertEqual(actual, expected) + s = shell_shlex('"abc') + self.assertRaises(ValueError, s.get_token) + + def test_shlex_with_misc_chars(self): + TESTS = ( + ('rsync [email protected]:path dest', + ('rsync', '[email protected]:path', 'dest')), + (r'c:\Python26\Python lister.py -d 0.01', + (r'c:\Python26\Python', 'lister.py', '-d', '0.01')), + ) + for s, t in TESTS: + sh = shell_shlex(s) + self.assertEqual(tuple(sh), t) + + def test_shlex_issue_31(self): + cmd = "python -c 'print('\''ok'\'')'" + list(shell_shlex(cmd, control='();>|&', posix=True)) + shell_format("python -c {0}", "print('ok')") + list(shell_shlex(cmd, control='();>|&', posix=True)) + + def test_shlex_issue_34(self): + cmd = "ls foo,bar" + actual = list(shell_shlex(cmd)) + self.assertEqual(actual, ['ls', 'foo,bar']) + + def test_parsing(self): + parse_command_line('abc') + parse_command_line('abc " " # comment') + parse_command_line('abc \ "def"') + parse_command_line('(abc)') + self.assertRaises(ValueError, parse_command_line, '(abc') + self.assertRaises(ValueError, parse_command_line, '&&') + parse_command_line('(abc>def)') + parse_command_line('(abc 2>&1; def >>&2)') + parse_command_line('(a|b;c d && e || f >ghi jkl 2> 
mno)') + parse_command_line('(abc; (def)); ghi & ((((jkl & mno)))); pqr') + c = parse_command_line('git rev-list origin/master --since="1 hours ago"', posix=True) + self.assertEqual(c.command, ['git', 'rev-list', 'origin/master', + '--since=1 hours ago']) + + def test_parsing_special(self): + for cmd in ('ls -l --color=auto', 'sleep 0.5', 'ls /tmp/abc.def', + 'ls *.py?', r'c:\Python26\Python lister.py -d 0.01'): + node = parse_command_line(cmd, posix=False) + if sys.platform != 'win32': + self.assertEqual(node.command, cmd.split()) + else: + split = cmd.split()[1:] + self.assertEqual(node.command[1:], split) + + def test_parsing_controls(self): + clp = CommandLineParser() + gvc = clp.get_valid_controls + self.assertEqual(gvc('>>>>'), ['>>', '>>']) + self.assertEqual(gvc('>>'), ['>>']) + self.assertEqual(gvc('>>>'), ['>>', '>']) + self.assertEqual(gvc('>>>>>'), ['>>', '>>', '>']) + self.assertEqual(gvc('))))'), [')', ')', ')', ')']) + self.assertEqual(gvc('>>;>>'), ['>>', ';', '>>']) + self.assertEqual(gvc(';'), [';']) + self.assertEqual(gvc(';;'), [';', ';']) + self.assertEqual(gvc(');'), [')', ';']) + self.assertEqual(gvc('>&'), ['>', '&']) + self.assertEqual(gvc('>>&'), ['>>', '&']) + self.assertEqual(gvc('||&'), ['||', '&']) + self.assertEqual(gvc('|&'), ['|&']) + + #def test_scratch(self): + # import pdb; pdb.set_trace() + # parse_command_line('(a|b;c d && e || f >ghi jkl 2> mno)') + + def test_parsing_errors(self): + self.assertRaises(ValueError, parse_command_line, '(abc') + self.assertRaises(ValueError, parse_command_line, '(abc |&| def') + self.assertRaises(ValueError, parse_command_line, '&&') + self.assertRaises(ValueError, parse_command_line, 'abc>') + self.assertRaises(ValueError, parse_command_line, 'a 3> b') + self.assertRaises(ValueError, parse_command_line, 'abc >&x') + self.assertRaises(ValueError, parse_command_line, 'a > b | c') + self.assertRaises(ValueError, parse_command_line, 'a 2> b |& c') + self.assertRaises(ValueError, parse_command_line, 
'a > b > c')
+        self.assertRaises(ValueError, parse_command_line, 'a > b >> c')
+        self.assertRaises(ValueError, parse_command_line, 'a 2> b 2> c')
+        self.assertRaises(ValueError, parse_command_line, 'a 2>> b 2>> c')
+        self.assertRaises(ValueError, parse_command_line, 'a 3> b')
+
+    def test_pipeline_no_input_stdout(self):
+        with Capture() as out:
+            with Pipeline('echo foo 2> %s | cat | cat' % os.devnull,
+                          stdout=out) as pl:
+                pl.run()
+                self.assertEqual(out.read().strip(), b'foo')
+
+    def test_pipeline_no_input_stderr(self):
+        if os.name != 'posix':
+            raise unittest.SkipTest('This test works only on POSIX')
+        with Capture() as err:
+            with Pipeline('echo foo 2> %s | cat | cat >&2' % os.devnull,
+                          stderr=err) as pl:
+                pl.run()
+                self.assertEqual(err.read().strip(), b'foo')
+
+    def test_pipeline_no_input_pipe_stderr(self):
+        if os.name != 'posix':
+            raise unittest.SkipTest('This test works only on POSIX')
+        with Capture() as err:
+            with Pipeline('echo foo 2> %s | cat >&2 |& cat >&2' %
+                          os.devnull, stderr=err) as pl:
+                pl.run()
+                self.assertEqual(err.read().strip(), b'foo')
+
+    def test_pipeline_with_input_stdout(self):
+        logger.debug('starting')
+        with Capture() as out:
+            with Pipeline('cat 2>> %s | cat | cat' % os.devnull,
+                          stdout=out) as pl:
+                pl.run(input='foo' * 1000)
+                self.assertEqual(out.read().strip(), b'foo' * 1000)
+
+    def test_pipeline_no_input_redirect_stderr(self):
+        if os.name != 'posix':
+            raise unittest.SkipTest('This test works only on POSIX')
+        with Capture() as err:
+            with Pipeline('echo foo 2> %s | cat 2>&1 | cat >&2' % os.devnull,
+                          stderr=err) as pl:
+                pl.run()
+                self.assertEqual(err.read().strip(), b'foo')
+
+    def test_pipeline_swap_outputs(self):
+        for fn in ('stdout.log', 'stderr.log'):
+            if os.path.exists(fn):
+                os.unlink(fn)
+        with Pipeline('echo foo | tee stdout.log 3>&1 1>&2 2>&3 | '
+                      'tee stderr.log > %s' % os.devnull) as pl:
+            pl.run()
+            with open('stdout.log') as f:
+                self.assertEqual(f.read().strip(), 'foo')
+            with open('stderr.log')
as f:
+                self.assertEqual(f.read().strip(), 'foo')
+        for fn in ('stdout.log', 'stderr.log'):
+            os.unlink(fn)
+
+    def test_pipeline_large_file(self):
+        if os.path.exists('dest.bin'): #pragma: no cover
+            os.unlink('dest.bin')
+        if not os.path.exists('random.bin'): #pragma: no cover
+            with open('random.bin', 'wb') as f:
+                f.write(os.urandom(20 * 1048576))
+        with Pipeline('cat random.bin | cat | cat | cat | cat | '
+                      'cat > dest.bin ') as pl:
+            pl.run()
+        with open('random.bin', 'rb') as f:
+            data1 = f.read()
+        with open('dest.bin', 'rb') as f:
+            data2 = f.read()
+        os.unlink('dest.bin')
+        self.assertEqual(data1, data2)
+
+    def test_logical_and(self):
+        with Capture() as out:
+            with Pipeline('false && echo foo', stdout=out) as pl:
+                pl.run()
+                self.assertEqual(out.read().strip(), b'')
+        with Capture() as out:
+            with Pipeline('true && echo foo', stdout=out) as pl:
+                pl.run()
+                self.assertEqual(out.read().strip(), b'foo')
+        with Capture() as out:
+            with Pipeline('false | cat && echo foo', stdout=out) as pl:
+                pl.run()
+                self.assertEqual(out.read().strip(), b'foo')
+
+    def test_logical_or(self):
+        with Capture() as out:
+            with Pipeline('false || echo foo', stdout=out) as pl:
+                pl.run()
+                self.assertEqual(out.read().strip(), b'foo')
+        with Capture() as out:
+            with Pipeline('true || echo foo', stdout=out) as pl:
+                pl.run()
+                self.assertEqual(out.read().strip(), b'')
+
+    def test_list(self):
+        with Capture() as out:
+            with Pipeline('echo foo > %s; echo bar' % os.devnull,
+                          stdout=out) as pl:
+                pl.run()
+                self.assertEqual(out.read().strip(), b'bar')
+
+    def test_list_merge(self):
+        with Capture() as out:
+            with Pipeline('echo foo; echo bar; echo baz', stdout=out) as pl:
+                pl.run()
+                self.assertEqual(out.read().split(), [b'foo', b'bar', b'baz'])
+
+    def test_capture_when_other_piped(self):
+        with Capture() as out:
+            with Pipeline('echo foo; echo bar |& cat', stdout=out) as pl:
+                pl.run()
+                self.assertEqual(out.read().split(), [b'foo', b'bar'])
+
+    def test_pipeline_func(self):
+
self.assertEqual(run('false').returncode, 1)
+        with Capture() as out:
+            self.assertEqual(run('echo foo', stdout=out).returncode, 0)
+            self.assertEqual(out.bytes.strip(), b'foo')
+
+    def test_double_redirect(self):
+        with Capture() as out:
+            self.assertRaises(ValueError, run, 'echo foo > %s' % os.devnull,
+                              stdout=out)
+        with Capture() as out:
+            with Capture() as err:
+                self.assertRaises(ValueError, run,
+                                  'echo foo 2> %s' % os.devnull, stdout=out,
+                                  stderr=err)
+
+    def test_pipeline_async(self):
+        logger.debug('starting')
+        with Capture() as out:
+            p = run('echo foo & (sleep 2; echo bar) & (sleep 1; echo baz)',
+                    stdout=out)
+            self.assertEqual(p.returncode, 0)
+            items = out.bytes.split()
+            for item in (b'foo', b'bar', b'baz'):
+                self.assertTrue(item in items)
+            self.assertTrue(items.index(b'bar') > items.index(b'baz'))
+
+    def ensure_emitter(self):
+        if not os.path.exists('emitter.py'): #pragma: no cover
+            with open('emitter.py', 'w') as f:
+                f.write(EMITTER)
+
+    def test_capture_stdout(self):
+        p = capture_stdout('echo foo')
+        self.assertEqual(p.stdout.text.strip(), 'foo')
+
+    def test_get_stdout(self):
+        s = get_stdout('echo foo; echo bar')
+        self.assertEqual(s.split(), ['foo', 'bar'])
+
+    def test_capture_stderr(self):
+        self.ensure_emitter()
+        p = capture_stderr('"%s" emitter.py > %s' % (sys.executable,
+                                                     os.devnull))
+        self.assertEqual(p.stderr.text.strip(), 'bar')
+
+    def test_get_stderr(self):
+        self.ensure_emitter()
+        s = get_stderr('"%s" emitter.py > %s' % (sys.executable, os.devnull))
+        self.assertEqual(s.strip(), 'bar')
+
+    def test_get_both(self):
+        self.ensure_emitter()
+        t = get_both('"%s" emitter.py' % sys.executable)
+        self.assertEqual([s.strip() for s in t], ['foo', 'bar'])
+
+    def test_capture_both(self):
+        self.ensure_emitter()
+        p = capture_both('"%s" emitter.py' % sys.executable)
+        self.assertEqual(p.stdout.text.strip(), 'foo')
+        self.assertEqual(p.stderr.text.strip(), 'bar')
+
+    def test_byte_iterator(self):
+        p = capture_stdout('echo foo;
echo bar')
+        lines = []
+        for line in p.stdout:
+            lines.append(line.strip())
+        self.assertEqual(lines, [b'foo', b'bar'])
+
+    def test_text_iterator(self):
+        p = capture_stdout('echo foo; echo bar')
+        lines = []
+        for line in TextIOWrapper(p.stdout):
+            lines.append(line)
+        self.assertEqual(lines, ['foo\n', 'bar\n'])
+
+    def test_partial_line(self):
+        p = capture_stdout('echo foobarbaz')
+        lines = [p.stdout.readline(6), p.stdout.readline().strip()]
+        self.assertEqual(lines, [b'foobar', b'baz'])
+
+    def test_returncodes(self):
+        p = capture_stdout('echo foo; echo bar; echo baz; false')
+        self.assertEqual(p.returncodes, [0, 0, 0, 1])
+        self.assertEqual(p.returncode, 1)
+
+    def test_processes(self):
+        p = capture_stdout('echo foo; echo bar; echo baz; false')
+        plist = p.processes
+        for p in plist:
+            self.assertTrue(isinstance(p, Popen))
+
+    def test_command_run(self):
+        c = Command('echo foo'.split(), stdout=Capture())
+        c.run()
+        self.assertEqual(c.returncode, 0)
+
+    def test_command_nonexistent(self):
+        c = Command('nonesuch foo'.split(), stdout=Capture())
+        if PY3:
+            ARR = self.assertRaisesRegex
+        else:
+            ARR = self.assertRaisesRegexp
+        ARR(ValueError, 'Command not found: nonesuch', c.run)
+
+    def test_pipeline_nonexistent(self):
+        p = Pipeline('nonesuch foo'.split(), stdout=Capture())
+        self.assertEqual(p.commands, [])
+        self.assertEqual(p.returncodes, [])
+        self.assertEqual(p.processes, [])
+        if PY3:
+            ARR = self.assertRaisesRegex
+        else:
+            ARR = self.assertRaisesRegexp
+        ARR(ValueError, 'Command not found: nonesuch', p.run)
+
+    def test_working_dir(self):
+        d = tempfile.mkdtemp()
+        try:
+            run('touch newfile.txt', cwd=d)
+            files = os.listdir(d)
+            self.assertEqual(files, ['newfile.txt'])
+        finally:
+            shutil.rmtree(d)
+
+    def test_expect(self):
+        cap = Capture(buffer_size=-1) # line buffered
+        p = run('%s lister.py -d 0.01' % sys.executable,
+                async_=True, stdout=cap)
+        timeout = 1.0
+        m1 = cap.expect('^line 1\r?$', timeout)
+        self.assertTrue(m1)
+        m2 =
cap.expect('^line 5\r?$', timeout)
+        self.assertTrue(m2)
+        m3 = cap.expect('^line 1.*\r?$', timeout)
+        self.assertTrue(m3)
+        cap.close(True)
+        p.commands[0].kill()
+        data = cap.bytes
+        self.assertEqual(data[m1.start():m1.end()].rstrip(), b'line 1')
+        self.assertEqual(data[m2.start():m2.end()].rstrip(), b'line 5')
+        self.assertEqual(data[m3.start():m3.end()].rstrip(), b'line 10')
+
+    def test_redirection_with_whitespace(self):
+        node = parse_command_line('a 2 > b')
+        self.assertEqual(node.command, ['a', '2'])
+        self.assertEqual(node.redirects, {1: ('>', 'b')})
+        node = parse_command_line('a 2> b')
+        self.assertEqual(node.command, ['a'])
+        self.assertEqual(node.redirects, {2: ('>', 'b')})
+        node = parse_command_line('a 2 >> b')
+        self.assertEqual(node.command, ['a', '2'])
+        self.assertEqual(node.redirects, {1: ('>>', 'b')})
+        node = parse_command_line('a 2>> b')
+        self.assertEqual(node.command, ['a'])
+        self.assertEqual(node.redirects, {2: ('>>', 'b')})
+
+    def test_redirection_with_cwd(self):
+        workdir = tempfile.mkdtemp()
+        try:
+            run('echo hello > world', cwd=workdir)
+            p = os.path.join(workdir, 'world')
+            self.assertTrue(os.path.exists(p))
+            with open(p) as f:
+                self.assertEqual(f.read().strip(), 'hello')
+        finally:
+            shutil.rmtree(workdir)
+
+    if sys.platform == 'win32': #pragma: no cover
+        pyrunner_re = re.compile(r'.*py.*\.exe', re.I)
+        pywrunner_re = re.compile(r'.*py.*w\.exe', re.I)
+
+        def test_find_command(self):
+            cmd = find_command('dummy.py')
+            self.assertTrue(cmd is None or pyrunner_re.match(cmd))
+            cmd = find_command('dummy.pyw')
+            self.assertTrue(cmd is None or pywrunner_re.match(cmd))
+
+        def test_run_found_command(self):
+            with open('hello.py', 'w') as f:
+                f.write('print("Hello, world!")')
+            cmd = find_command('hello')
+            if not cmd:
+                raise unittest.SkipTest('.py not in PATHEXT or not registered')
+            p = capture_stdout('hello')
+            self.assertEqual(p.stdout.text.rstrip(), 'Hello, world!')
+
+    def test_feeder(self):
+        feeder = Feeder()
+        p =
capture_stdout([sys.executable, 'echoer.py'], input=feeder,
+                           async_=True)
+        try:
+            lines = ('hello', 'goodbye')
+            gen = iter(lines)
+            # p.commands may not be set yet (separate thread)
+            while not p.commands or p.commands[0].returncode is None:
+                logger.debug('commands: %s', p.commands)
+                try:
+                    data = next(gen)
+                except StopIteration:
+                    break
+                feeder.feed(data + '\n')
+                if p.commands:
+                    p.commands[0].poll()
+                time.sleep(0.05) # wait for child to return echo
+        finally:
+            # p.commands may not be set yet (separate thread)
+            if p.commands:
+                p.commands[0].terminate()
+            feeder.close()
+        self.assertEqual(p.stdout.text.splitlines(),
+                         ['hello hello', 'goodbye goodbye'])
+
+    def test_timeout(self):
+        if sys.version_info[:2] < (3, 3):
+            raise unittest.SkipTest('test is only valid for Python >= 3.3')
+        cap = Capture(buffer_size=1)
+        p = run('%s waiter.py 5.0' % sys.executable, async_=True, stdout=cap)
+        with self.assertRaises(subprocess.TimeoutExpired):
+            p.wait(2.5)
+        self.assertEqual(p.returncodes, [None])
+        self.assertEqual(cap.read(block=False), b'Waiting ... ')
+        p.wait(2.6) # ensure the child process finishes
+        self.assertEqual(p.returncodes, [0])
+        expected = b'done.\n' if os.name != 'nt' else b'done.\r\n'
+        self.assertEqual(cap.read(), expected)
+
+if __name__ == '__main__': #pragma: no cover
+    # switch the level to DEBUG for in-depth logging.
+    fn = 'test_sarge-%d.%d.log' % sys.version_info[:2]
+    logging.basicConfig(level=logging.DEBUG, filename=fn, filemode='w',
+                        format='%(threadName)s %(funcName)s %(lineno)d '
+                               '%(message)s')
+    logging.getLogger('sarge.parse').setLevel(logging.WARNING)
+    fn = 'threads-%d.%d.log' % sys.version_info[:2]
+    if TRACE_THREADS:
+        start_trace(fn)
+    try:
+        unittest.main()
+    finally:
+        if TRACE_THREADS:
+            stop_trace()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/sarge-0.1.5.post0/waiter.py new/sarge-0.1.7.post1/waiter.py
--- old/sarge-0.1.5.post0/waiter.py 1970-01-01 01:00:00.000000000 +0100
+++ new/sarge-0.1.7.post1/waiter.py 2020-08-24 11:32:54.000000000 +0200
@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2020 Vinay M. Sajip. See LICENSE for licensing information.
+#
+# Part of the test harness for sarge: Subprocess Allegedly Rewards Good Encapsulation :-)
+#
+import sys
+import time
+
+def main(args=None):
+    sys.stdout.write('Waiting ... ')
+    sys.stdout.flush()
+    if len(sys.argv) < 2:
+        timeout = 5.0
+    else:
+        timeout = float(sys.argv[1])
+    time.sleep(timeout)
+    sys.stdout.write('done.\n')
+
+if __name__ == '__main__':
+    try:
+        rc = main()
+    except Exception as e:
+        print(e)
+        rc = 9
+    sys.exit(rc)
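Note on the new waiter.py helper: test_timeout starts it, expects the first wait to time out, and then waits again for a clean exit. The sketch below shows the same wait-with-timeout pattern using only the standard library's subprocess module, not sarge itself. The names WAITER and run_with_timeout are hypothetical and introduced here purely for illustration; they are not part of the package.

```python
import subprocess
import sys

# Inline stand-in for waiter.py (assumption: same behaviour, passed via -c
# so that the sketch does not depend on any file in the tarball).
WAITER = (
    "import sys, time\n"
    "sys.stdout.write('Waiting ... ')\n"
    "sys.stdout.flush()\n"
    "time.sleep(float(sys.argv[1]))\n"
    "sys.stdout.write('done.\\n')\n"
)


def run_with_timeout(delay, first_wait):
    """Start the child, wait first_wait seconds, then wait for it to finish.

    Returns a tuple (timed_out, returncode, output_bytes).
    """
    proc = subprocess.Popen([sys.executable, '-c', WAITER, str(delay)],
                            stdout=subprocess.PIPE)
    timed_out = False
    try:
        # Raises TimeoutExpired if the child is still running afterwards.
        proc.wait(timeout=first_wait)
    except subprocess.TimeoutExpired:
        timed_out = True
    # Drain stdout and block until the child has really exited.
    output, _ = proc.communicate()
    return timed_out, proc.returncode, output
```

Calling run_with_timeout(0.5, 0.1) should report a timeout on the first wait and a zero return code once the child completes, which is roughly the sequence the new test asserts through sarge's own run() and Capture.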
