This way one can run a program with BgJob(expect_alive=True) and have
it killed at join_bg_jobs time

Not yet tested outside the ChromeOS autotest world, hence no signoff yet.

-david

---
 client/common_lib/utils.py          |   96 ++++++++++++++++++++++-------------
 client/common_lib/utils_unittest.py |   70 ++++++++++++++++++++++++-
 2 files changed, 127 insertions(+), 39 deletions(-)

diff --git a/client/common_lib/utils.py b/client/common_lib/utils.py
index 26c4d01..df88450 100644
--- a/client/common_lib/utils.py
+++ b/client/common_lib/utils.py
@@ -53,7 +53,20 @@ def get_stream_tee_file(stream, level, prefix=''):

 class BgJob(object):
    def __init__(self, command, stdout_tee=None, stderr_tee=None, verbose=True,
-                 stdin=None, stderr_level=DEFAULT_STDERR_LEVEL):
+                 stdin=None, stderr_level=DEFAULT_STDERR_LEVEL,
+                 expect_alive=False):
+        """
+        Build a job that runs in the background.  Use join_bg_jobs() to
+        clean up BgJobs
+
+       �...@param: command as string (not list), passed to /bin/bash.
+       �...@param: expect_alive:
+          True: throw exception if job has died before join_bg_jobs called
+          False: throw exception if job is still alive after the
+            join_bg_jobs timeout.
+          None:  Don't throw any exceptions over job liveness.
+        """
+        self.expect_alive = expect_alive
        self.command = command
        self.stdout_tee = get_stream_tee_file(stdout_tee, DEFAULT_STDOUT_LEVEL,
                                              prefix=STDOUT_PREFIX)
@@ -481,8 +494,7 @@ def join_bg_jobs(bg_jobs, timeout=None):
    try:
        # We are holding ends to stdin, stdout pipes
        # hence we need to be sure to close those fds no mater what
-        start_time = time.time()
-        timeout_error = _wait_for_commands(bg_jobs, start_time, timeout)
+        _wait_for_commands(bg_jobs, timeout)

        for bg_job in bg_jobs:
            # Process stdout and stderr
@@ -493,24 +505,19 @@ def join_bg_jobs(bg_jobs, timeout=None):
        for bg_job in bg_jobs:
            bg_job.cleanup()

-    if timeout_error:
-        # TODO: This needs to be fixed to better represent what happens when
-        # running in parallel. However this is backwards compatable, so it will
-        # do for the time being.
-        raise error.CmdError(bg_jobs[0].command, bg_jobs[0].result,
-                             "Command(s) did not complete within %d seconds"
-                             % timeout)
-
-
    return bg_jobs


-def _wait_for_commands(bg_jobs, start_time, timeout):
-    # This returns True if it must return due to a timeout, otherwise False.
+def _wait_for_commands(bg_jobs, timeout):
+    """Wait up to timeout for jobs to complete..
+   �...@param bg_jobs: list of BgJob objects
+   �...@param timeout: seconds to wait
+   �...@raise CmdError if a BgJob's expect_alive is violated
+    """

-    # To check for processes which terminate without producing any output
-    # a 1 second timeout is used in select.
-    SELECT_TIMEOUT = 1
+    start_time = time.time()
+    if timeout is not None:
+        stop_time = start_time + timeout

    read_list = []
    write_list = []
@@ -525,18 +532,21 @@ def _wait_for_commands(bg_jobs, start_time, timeout):
            write_list.append(bg_job.sp.stdin)
            reverse_dict[bg_job.sp.stdin] = bg_job

-    if timeout:
-        stop_time = start_time + timeout
-        time_left = stop_time - time.time()
-    else:
-        time_left = None # so that select never times out
+    select_timeout = 1                  # Arbitrary > 0 value
+    while timeout is None or select_timeout > 0:
+        if timeout is not None:
+            select_timeout = max(0, stop_time - time.time())
+        else:
+            # We actually don't want to sleep forever in the case of a
+            # process that has terminated with no output.  Poll at 1Hz
+            select_timeout = 1

-    while not timeout or time_left > 0:
        # select will return when we may write to stdin or when there is
        # stdout/stderr output we can read (including when it is
        # EOF, that is the process has terminated).
+
        read_ready, write_ready, _ = select.select(read_list, write_list, [],
-                                                   SELECT_TIMEOUT)
+                                                   select_timeout)

        # os.read() has to be used instead of
        # subproc.stdout.read() which will otherwise block
@@ -572,22 +582,36 @@ def _wait_for_commands(bg_jobs, start_time, timeout):
                all_jobs_finished = False

        if all_jobs_finished:
-            return False
+            break

        if timeout:
            time_left = stop_time - time.time()

-    # Kill all processes which did not complete prior to timeout
+    # Kill all processes which did not complete prior to timeout.
+    # Error if a job's completion status is unexpected
+
+    to_raise = None
    for bg_job in bg_jobs:
-        if bg_job.result.exit_status is not None:
-            continue
+        if (bg_job.expect_alive and
+            bg_job.result.exit_status is not None):
+            # Don't raise yet; we need to finish this loop and kill
+            # everything first
+            to_raise = error.CmdError(bg_job, bg_job.result,
+                                      "Expected job to be alive")
+            # Print error in case there's more than 1
+            logging.error(to_raise)
+
+        elif (bg_job.expect_alive == False and
+              bg_job.result.exit_status is None):
+            to_raise = error.CmdError(bg_job, bg_job.result,
+                                      "Expected job to complete, "
+                                      "but had to kill it instead")
+            logging.error(to_raise)

-        logging.warn('run process timeout (%s) fired on: %s', timeout,
-                     bg_job.command)
-        nuke_subprocess(bg_job.sp)
-        bg_job.result.exit_status = bg_job.sp.poll()
+        bg_job.result.exit_status = nuke_subprocess(bg_job.sp)

-    return True
+    if to_raise:
+        raise to_raise


 def pid_is_alive(pid):
@@ -620,10 +644,11 @@ def signal_pid(pid, sig):
        # The process may have died before we could kill it.
        pass

-    for i in range(5):
+    # sum([.001 * 4 ** t for t in range(7)]): 5.4 seconds total
+    for i in range(7):
        if not pid_is_alive(pid):
            return True
-        time.sleep(1)
+        time.sleep(0.001 * (4 ** i))

    # The process is still alive
    return False
@@ -1267,4 +1292,3 @@ def configure(extra=None, configure='./configure'):
        args.append(extra)

    system('%s %s' % (configure, ' '.join(args)))
-
diff --git a/client/common_lib/utils_unittest.py
b/client/common_lib/utils_unittest.py
index 913c2ba..5d9f616 100755
--- a/client/common_lib/utils_unittest.py
+++ b/client/common_lib/utils_unittest.py
@@ -1,6 +1,15 @@
 #!/usr/bin/python

-import os, unittest, StringIO, socket, urllib2, shutil, subprocess
+import os
+import shutil
+import signal
+import socket
+import StringIO
+import subprocess
+import sys
+import time
+import unittest
+import urllib2

 import common
 from autotest_lib.client.common_lib import utils, autotemp
@@ -636,6 +645,7 @@ class test_run(unittest.TestCase):
        self.god = mock.mock_god()
        self.god.stub_function(utils.logging, 'warn')
        self.god.stub_function(utils.logging, 'debug')
+        self.god.stub_function(utils.logging, 'error')


    def tearDown(self):
@@ -672,8 +682,8 @@ class test_run(unittest.TestCase):


    def test_timeout(self):
-        # we expect a logging.warn() message, don't care about the contents
-        utils.logging.warn.expect_any_call()
+        # we expect a logging.error() message, don't care about the contents
+        utils.logging.error.expect_any_call()
        try:
            utils.run('echo -n output && sleep 10', timeout=1, verbose=False)
        except utils.error.CmdError, err:
@@ -710,5 +720,59 @@ class test_run(unittest.TestCase):
        self.assertRaises(TypeError, utils.run, 'echo', args='hello')


+
+class test_bgjob(unittest.TestCase):
+    """
+    Test the BgJob class.
+    """
+
+    def setUp(self):
+        self.god = mock.mock_god()
+        self.god.stub_function(utils.logging, 'error')
+
+
+    def test_process_alive_at_shutdown(self):
+        # Must be large enough that creating and killing three jobs
+        # calls never take longer than sleep_time
+        sleep_time = 30
+        command = "/bin/sleep %d" % sleep_time
+
+        start = time.time()
+        sleeper = utils.BgJob(command, expect_alive=True)
+        utils.join_bg_jobs([sleeper], timeout=0)
+        self.assertEqual(-signal.SIGTERM, sleeper.result.exit_status)
+
+
+        utils.logging.error.expect_any_call()
+        sleeper = utils.BgJob(command, expect_alive=False)
+        self.assertRaises(utils.error.CmdError,
+                          utils.join_bg_jobs, [sleeper], timeout=0)
+
+        self.assertEqual(-signal.SIGTERM, sleeper.result.exit_status)
+
+        sleeper = utils.BgJob(command, expect_alive=None)
+        utils.join_bg_jobs([sleeper], timeout=0)
+        self.assertEqual(-signal.SIGTERM, sleeper.result.exit_status)
+
+        delta_t = time.time() - start
+        self.assert_(delta_t < sleep_time)
+
+
+    def test_process_dead_at_shutdown(self):
+        exiter = utils.BgJob("exit 3", expect_alive=True)
+        utils.logging.error.expect_any_call()
+        self.assertRaises(utils.error.CmdError,
+                          utils.join_bg_jobs, [exiter], timeout=1)
+
+        exiter = utils.BgJob("exit 3", expect_alive=False)
+        utils.join_bg_jobs([exiter], timeout=1)
+        self.assertEqual(3, exiter.result.exit_status)
+
+
+        exiter = utils.BgJob("exit 3", expect_alive=None)
+        utils.join_bg_jobs([exiter], timeout=1)
+        self.assertEqual(3, exiter.result.exit_status)
+
+
 if __name__ == "__main__":
    unittest.main()
--
1.7.0.1
_______________________________________________
Autotest mailing list
[email protected]
http://test.kernel.org/cgi-bin/mailman/listinfo/autotest

Reply via email to