Repository: incubator-airflow
Updated Branches:
  refs/heads/master e58d0c9e2 -> 39b7d7d87


[AIRFLOW-1623] Trigger on_kill method in operators

on_kill methods were not triggered, because
processes were not being properly terminated.
This was due to the fact that the runners use a
shell, which is then replaced by the child pid,
which is unknown to Airflow.

Closes #3204 from bolkedebruin/AIRFLOW-1623


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/39b7d7d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/39b7d7d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/39b7d7d8

Branch: refs/heads/master
Commit: 39b7d7d87cabae9de02ba5d64b998317b494bdd9
Parents: e58d0c9
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Apr 11 08:05:42 2018 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Apr 11 08:05:42 2018 +0200

----------------------------------------------------------------------
 .../contrib/task_runner/cgroup_task_runner.py   |   4 +-
 airflow/jobs.py                                 |   2 +-
 airflow/models.py                               |   3 +-
 airflow/task/task_runner/base_task_runner.py    |   3 +-
 airflow/task/task_runner/bash_task_runner.py    |   4 +-
 airflow/utils/helpers.py                        | 113 +++++-----------
 tests/__init__.py                               |   3 +
 tests/dags/test_on_kill.py                      |  40 ++++++
 tests/task/__init__.py                          |  18 +++
 tests/task/task_runner/__init__.py              |  13 ++
 tests/task/task_runner/test_bash_task_runner.py | 131 +++++++++++++++++++
 tests/utils/test_helpers.py                     |  75 +++++------
 12 files changed, 284 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/39b7d7d8/airflow/contrib/task_runner/cgroup_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/task_runner/cgroup_task_runner.py 
b/airflow/contrib/task_runner/cgroup_task_runner.py
index 87767a6..04ae81c 100644
--- a/airflow/contrib/task_runner/cgroup_task_runner.py
+++ b/airflow/contrib/task_runner/cgroup_task_runner.py
@@ -21,7 +21,7 @@ from cgroupspy import trees
 import psutil
 
 from airflow.task_runner.base_task_runner import BaseTaskRunner
-from airflow.utils.helpers import kill_process_tree
+from airflow.utils.helpers import reap_process_group
 
 
 class CgroupTaskRunner(BaseTaskRunner):
@@ -176,7 +176,7 @@ class CgroupTaskRunner(BaseTaskRunner):
 
     def terminate(self):
         if self.process and psutil.pid_exists(self.process.pid):
-            kill_process_tree(self.log, self.process.pid)
+            reap_process_group(self.process.pid, self.log)
 
     def on_finish(self):
         # Let the OOM watcher thread know we're done to avoid false OOM alarms

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/39b7d7d8/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index bcff868..1911896 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2514,7 +2514,7 @@ class LocalTaskJob(BaseJob):
 
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
-            self.log.error("Killing subprocess")
+            self.log.error("Received SIGTERM. Terminating subprocesses")
             self.on_kill()
             raise AirflowException("LocalTaskJob received SIGTERM signal")
         signal.signal(signal.SIGTERM, signal_handler)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/39b7d7d8/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index e89e776..afcacd1 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1539,8 +1539,7 @@ class TaskInstance(Base, LoggingMixin):
                 self.task = task_copy
 
                 def signal_handler(signum, frame):
-                    """Setting kill signal handler"""
-                    self.log.error("Killing subprocess")
+                    self.log.error("Received SIGTERM. Terminating 
subprocesses.")
                     task_copy.on_kill()
                     raise AirflowException("Task received SIGTERM signal")
                 signal.signal(signal.SIGTERM, signal_handler)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/39b7d7d8/airflow/task/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task/task_runner/base_task_runner.py 
b/airflow/task/task_runner/base_task_runner.py
index 794b450..49bc8bc 100644
--- a/airflow/task/task_runner/base_task_runner.py
+++ b/airflow/task/task_runner/base_task_runner.py
@@ -122,7 +122,8 @@ class BaseTaskRunner(LoggingMixin):
             stderr=subprocess.STDOUT,
             universal_newlines=True,
             close_fds=True,
-            env=os.environ.copy()
+            env=os.environ.copy(),
+            preexec_fn=os.setsid
         )
 
         # Start daemon thread to read subprocess logging output

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/39b7d7d8/airflow/task/task_runner/bash_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task/task_runner/bash_task_runner.py 
b/airflow/task/task_runner/bash_task_runner.py
index ba0d57b..6843c2a 100644
--- a/airflow/task/task_runner/bash_task_runner.py
+++ b/airflow/task/task_runner/bash_task_runner.py
@@ -15,7 +15,7 @@
 import psutil
 
 from airflow.task.task_runner.base_task_runner import BaseTaskRunner
-from airflow.utils.helpers import kill_process_tree
+from airflow.utils.helpers import reap_process_group
 
 
 class BashTaskRunner(BaseTaskRunner):
@@ -33,7 +33,7 @@ class BashTaskRunner(BaseTaskRunner):
 
     def terminate(self):
         if self.process and psutil.pid_exists(self.process.pid):
-            kill_process_tree(self.log, self.process.pid)
+            reap_process_group(self.process.pid, self.log)
 
     def on_finish(self):
         super(BashTaskRunner, self).on_finish()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/39b7d7d8/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 6a70725..9e6a439 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -183,87 +183,44 @@ def pprinttable(rows):
     return s
 
 
-def kill_using_shell(logger, pid, signal=signal.SIGTERM):
-    try:
-        process = psutil.Process(pid)
-        # Use sudo only when necessary - consider SubDagOperator and 
SequentialExecutor case.
-        if process.username() != getpass.getuser():
-            args = ["sudo", "kill", "-{}".format(int(signal)), str(pid)]
-        else:
-            args = ["kill", "-{}".format(int(signal)), str(pid)]
-        # PID may not exist and return a non-zero error code
-        logger.error(subprocess.check_output(args, close_fds=True))
-        logger.info("Killed process {} with signal {}".format(pid, signal))
-        return True
-    except psutil.NoSuchProcess as e:
-        logger.warning("Process {} no longer exists".format(pid))
-        return False
-    except subprocess.CalledProcessError as e:
-        logger.warning("Failed to kill process {} with signal {}. Output: {}"
-                       .format(pid, signal, e.output))
-        return False
-
-
-def kill_process_tree(logger, pid, timeout=DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM):
+def reap_process_group(pid, log, sig=signal.SIGTERM,
+                       timeout=DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM):
     """
-    TODO(saguziel): also kill the root process after killing descendants
-  
-    Kills the process's descendants. Kills using the `kill`
-    shell command so that it can change users. Note: killing via PIDs
-    has the potential to the wrong process if the process dies and the
-    PID gets recycled in a narrow time window.
-
-    :param logger: logger
-    :type logger: logging.Logger
+    Tries really hard to terminate all children (including grandchildren). 
Will send
+    sig (SIGTERM) to the process group of pid. If any process is alive after 
timeout
+a SIGKILL will be sent.
+
+    :param log: log handler
+    :param pid: pid to kill
+    :param sig: signal type
+    :param timeout: how much time a process has to terminate
     """
-    try:
-        root_process = psutil.Process(pid)
-    except psutil.NoSuchProcess:
-        logger.warning("PID: {} does not exist".format(pid))
-        return
+    def on_terminate(p):
+        log.info("Process %s (%s) terminated with exit code %s", p, p.pid, 
p.returncode)
 
-    # Check child processes to reduce cases where a child process died but
-    # the PID got reused.
-    descendant_processes = [x for x in root_process.children(recursive=True)
-                            if x.is_running()]
-
-    if len(descendant_processes) != 0:
-        logger.info("Terminating descendant processes of {} PID: {}"
-                    .format(root_process.cmdline(),
-                            root_process.pid))
-        temp_processes = descendant_processes[:]
-        for descendant in temp_processes:
-            logger.info("Terminating descendant process {} PID: {}"
-                        .format(descendant.cmdline(), descendant.pid))
-            if not kill_using_shell(logger, descendant.pid, signal.SIGTERM):
-                descendant_processes.remove(descendant)
-
-        logger.info("Waiting up to {}s for processes to exit..."
-                    .format(timeout))
-        try:
-            psutil.wait_procs(descendant_processes, timeout)
-            logger.info("Done waiting")
-        except psutil.TimeoutExpired:
-            logger.warning("Ran out of time while waiting for "
-                           "processes to exit")
-        # Then SIGKILL
-        descendant_processes = [x for x in 
root_process.children(recursive=True)
-                                if x.is_running()]
-
-        if len(descendant_processes) > 0:
-            temp_processes = descendant_processes[:]
-            for descendant in temp_processes:
-                logger.info("Killing descendant process {} PID: {}"
-                            .format(descendant.cmdline(), descendant.pid))
-                if not kill_using_shell(logger, descendant.pid, 
signal.SIGKILL):
-                    descendant_processes.remove(descendant)
-                else:
-                    descendant.wait()
-            logger.info("Killed all descendant processes of {} PID: {}"
-                        .format(root_process.cmdline(),
-                                root_process.pid))
-    else:
-        logger.debug("There are no descendant processes to kill")
+    if pid == os.getpid():
+        raise RuntimeError("I refuse to kill myself")
+
+    parent = psutil.Process(pid)
+
+    children = parent.children(recursive=True)
+    children.append(parent)
+
+    log.info("Sending %s to GPID %s", sig, os.getpgid(pid))
+    os.killpg(os.getpgid(pid), sig)
+
+    gone, alive = psutil.wait_procs(children, timeout=timeout, 
callback=on_terminate)
+
+    if alive:
+        for p in alive:
+            log.warn("process %s (%s) did not respond to SIGTERM. Trying 
SIGKILL", p, pid)
+
+        os.killpg(os.getpgid(pid), signal.SIGKILL)
+
+        gone, alive = psutil.wait_procs(alive, timeout=timeout, 
callback=on_terminate)
+        if alive:
+            for p in alive:
+                log.error("Process %s (%s) could not be killed. Giving up.", 
p, p.pid)
 
 
 class AirflowImporter(object):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/39b7d7d8/tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/__init__.py b/tests/__init__.py
index 20f8c48..14f38b7 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# flake8: noqa
+
 from __future__ import absolute_import
 
 from .api import *
@@ -24,4 +26,5 @@ from .impersonation import *
 from .models import *
 from .operators import *
 from .security import *
+from .task import *
 from .utils import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/39b7d7d8/tests/dags/test_on_kill.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_on_kill.py b/tests/dags/test_on_kill.py
new file mode 100644
index 0000000..8c57096
--- /dev/null
+++ b/tests/dags/test_on_kill.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.timezone import datetime
+
+
+class DummyWithOnKill(DummyOperator):
+    def execute(self, context):
+        time.sleep(10)
+
+    def on_kill(self):
+        self.log.info("Executing on_kill")
+        f = open("/tmp/airflow_on_kill", "w")
+        f.write("ON_KILL_TEST")
+        f.close()
+
+
+# DAG tests that an on_kill callback is executed when
+# the task instance is terminated externally
+dag1 = DAG(
+    dag_id='test_on_kill',
+    start_date=datetime(2015, 1, 1))
+dag1_task1 = DummyWithOnKill(
+    task_id='task1',
+    dag=dag1,
+    owner='airflow')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/39b7d7d8/tests/task/__init__.py
----------------------------------------------------------------------
diff --git a/tests/task/__init__.py b/tests/task/__init__.py
new file mode 100644
index 0000000..bc88849
--- /dev/null
+++ b/tests/task/__init__.py
@@ -0,0 +1,18 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# flake8: noqa
+
+from __future__ import absolute_import
+from .task_runner import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/39b7d7d8/tests/task/task_runner/__init__.py
----------------------------------------------------------------------
diff --git a/tests/task/task_runner/__init__.py 
b/tests/task/task_runner/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/task/task_runner/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/39b7d7d8/tests/task/task_runner/test_bash_task_runner.py
----------------------------------------------------------------------
diff --git a/tests/task/task_runner/test_bash_task_runner.py 
b/tests/task/task_runner/test_bash_task_runner.py
new file mode 100644
index 0000000..7e50478
--- /dev/null
+++ b/tests/task/task_runner/test_bash_task_runner.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import mock
+import os
+import psutil
+import time
+import unittest
+
+from airflow import models, settings
+from airflow.jobs import LocalTaskJob
+from airflow.models import TaskInstance as TI
+from airflow.task.task_runner import BashTaskRunner
+from airflow.utils import timezone
+from airflow.utils.state import State
+
+from logging.config import dictConfig
+
+from tests.core import TEST_DAG_FOLDER
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow.task': {
+            'format': '[%%(asctime)s] {{%%(filename)s:%%(lineno)d}} 
%%(levelname)s - '
+                      '%%(message)s'
+        },
+    },
+    'handlers': {
+        'console': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow.task',
+            'stream': 'ext://sys.stdout'
+        }
+    },
+    'loggers': {
+        'airflow': {
+            'handlers': ['console'],
+            'level': 'INFO',
+            'propagate': False
+        }
+    }
+}
+
+
+class TestBashTaskRunner(unittest.TestCase):
+    def setUp(self):
+        dictConfig(LOGGING_CONFIG)
+
+    def test_start_and_terminate(self):
+        local_task_job = mock.Mock()
+        local_task_job.task_instance = mock.MagicMock()
+        local_task_job.task_instance.run_as_user = None
+        local_task_job.task_instance.command_as_list.return_value = ['sleep', 
'1000']
+
+        runner = BashTaskRunner(local_task_job)
+        runner.start()
+
+        pgid = os.getpgid(runner.process.pid)
+        self.assertTrue(pgid)
+
+        procs = []
+        for p in psutil.process_iter():
+            try:
+                if os.getpgid(p.pid) == pgid:
+                    procs.append(p)
+            except OSError:
+                pass
+
+        runner.terminate()
+
+        for p in procs:
+            self.assertFalse(psutil.pid_exists(p.pid))
+
+    def test_on_kill(self):
+        """
+        Test that ensures that clearing in the UI sends SIGTERM
+        to the task
+        """
+        path = "/tmp/airflow_on_kill"
+        try:
+            os.unlink(path)
+        except OSError:
+            pass
+
+        dagbag = models.DagBag(
+            dag_folder=TEST_DAG_FOLDER,
+            include_examples=False,
+        )
+        dag = dagbag.dags.get('test_on_kill')
+        task = dag.get_task('task1')
+
+        session = settings.Session()
+
+        dag.clear()
+        dag.create_dagrun(run_id="test",
+                          state=State.RUNNING,
+                          execution_date=DEFAULT_DATE,
+                          start_date=DEFAULT_DATE,
+                          session=session)
+        ti = TI(task=task, execution_date=DEFAULT_DATE)
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True)
+
+        runner = BashTaskRunner(job1)
+        runner.start()
+
+        # give the task some time to startup
+        time.sleep(3)
+
+        runner.terminate()
+
+        f = open(path, "r")
+        self.assertEqual("ON_KILL_TEST", f.readline())
+        f.close()
+
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/39b7d7d8/tests/utils/test_helpers.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
index 83c519d..fd4964c 100644
--- a/tests/utils/test_helpers.py
+++ b/tests/utils/test_helpers.py
@@ -26,64 +26,61 @@ from airflow.utils import helpers
 class TestHelpers(unittest.TestCase):
 
     @staticmethod
-    def _ignores_sigterm(child_pid, setup_done):
+    def _ignores_sigterm(child_pid, child_setup_done):
         def signal_handler(signum, frame):
             pass
 
         signal.signal(signal.SIGTERM, signal_handler)
         child_pid.value = os.getpid()
-        setup_done.release()
+        child_setup_done.release()
         while True:
             time.sleep(1)
 
     @staticmethod
-    def _parent_of_ignores_sigterm(child_process_killed, child_pid,
-                                   process_done, setup_done):
+    def _parent_of_ignores_sigterm(parent_pid, child_pid, setup_done):
+        def signal_handler(signum, frame):
+            pass
+        os.setsid()
+        signal.signal(signal.SIGTERM, signal_handler)
+        child_setup_done = multiprocessing.Semaphore(0)
         child = multiprocessing.Process(target=TestHelpers._ignores_sigterm,
-                                        args=[child_pid, setup_done])
+                                        args=[child_pid, child_setup_done])
         child.start()
-        if setup_done.acquire(timeout=1.0):
-            helpers.kill_process_tree(logging.getLogger(), os.getpid(), 
timeout=1.0)
-            # Process.is_alive doesnt work with SIGKILL
-            if not psutil.pid_exists(child_pid.value):
-                child_process_killed.value = 1
-
-        process_done.release()
+        child_setup_done.acquire(timeout=5.0)
+        parent_pid.value = os.getpid()
+        setup_done.release()
+        while True:
+            time.sleep(1)
 
-    def test_kill_process_tree(self):
-        """Spin up a process that can't be killed by SIGTERM and make sure it 
gets killed anyway."""
-        child_process_killed = multiprocessing.Value('i', 0)
-        process_done = multiprocessing.Semaphore(0)
+    def test_reap_process_group(self):
+        """
+        Spin up a process that can't be killed by SIGTERM and make sure
+        it gets killed anyway.
+        """
+        parent_setup_done = multiprocessing.Semaphore(0)
+        parent_pid = multiprocessing.Value('i', 0)
         child_pid = multiprocessing.Value('i', 0)
-        setup_done = multiprocessing.Semaphore(0)
-        args = [child_process_killed, child_pid, process_done, setup_done]
-        child = 
multiprocessing.Process(target=TestHelpers._parent_of_ignores_sigterm, 
args=args)
+        args = [parent_pid, child_pid, parent_setup_done]
+        parent = 
multiprocessing.Process(target=TestHelpers._parent_of_ignores_sigterm,
+                                         args=args)
         try:
-            child.start()
-            self.assertTrue(process_done.acquire(timeout=5.0))
-            self.assertEqual(1, child_process_killed.value)
+            parent.start()
+            self.assertTrue(parent_setup_done.acquire(timeout=5.0))
+            self.assertTrue(psutil.pid_exists(parent_pid.value))
+            self.assertTrue(psutil.pid_exists(child_pid.value))
+
+            helpers.reap_process_group(parent_pid.value, logging.getLogger(),
+                                       timeout=1)
+
+            self.assertFalse(psutil.pid_exists(parent_pid.value))
+            self.assertFalse(psutil.pid_exists(child_pid.value))
         finally:
             try:
-                os.kill(child_pid.value, signal.SIGKILL) # terminate doesnt 
work here
+                os.kill(parent_pid.value, signal.SIGKILL)  # terminate doesnt 
work here
+                os.kill(child_pid.value, signal.SIGKILL)  # terminate doesnt 
work here
             except OSError:
                 pass
 
-    def test_kill_using_shell(self):
-        """Test when no process exists."""
-        child_pid = multiprocessing.Value('i', 0)
-        setup_done = multiprocessing.Semaphore(0)
-        args = [child_pid, setup_done]
-        child = multiprocessing.Process(target=TestHelpers._ignores_sigterm, 
args=args)
-        child.start()
-
-        self.assertTrue(setup_done.acquire(timeout=1.0))
-        pid_to_kill = child_pid.value
-        self.assertTrue(helpers.kill_using_shell(logging.getLogger(), 
pid_to_kill,
-                                                 signal=signal.SIGKILL))
-        child.join() # remove orphan process
-        self.assertFalse(helpers.kill_using_shell(logging.getLogger(), 
pid_to_kill,
-                                                  signal=signal.SIGKILL))
-
 
 if __name__ == '__main__':
     unittest.main()

Reply via email to