This lets a caller run a program with BgJob(expect_alive=True) and have
it killed at join_bg_jobs() time instead of having that reported as an error.
Not yet tested outside the Chrome OS autotest environment, hence no Signed-off-by yet.
-david
---
client/common_lib/utils.py | 96 ++++++++++++++++++++++-------------
client/common_lib/utils_unittest.py | 70 ++++++++++++++++++++++++-
2 files changed, 127 insertions(+), 39 deletions(-)
diff --git a/client/common_lib/utils.py b/client/common_lib/utils.py
index 26c4d01..df88450 100644
--- a/client/common_lib/utils.py
+++ b/client/common_lib/utils.py
@@ -53,7 +53,23 @@ def get_stream_tee_file(stream, level, prefix=''):
class BgJob(object):
def __init__(self, command, stdout_tee=None, stderr_tee=None, verbose=True,
- stdin=None, stderr_level=DEFAULT_STDERR_LEVEL):
+ stdin=None, stderr_level=DEFAULT_STDERR_LEVEL,
+ expect_alive=False):
+ """
+ Build a job that runs in the background. Use join_bg_jobs() to
+ wait for and clean up BgJobs.
+
+ @param command: command as a string (not a list), passed to /bin/bash.
+ @param expect_alive: what join_bg_jobs() checks once it stops waiting:
+ True: raise CmdError if the job exited while join_bg_jobs() waited;
+ a job still alive at the timeout is killed without error.
+ False: raise CmdError if the job is still alive after the
+ join_bg_jobs() timeout and has to be killed.
+ None: don't raise over job liveness; a live job is still killed.
+ With join_bg_jobs(timeout=None) there is no timeout, so the job
+ is waited on until it exits (and expect_alive=True then raises).
+ """
+ self.expect_alive = expect_alive
self.command = command
self.stdout_tee = get_stream_tee_file(stdout_tee, DEFAULT_STDOUT_LEVEL,
prefix=STDOUT_PREFIX)
@@ -481,8 +494,7 @@ def join_bg_jobs(bg_jobs, timeout=None):
try:
# We are holding ends to stdin, stdout pipes
# hence we need to be sure to close those fds no mater what
- start_time = time.time()
- timeout_error = _wait_for_commands(bg_jobs, start_time, timeout)
+ _wait_for_commands(bg_jobs, timeout) # may raise CmdError, skipping the output loop below
for bg_job in bg_jobs:
# Process stdout and stderr
@@ -493,24 +505,19 @@ def join_bg_jobs(bg_jobs, timeout=None):
for bg_job in bg_jobs:
bg_job.cleanup()
- if timeout_error:
- # TODO: This needs to be fixed to better represent what happens when
- # running in parallel. However this is backwards compatable, so it will
- # do for the time being.
- raise error.CmdError(bg_jobs[0].command, bg_jobs[0].result,
- "Command(s) did not complete within %d seconds"
- % timeout)
-
-
return bg_jobs
-def _wait_for_commands(bg_jobs, start_time, timeout):
- # This returns True if it must return due to a timeout, otherwise False.
+def _wait_for_commands(bg_jobs, timeout):
+ """Wait up to timeout seconds for jobs to complete; kill any still running.
+ @param bg_jobs: list of BgJob objects
+ @param timeout: seconds to wait, or None to wait until every job exits
+ @raise CmdError: if a BgJob's expect_alive expectation is violated
+ """
- # To check for processes which terminate without producing any output
- # a 1 second timeout is used in select.
- SELECT_TIMEOUT = 1
+ start_time = time.time()
+ if timeout is not None:
+ stop_time = start_time + timeout
read_list = []
write_list = []
@@ -525,18 +532,20 @@ def _wait_for_commands(bg_jobs, start_time, timeout):
write_list.append(bg_job.sp.stdin)
reverse_dict[bg_job.sp.stdin] = bg_job
- if timeout:
- stop_time = start_time + timeout
- time_left = stop_time - time.time()
- else:
- time_left = None # so that select never times out
+ select_timeout = 1 # Arbitrary > 0 value so the loop runs at least once
+ while timeout is None or select_timeout > 0:
+ if timeout is not None:
+ select_timeout = max(0, stop_time - time.time())
+ else:
+ # We actually don't want to sleep forever in the case of a
+ # process that has terminated with no output. Poll at 1Hz
+ select_timeout = 1
- while not timeout or time_left > 0:
# select will return when we may write to stdin or when there is
# stdout/stderr output we can read (including when it is
# EOF, that is the process has terminated).
read_ready, write_ready, _ = select.select(read_list, write_list, [],
- SELECT_TIMEOUT)
+ select_timeout)
# os.read() has to be used instead of
# subproc.stdout.read() which will otherwise block
@@ -572,22 +582,34 @@ def _wait_for_commands(bg_jobs, start_time, timeout):
all_jobs_finished = False
if all_jobs_finished:
- return False
+ break
- if timeout:
- time_left = stop_time - time.time()
- # Kill all processes which did not complete prior to timeout
+ # Check each job against its expect_alive setting, then kill any job
+ # that did not complete prior to timeout.
+
+ to_raise = None
for bg_job in bg_jobs:
- if bg_job.result.exit_status is not None:
- continue
+ if (bg_job.expect_alive and
+ bg_job.result.exit_status is not None):
+ # Don't raise yet; we need to finish this loop and kill
+ # everything first
+ to_raise = error.CmdError(bg_job.command, bg_job.result,
+ "Expected job to be alive")
+ # Print error in case there's more than 1
+ logging.error(to_raise)
+
+ elif (bg_job.expect_alive is False and
+ bg_job.result.exit_status is None):
+ to_raise = error.CmdError(bg_job.command, bg_job.result,
+ "Expected job to complete, "
+ "but had to kill it instead")
+ logging.error(to_raise)
- logging.warn('run process timeout (%s) fired on: %s', timeout,
- bg_job.command)
- nuke_subprocess(bg_job.sp)
- bg_job.result.exit_status = bg_job.sp.poll()
+ bg_job.result.exit_status = nuke_subprocess(bg_job.sp)
- return True
+ if to_raise is not None:
+ raise to_raise
def pid_is_alive(pid):
@@ -620,10 +644,16 @@ def signal_pid(pid, sig):
         # The process may have died before we could kill it.
         pass
 
-    for i in range(5):
+    # Poll with exponential backoff (1 ms, 4 ms, ..., 4.096 s) so that a
+    # process that exits quickly is noticed quickly; ~5.46 s in total.
+    for i in range(7):
         if not pid_is_alive(pid):
             return True
-        time.sleep(1)
+        time.sleep(0.001 * (4 ** i))
+
+    # The loop never checks again after its last (longest) sleep.
+    if not pid_is_alive(pid):
+        return True
 
     # The process is still alive
     return False
@@ -1267,4 +1292,3 @@ def configure(extra=None, configure='./configure'):
args.append(extra)
system('%s %s' % (configure, ' '.join(args)))
-
diff --git a/client/common_lib/utils_unittest.py
b/client/common_lib/utils_unittest.py
index 913c2ba..5d9f616 100755
--- a/client/common_lib/utils_unittest.py
+++ b/client/common_lib/utils_unittest.py
@@ -1,6 +1,15 @@
#!/usr/bin/python
-import os, unittest, StringIO, socket, urllib2, shutil, subprocess
+import os
+import shutil
+import signal
+import socket
+import StringIO
+import subprocess
+import sys
+import time
+import unittest
+import urllib2
import common
from autotest_lib.client.common_lib import utils, autotemp
@@ -636,6 +645,7 @@ class test_run(unittest.TestCase):
self.god = mock.mock_god()
self.god.stub_function(utils.logging, 'warn')
self.god.stub_function(utils.logging, 'debug')
+ self.god.stub_function(utils.logging, 'error') # expect_alive violations are logged as errors
def tearDown(self):
@@ -672,8 +682,8 @@ class test_run(unittest.TestCase):
def test_timeout(self):
- # we expect a logging.warn() message, don't care about the contents
- utils.logging.warn.expect_any_call()
+ # the killed job presumably has expect_alive=False, so expect a logging.error() message; don't care about the contents
+ utils.logging.error.expect_any_call()
try:
utils.run('echo -n output && sleep 10', timeout=1, verbose=False)
except utils.error.CmdError, err:
@@ -710,5 +720,70 @@ class test_run(unittest.TestCase):
         self.assertRaises(TypeError, utils.run, 'echo', args='hello')
 
 
+
+class test_bgjob(unittest.TestCase):
+    """
+    Test the BgJob class, in particular how join_bg_jobs() enforces
+    expect_alive.
+    """
+
+    def setUp(self):
+        self.god = mock.mock_god()
+        # join_bg_jobs() reports expect_alive violations via logging.error()
+        self.god.stub_function(utils.logging, 'error')
+
+
+    def tearDown(self):
+        # Restore the real logging.error so the stub cannot leak into
+        # test cases that run after this one.
+        self.god.unstub_all()
+
+
+    def test_process_alive_at_shutdown(self):
+        # Must be large enough that creating and killing all three jobs
+        # never takes longer than sleep_time.
+        sleep_time = 30
+        command = "/bin/sleep %d" % sleep_time
+
+        start = time.time()
+        # Expected to be alive: killed at the timeout without error.
+        sleeper = utils.BgJob(command, expect_alive=True)
+        utils.join_bg_jobs([sleeper], timeout=0)
+        self.assertEqual(-signal.SIGTERM, sleeper.result.exit_status)
+
+        # Expected to complete: having to kill it is an error.
+        utils.logging.error.expect_any_call()
+        sleeper = utils.BgJob(command, expect_alive=False)
+        self.assertRaises(utils.error.CmdError,
+                          utils.join_bg_jobs, [sleeper], timeout=0)
+        self.assertEqual(-signal.SIGTERM, sleeper.result.exit_status)
+
+        # No expectation: killed silently.
+        sleeper = utils.BgJob(command, expect_alive=None)
+        utils.join_bg_jobs([sleeper], timeout=0)
+        self.assertEqual(-signal.SIGTERM, sleeper.result.exit_status)
+
+        # None of the joins should have waited for sleep to exit on its own.
+        delta_t = time.time() - start
+        self.assertTrue(delta_t < sleep_time)
+
+
+    def test_process_dead_at_shutdown(self):
+        # Expected to be alive but already exited: error.
+        exiter = utils.BgJob("exit 3", expect_alive=True)
+        utils.logging.error.expect_any_call()
+        self.assertRaises(utils.error.CmdError,
+                          utils.join_bg_jobs, [exiter], timeout=1)
+
+        # Expected to complete, or no expectation: exit status is kept.
+        exiter = utils.BgJob("exit 3", expect_alive=False)
+        utils.join_bg_jobs([exiter], timeout=1)
+        self.assertEqual(3, exiter.result.exit_status)
+
+        exiter = utils.BgJob("exit 3", expect_alive=None)
+        utils.join_bg_jobs([exiter], timeout=1)
+        self.assertEqual(3, exiter.result.exit_status)
+
+
 if __name__ == "__main__":
     unittest.main()
--
1.7.0.1
_______________________________________________
Autotest mailing list
[email protected]
http://test.kernel.org/cgi-bin/mailman/listinfo/autotest