Repository: incubator-slider Updated Branches: refs/heads/develop 8234cbe4e -> 4032999f3
SLIDER-1264 slider-agent kills other processes when a commandScript times out Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/4032999f Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/4032999f Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/4032999f Branch: refs/heads/develop Commit: 4032999f35db4877b6b8ffc5e97a59837e22365b Parents: 8234cbe Author: kwnam <kw...@apache.org> Authored: Sat Mar 24 12:43:32 2018 +0900 Committer: kwnam <kw...@apache.org> Committed: Sat Mar 24 12:43:32 2018 +0900 ---------------------------------------------------------------------- .../src/main/python/agent/process_utils.py | 100 +++++++++ slider-agent/src/main/python/agent/shell.py | 66 +++--- .../src/test/python/agent/TestProcessUtils.py | 221 +++++++++++++++++++ slider-agent/src/test/python/agent/TestShell.py | 3 - 4 files changed, 350 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4032999f/slider-agent/src/main/python/agent/process_utils.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/process_utils.py b/slider-agent/src/main/python/agent/process_utils.py new file mode 100644 index 0000000..50ffd02 --- /dev/null +++ b/slider-agent/src/main/python/agent/process_utils.py @@ -0,0 +1,100 @@ +# !/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import subprocess +import time + +check_time_delay = 0.1 # seconds between checks of process killed + + +def get_children(pid): + """ + Returns the pids (as strings) of the direct children of pid, as listed by 'ps --ppid'. + Returns an empty list if ps exits with a non-zero code. + """ + PSCMD = ["ps", "-o", "pid", "--no-headers", "--ppid", str(pid)] + ps_process = subprocess.Popen(PSCMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = ps_process.communicate() + if ps_process.returncode != 0: + return [] + return stdout.split() + + +def get_flat_process_tree(pid): + """ + Recursively collects pid and all of its descendants (depth-first, parent before children). + :param pid: process id of parent process + :return: list of pids as strings. Resulting list also includes parent pid + """ + res = [str(pid)] + children = get_children(pid) + for child in children: + res += get_flat_process_tree(child) + return res + + +def kill_pids(pids, signal): + """ + Sends the given signal to all pids with a single 'kill' invocation. + :param pids: list of pids (str or int) + :param signal: signal to send, e.g. signal.SIGTERM + :raises Fail: if 'kill' exits with a non-zero code + """ + from resource_management.core.exceptions import Fail + CMD = ["kill", "-" + str(signal)] + CMD.extend(str(pid) for pid in pids) + process = subprocess.Popen(CMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + if process.returncode != 0: + raise Fail("Unable to kill PIDs {0} : {1}".format(str(pids),stderr)) + + +def get_command_by_pid(pid): + """ + Returns the command line of pid as reported by 'ps -o command' (surrounding whitespace stripped), + or "NOT_FOUND[<pid>]" if ps exits with a non-zero code. + """ + CMD = ["ps", "-p", str(pid), "-o", "command", "--no-headers"] + process = subprocess.Popen(CMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + if process.returncode != 0: + return "NOT_FOUND[%s]" % pid + return stdout.strip() + + +def wait_for_entire_process_tree_death(pids, timeout=5): + """ + Waits for every pid in pids to exit, sharing one overall deadline. + :param pids: list of process pids + :param timeout: total seconds to wait for the whole list, not per pid; once it has + elapsed each remaining pid is checked once without sleeping + """ + deadline = time.time() + timeout + for child in pids: + wait_for_process_death(child, max(0, deadline - time.time())) + + +def wait_for_process_death(pid, timeout=5): + """ + Polls every check_time_delay seconds until pid is no longer running or timeout seconds + have elapsed. Does not report which of the two happened. + """ + start = time.time() + current_time = start + while is_process_running(pid) and
current_time < start + timeout: + time.sleep(check_time_delay) + current_time = time.time() + + +def is_process_running(pid): + """ + Returns True if 'ps -p <pid>' exits successfully and lists exactly this pid. + Compares whole tokens against str(pid), so pid may be a str or an int. + """ + CMD = ["ps", "-p", str(pid), "-o", "pid", "--no-headers"] + process = subprocess.Popen(CMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + if process.returncode != 0: + return False + return str(pid) in stdout.split() + + + +def get_processes_running(process_pids): + """ + Checks what processes are still running + :param process_pids: list of process pids + :return: list of pids for processes that are still running + """ + result = [] + for pid in process_pids: + if is_process_running(pid): + result.append(pid) + return result http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4032999f/slider-agent/src/main/python/agent/shell.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/shell.py b/slider-agent/src/main/python/agent/shell.py index 446dde9..f22d535 100644 --- a/slider-agent/src/main/python/agent/shell.py +++ b/slider-agent/src/main/python/agent/shell.py @@ -19,16 +19,13 @@ limitations under the License.
''' import logging -import subprocess import os -import tempfile import signal -import sys +import subprocess import threading -import time -import traceback -import pprint import platform +from process_utils import get_flat_process_tree, kill_pids, wait_for_entire_process_tree_death, \ + get_processes_running, get_command_by_pid if platform.system() != "Windows": try: @@ -42,7 +39,6 @@ logger = logging.getLogger() shellRunner = None threadLocal = threading.local() -gracefull_kill_delay = 5 # seconds between SIGTERM and SIGKILL tempFiles = [] def noteTempFile(filename): tempFiles.append(filename) @@ -92,40 +88,36 @@ class shellRunnerWindows: logger.debug("Exitcode for %s is %d" % (cmd, code)) return _dict_to_object({'exitCode': code, 'output': out, 'error': err}) - #linux specific code def _kill_process_with_children_linux(parent_pid): - def kill_tree_function(pid, signal): - ''' + """ Kills process tree starting from a given pid. - ''' - # The command below starts 'ps' linux utility and then parses it's - # output using 'awk'. AWK recursively extracts PIDs of all children of - # a given PID and then passes list of "kill -<SIGNAL> PID" commands to 'sh' - # shell. 
- CMD = """ps xf | awk -v PID=""" + str(pid) + \ - """ ' $1 == PID { P = $1; next } P && /_/ { P = P " " $1;""" + \ - """K=P } P && !/_/ { P="" } END { print "kill -""" \ - + str(signal) + """ "K }' | sh """ - process = subprocess.Popen(CMD, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, shell=True) - process.communicate() - _run_kill_function(kill_tree_function, parent_pid) - -def _run_kill_function(kill_function, pid): - try: - kill_function(pid, signal.SIGTERM) - except Exception, e: - logger.warn("Failed to kill PID %d" % (pid)) - logger.warn("Reported error: " + repr(e)) - - time.sleep(gracefull_kill_delay) + Sends SIGTERM to the whole tree, waits for it to exit, then sends SIGKILL to any survivors. + :param parent_pid: pid at the head of the tree (int or str; formatted with %s in log messages) + :return: None. Errors while sending signals are logged, not raised. + """ + + pids = get_flat_process_tree(parent_pid) + logger.info("Process tree: %s" % ','.join(pids)) + try: + kill_pids(pids, signal.SIGTERM) + except Exception, e: + logger.warn("Failed to kill PID %s" % parent_pid) + logger.warn("Reported error: " + repr(e)) + + wait_for_entire_process_tree_death(pids) + + try: + running_processes = get_processes_running(pids) + if running_processes: + process_names = [get_command_by_pid(x) for x in running_processes] + logger.warn("These PIDs %s did not die after SIGTERM, sending SIGKILL. Exact commands to be killed:\n %s" % + (", ".join(running_processes), "\n".join(process_names))) + kill_pids(running_processes, signal.SIGKILL) + except Exception, e: + logger.error("Failed to send SIGKILL to PID %s. Process exited?" % parent_pid) + logger.error("Reported error: " + repr(e)) - try: - kill_function(pid, signal.SIGKILL) - except Exception, e: - logger.error("Failed to send SIGKILL to PID %d. Process exited?" 
% (pid)) - logger.error("Reported error: " + repr(e)) def _changeUid(): try: http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4032999f/slider-agent/src/test/python/agent/TestProcessUtils.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestProcessUtils.py b/slider-agent/src/test/python/agent/TestProcessUtils.py new file mode 100644 index 0000000..d3c708e --- /dev/null +++ b/slider-agent/src/test/python/agent/TestProcessUtils.py @@ -0,0 +1,221 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+''' + +import unittest +import signal +import subprocess, time +from mock.mock import patch, MagicMock, PropertyMock, call +from agent import process_utils + +process_tree = {"111": "222\n 22", + "222": "333\n 33", + "22": "44\n 444",} + + +class TestProcessUtils(unittest.TestCase): + @patch("subprocess.Popen") + def test_kill(self, popen_mock): + process_mock = MagicMock() + process_mock.communicate.return_value = (None, None) + returncode_mock = PropertyMock() + returncode_mock.return_value = 0 + type(process_mock).returncode = returncode_mock + popen_mock.return_value = process_mock + process_utils.kill_pids(["12321113230", "2312415453"], signal.SIGTERM) + expected = [call(['kill', '-15', '12321113230', '2312415453'], stderr=-1, stdout=-1)] + self.assertEqual(popen_mock.call_args_list, expected) + + @patch("subprocess.Popen") + def test_get_children(self, popen_mock): + + process_mock = MagicMock() + process_mock.communicate.return_value = ("123 \n \n 321\n", None) + popen_mock.return_value = process_mock + returncode_mock = PropertyMock() + returncode_mock.return_value = 0 + type(process_mock).returncode = returncode_mock + result = process_utils.get_children("2312415453") + + self.assertEqual(result, ["123", "321"]) + + expected = [ + call(['ps', '-o', 'pid', '--no-headers', '--ppid', '2312415453'], stderr=subprocess.PIPE, stdout=subprocess.PIPE)] + self.assertEqual(popen_mock.call_args_list, expected) + + @patch("subprocess.Popen") + def test_get_flat_process_tree(self, popen_mock): + def side_effect(*args, **kwargs): + process_mock = MagicMock() + returncode_mock = PropertyMock() + returncode_mock.return_value = 0 + type(process_mock).returncode = returncode_mock + if args[0][5] in process_tree.keys(): + process_mock.communicate.return_value = (process_tree[args[0][5]], None) + else: + process_mock.communicate.return_value = ("", None) + return process_mock + + popen_mock.side_effect = side_effect + result = process_utils.get_flat_process_tree("111") + 
self.assertEqual(result, ['111', '222', '333', '33', '22', '44', '444']) + + expected = [call(['ps', '-o', 'pid', '--no-headers', '--ppid', '111'], stderr=-1, stdout=-1), + call(['ps', '-o', 'pid', '--no-headers', '--ppid', '222'], stderr=-1, stdout=-1), + call(['ps', '-o', 'pid', '--no-headers', '--ppid', '333'], stderr=-1, stdout=-1), + call(['ps', '-o', 'pid', '--no-headers', '--ppid', '33'], stderr=-1, stdout=-1), + call(['ps', '-o', 'pid', '--no-headers', '--ppid', '22'], stderr=-1, stdout=-1), + call(['ps', '-o', 'pid', '--no-headers', '--ppid', '44'], stderr=-1, stdout=-1), + call(['ps', '-o', 'pid', '--no-headers', '--ppid', '444'], stderr=-1, stdout=-1)] + self.assertEqual(popen_mock.call_args_list, expected) + + @patch("subprocess.Popen") + def test_get_command_by_pid(self, popen_mock): + + process_mock = MagicMock() + process_mock.communicate.return_value = ("yum something", None) + returncode_mock = PropertyMock() + returncode_mock.return_value = 0 + type(process_mock).returncode = returncode_mock + popen_mock.return_value = process_mock + + result = process_utils.get_command_by_pid("2312415453") + + self.assertEqual(result, "yum something") + + expected = [call(['ps', '-p', '2312415453', '-o', 'command', '--no-headers'], stderr=-1, stdout=-1)] + self.assertEqual(popen_mock.call_args_list, expected) + + @patch("subprocess.Popen") + def test_get_command_by_pid_not_exist(self, popen_mock): + + process_mock = MagicMock() + process_mock.communicate.return_value = ("", None) + returncode_mock = PropertyMock() + returncode_mock.return_value = 1 + type(process_mock).returncode = returncode_mock + popen_mock.return_value = process_mock + + result = process_utils.get_command_by_pid("2312415453") + + self.assertEqual(result, "NOT_FOUND[2312415453]") + + expected = [call(['ps', '-p', '2312415453', '-o', 'command', '--no-headers'], stderr=-1, stdout=-1)] + self.assertEqual(popen_mock.call_args_list, expected) + + @patch("subprocess.Popen") + def 
test_is_process_running(self, popen_mock): + + process_mock = MagicMock() + process_mock.communicate.return_value = ("2312415453", None) + returncode_mock = PropertyMock() + returncode_mock.return_value = 0 + type(process_mock).returncode = returncode_mock + popen_mock.return_value = process_mock + + result = process_utils.is_process_running("2312415453") + + self.assertEqual(result, True) + + expected = [call(['ps', '-p', '2312415453', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1)] + self.assertEqual(popen_mock.call_args_list, expected) + + @patch("subprocess.Popen") + def test_is_process_not_running(self, popen_mock): + + process_mock = MagicMock() + process_mock.communicate.return_value = ("", None) + returncode_mock = PropertyMock() + returncode_mock.return_value = 1 + type(process_mock).returncode = returncode_mock + popen_mock.return_value = process_mock + + result = process_utils.is_process_running("2312415453") + + self.assertEqual(result, False) + + expected = [call(['ps', '-p', '2312415453', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1)] + self.assertEqual(popen_mock.call_args_list, expected) + + @patch("subprocess.Popen") + def test_get_processes_running(self, popen_mock): + def side_effect(*args, **kwargs): + process_mock = MagicMock() + returncode_mock = PropertyMock() + if args[0][2] == "4321": + returncode_mock.return_value = 0 + process_mock.communicate.return_value = ("4321", None) + else: + returncode_mock.return_value = 1 + process_mock.communicate.return_value = (None, None) + type(process_mock).returncode = returncode_mock + return process_mock + + popen_mock.side_effect = side_effect + + result = process_utils.get_processes_running(["1234", "4321"]) + + self.assertEqual(result, ["4321"]) + + expected = [call(['ps', '-p', '1234', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1), + call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1)] + self.assertEqual(popen_mock.call_args_list, expected) + + 
@patch("time.sleep") + @patch("subprocess.Popen") + def test_wait_for_process_death(self, popen_mock, sleep_mock): + + process_mock = MagicMock() + process_mock.communicate.side_effect = [("4321", None),("4321", None),(None, None)] + returncode_mock = PropertyMock() + returncode_mock.side_effect = [0, 0, 1] + type(process_mock).returncode = returncode_mock + popen_mock.return_value = process_mock + + process_utils.wait_for_process_death("4321") + + expected = [call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1), + call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1), + call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1)] + self.assertEqual(popen_mock.call_args_list, expected) + expected = [call(0.1), call(0.1)] + self.assertEqual(sleep_mock.call_args_list, expected) + + @patch("time.sleep") + @patch("subprocess.Popen") + def test_wait_for_entire_process_tree_death(self, popen_mock, sleep_mock): + + process_mock = MagicMock() + process_mock.communicate.side_effect = [("1234", None), (None, None), ("4321", None), ("4321", None), (None, None)] + returncode_mock = PropertyMock() + returncode_mock.side_effect = [0, 1, 0, 0, 1] + type(process_mock).returncode = returncode_mock + popen_mock.return_value = process_mock + + process_utils.wait_for_entire_process_tree_death(["1234", "4321"]) + + expected = [call(['ps', '-p', '1234', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1), + call(['ps', '-p', '1234', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1), + call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1), + call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1), + call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1)] + self.assertEqual(popen_mock.call_args_list, expected) + expected = [call(0.1), call(0.1), call(0.1)] + self.assertEqual(sleep_mock.call_args_list, expected)
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4032999f/slider-agent/src/test/python/agent/TestShell.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestShell.py b/slider-agent/src/test/python/agent/TestShell.py index 8caed7b..a707e09 100644 --- a/slider-agent/src/test/python/agent/TestShell.py +++ b/slider-agent/src/test/python/agent/TestShell.py @@ -50,8 +50,6 @@ class TestShell(unittest.TestCase): return if _platform == "linux" or _platform == "linux2": # Test is Linux-specific - gracefull_kill_delay_old = shell.gracefull_kill_delay - shell.gracefull_kill_delay = 0.1 sleep_cmd = "sleep 10" test_cmd = """ (({0}) & ({0} & {0})) """.format(sleep_cmd) # Starting process tree (multiple process groups) @@ -69,7 +67,6 @@ class TestShell(unittest.TestCase): ps_process = subprocess.Popen(ps_cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) (out, err) = ps_process.communicate() self.assertFalse(sleep_cmd in out) - shell.gracefull_kill_delay = gracefull_kill_delay_old else: # Do not run under other systems pass