AMBARI-18369. Make Execute timeout able to kill process trees which don't respond to SIGTERM (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d8f3cf88 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d8f3cf88 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d8f3cf88 Branch: refs/heads/branch-dev-patch-upgrade Commit: d8f3cf88f5c03e844c43af85b5eee69a1956eecb Parents: 2dd5ba3 Author: Andrew Onishuk <[email protected]> Authored: Tue Sep 13 10:31:20 2016 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Tue Sep 13 10:31:20 2016 +0300 ---------------------------------------------------------------------- .../python/resource_management/core/__init__.py | 1 + .../resource_management/core/files/killtree.sh | 40 +++++++++ .../core/providers/system.py | 1 + .../core/resources/system.py | 13 ++- .../python/resource_management/core/shell.py | 18 ++-- .../resource_management/core/signal_utils.py | 91 ++++++++++++++++++++ .../python/resource_management/core/utils.py | 28 +----- .../package/alerts/alert_hive_metastore.py | 5 +- 8 files changed, 159 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/core/__init__.py b/ambari-common/src/main/python/resource_management/core/__init__.py index 1af793b..3a63dab 100644 --- a/ambari-common/src/main/python/resource_management/core/__init__.py +++ b/ambari-common/src/main/python/resource_management/core/__init__.py @@ -29,5 +29,6 @@ from resource_management.core.source import * from resource_management.core.system import * from resource_management.core.shell import * from resource_management.core.logger import * +from resource_management.core.signal_utils import * __version__ = "0.4.1" 
http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/files/killtree.sh ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/core/files/killtree.sh b/ambari-common/src/main/python/resource_management/core/files/killtree.sh new file mode 100644 index 0000000..c19efd9 --- /dev/null +++ b/ambari-common/src/main/python/resource_management/core/files/killtree.sh @@ -0,0 +1,40 @@ +#!/bin/bash +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# + +set -e + +killtree() { + local _pid=$1 + local _sig=${2:--TERM} + ambari-sudo.sh kill -stop ${_pid} # needed to stop quickly forking parent from producing children between child killing and parent killing + for _child in $(ps -o pid --no-headers --ppid ${_pid}); do + killtree ${_child} ${_sig} + done + ambari-sudo.sh kill -${_sig} ${_pid} +} + +if [ $# -eq 0 -o $# -gt 2 ]; then + echo "Usage: $(basename $0) <pid> [signal]" + exit 1 +fi + +killtree $@ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/providers/system.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/core/providers/system.py b/ambari-common/src/main/python/resource_management/core/providers/system.py index fcbab01..2b8d5f7 100644 --- a/ambari-common/src/main/python/resource_management/core/providers/system.py +++ b/ambari-common/src/main/python/resource_management/core/providers/system.py @@ -256,6 +256,7 @@ class ExecuteProvider(Provider): timeout=self.resource.timeout,on_timeout=self.resource.on_timeout, path=self.resource.path, sudo=self.resource.sudo, + timeout_kill_strategy=self.resource.timeout_kill_strategy, on_new_line=self.resource.on_new_line, stdout=self.resource.stdout,stderr=self.resource.stderr, tries=self.resource.tries, try_sleep=self.resource.try_sleep) http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/resources/system.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/core/resources/system.py b/ambari-common/src/main/python/resource_management/core/resources/system.py index 7f164f6..087ceab 100644 --- a/ambari-common/src/main/python/resource_management/core/resources/system.py +++ 
b/ambari-common/src/main/python/resource_management/core/resources/system.py @@ -23,9 +23,9 @@ Ambari Agent __all__ = ["File", "Directory", "Link", "Execute", "ExecuteScript", "Mount"] import subprocess +from resource_management.core.signal_utils import TerminateStrategy from resource_management.core.base import Resource, ForcedListArgument, ResourceArgument, BooleanArgument - class File(Resource): action = ForcedListArgument(default="create") path = ResourceArgument(default=lambda obj: obj.name) @@ -240,6 +240,17 @@ class Execute(Resource): stdout = ResourceArgument(default=subprocess.PIPE) stderr = ResourceArgument(default=subprocess.STDOUT) + """ + This argument takes TerminateStrategy constants. Import it as shown below: + from resource_management.core.signal_utils import TerminateStrategy + + Possible values are: + TerminateStrategy.TERMINATE_PARENT - kill parent process with SIGTERM (is perfect if all children handle SIGTERM signal) + TerminateStrategy.KILL_PROCESS_GROUP - kill process GROUP with SIGTERM and if not effective with SIGKILL + TerminateStrategy.KILL_PROCESS_TREE - send SIGTERM to every process in the tree + """ + timeout_kill_strategy = ResourceArgument(default=TerminateStrategy.TERMINATE_PARENT) + class ExecuteScript(Resource): action = ForcedListArgument(default="run") code = ResourceArgument(required=True) http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/shell.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/core/shell.py b/ambari-common/src/main/python/resource_management/core/shell.py index 6d9eb18..372755a 100644 --- a/ambari-common/src/main/python/resource_management/core/shell.py +++ b/ambari-common/src/main/python/resource_management/core/shell.py @@ -37,6 +37,7 @@ from exceptions import ExecuteTimeoutException from resource_management.core.logger import Logger from 
resource_management.core import utils from ambari_commons.constants import AMBARI_SUDO_BINARY +from resource_management.core.signal_utils import TerminateStrategy, terminate_process # use quiet=True calls from this folder (logs get too messy duplicating the resources with its commands) NOT_LOGGED_FOLDER = 'resource_management/core' @@ -90,7 +91,7 @@ def preexec_fn(): @log_function_call def checked_call(command, quiet=False, logoutput=None, stdout=subprocess.PIPE,stderr=subprocess.STDOUT, cwd=None, env=None, preexec_fn=preexec_fn, user=None, wait_for_finish=True, timeout=None, on_timeout=None, - path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0): + path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0, timeout_kill_strategy=TerminateStrategy.TERMINATE_PARENT): """ Execute the shell command and throw an exception on failure. @throws Fail @@ -99,12 +100,12 @@ def checked_call(command, quiet=False, logoutput=None, stdout=subprocess.PIPE,st return _call_wrapper(command, logoutput=logoutput, throw_on_failure=True, stdout=stdout, stderr=stderr, cwd=cwd, env=env, preexec_fn=preexec_fn, user=user, wait_for_finish=wait_for_finish, on_timeout=on_timeout, timeout=timeout, path=path, sudo=sudo, on_new_line=on_new_line, - tries=tries, try_sleep=try_sleep) + tries=tries, try_sleep=try_sleep, timeout_kill_strategy=timeout_kill_strategy) @log_function_call def call(command, quiet=False, logoutput=None, stdout=subprocess.PIPE,stderr=subprocess.STDOUT, cwd=None, env=None, preexec_fn=preexec_fn, user=None, wait_for_finish=True, timeout=None, on_timeout=None, - path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0): + path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0, timeout_kill_strategy=TerminateStrategy.TERMINATE_PARENT): """ Execute the shell command despite failures. 
@return: return_code, output @@ -112,7 +113,7 @@ def call(command, quiet=False, logoutput=None, stdout=subprocess.PIPE,stderr=sub return _call_wrapper(command, logoutput=logoutput, throw_on_failure=False, stdout=stdout, stderr=stderr, cwd=cwd, env=env, preexec_fn=preexec_fn, user=user, wait_for_finish=wait_for_finish, on_timeout=on_timeout, timeout=timeout, path=path, sudo=sudo, on_new_line=on_new_line, - tries=tries, try_sleep=try_sleep) + tries=tries, try_sleep=try_sleep, timeout_kill_strategy=timeout_kill_strategy) @log_function_call def non_blocking_call(command, quiet=False, stdout=subprocess.PIPE,stderr=subprocess.STDOUT, @@ -166,7 +167,7 @@ def _call_wrapper(command, **kwargs): def _call(command, logoutput=None, throw_on_failure=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT, cwd=None, env=None, preexec_fn=preexec_fn, user=None, wait_for_finish=True, timeout=None, on_timeout=None, - path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0): + path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0, timeout_kill_strategy=TerminateStrategy.TERMINATE_PARENT): """ Execute shell command @@ -224,7 +225,7 @@ def _call(command, logoutput=None, throw_on_failure=True, stdout=subprocess.PIPE if timeout: timeout_event = threading.Event() - t = threading.Timer( timeout, _on_timeout, [proc, timeout_event] ) + t = threading.Timer( timeout, _on_timeout, [proc, timeout_event, timeout_kill_strategy] ) t.start() if not wait_for_finish: @@ -378,7 +379,6 @@ def _print(line): sys.stdout.write(line) sys.stdout.flush() -def _on_timeout(proc, timeout_event): +def _on_timeout(proc, timeout_event, terminate_strategy): timeout_event.set() - utils.killpg_gracefully(proc) - + terminate_process(proc, terminate_strategy) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/signal_utils.py ---------------------------------------------------------------------- diff --git 
a/ambari-common/src/main/python/resource_management/core/signal_utils.py b/ambari-common/src/main/python/resource_management/core/signal_utils.py new file mode 100644 index 0000000..1f0dfe7 --- /dev/null +++ b/ambari-common/src/main/python/resource_management/core/signal_utils.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Ambari Agent + +""" + +__all__ = ["TerminateStrategy", "terminate_process"] + +import os +import signal +from resource_management.core.base import Fail + +GRACEFUL_PG_KILL_TIMEOUT_SECONDS = 5 + +class TerminateStrategy: + """ + 0 - kill parent process with SIGTERM (is perfect if all children handle SIGTERM signal). Otherwise children will survive. 
+ 1 - kill process GROUP with SIGTERM and if not effective with SIGKILL + 2 - send SIGTERM to every process in the tree + """ + TERMINATE_PARENT = 0 + KILL_PROCESS_GROUP = 1 + KILL_PROCESS_TREE = 2 + +def terminate_process(proc, terminate_strategy): + if terminate_strategy == TerminateStrategy.TERMINATE_PARENT: + terminate_parent_process(proc) + elif terminate_strategy == TerminateStrategy.KILL_PROCESS_GROUP: + killpg_gracefully(proc) + elif terminate_strategy == TerminateStrategy.KILL_PROCESS_TREE: + kill_process_tree(proc) + else: + raise Fail("Invalid timeout_kill_strategy = '{0}'. Use TerminateStrategy class constants as a value.".format(terminate_strategy)) + +def killpg_gracefully(proc, timeout=GRACEFUL_PG_KILL_TIMEOUT_SECONDS): + """ + Tries to kill pgroup (process group) of process with SIGTERM. + If the process is still alive after waiting for timeout, SIGKILL is sent to the pgroup. + """ + from resource_management.core import sudo + from resource_management.core.logger import Logger + + if proc.poll() == None: + try: + pgid = os.getpgid(proc.pid) + sudo.kill(-pgid, signal.SIGTERM) + + for i in xrange(10*timeout): + if proc.poll() is not None: + break + time.sleep(0.1) + else: + Logger.info("Cannot gracefully kill process group {0}. 
Resorting to SIGKILL.".format(pgid)) + sudo.kill(-pgid, signal.SIGKILL) + proc.wait() + # catch race condition if proc already dead + except OSError: + pass + +def terminate_parent_process(proc): + if proc.poll() == None: + try: + proc.terminate() + proc.wait() + # catch race condition if proc already dead + except OSError: + pass + +def kill_process_tree(proc): + from resource_management.core import shell + current_directory = os.path.dirname(os.path.abspath(__file__)) + kill_tree_script = "{0}/files/killtree.sh".format(current_directory) + if proc.poll() == None: + shell.checked_call(["bash", kill_tree_script, str(proc.pid), str(signal.SIGKILL)]) + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-common/src/main/python/resource_management/core/utils.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/core/utils.py b/ambari-common/src/main/python/resource_management/core/utils.py index dc771d1..265b2f2 100644 --- a/ambari-common/src/main/python/resource_management/core/utils.py +++ b/ambari-common/src/main/python/resource_management/core/utils.py @@ -31,7 +31,6 @@ from resource_management.core.exceptions import Fail from itertools import chain, repeat, islice PASSWORDS_HIDE_STRING = "[PROTECTED]" -GRACEFUL_PG_KILL_TIMEOUT_SECONDS = 5 class AttributeDictionary(object): def __init__(self, *args, **kwargs): @@ -158,29 +157,4 @@ def pad_infinite(iterable, padding=None): return chain(iterable, repeat(padding)) def pad(iterable, size, padding=None): - return islice(pad_infinite(iterable, padding), size) - -def killpg_gracefully(proc, timeout=GRACEFUL_PG_KILL_TIMEOUT_SECONDS): - """ - Tries to kill pgroup (process group) of process with SIGTERM. - If the process is still alive after waiting for timeout, SIGKILL is sent to the pgroup. 
- """ - from resource_management.core import sudo - from resource_management.core.logger import Logger - - if proc.poll() == None: - try: - pgid = os.getpgid(proc.pid) - sudo.kill(-pgid, signal.SIGTERM) - - for i in xrange(10*timeout): - if proc.poll() is not None: - break - time.sleep(0.1) - else: - Logger.info("Cannot gracefully kill process group {0}. Resorting to SIGKILL.".format(pgid)) - sudo.kill(-pgid, signal.SIGKILL) - proc.wait() - # catch race condition if proc already dead - except OSError: - pass \ No newline at end of file + return islice(pad_infinite(iterable, padding), size) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/d8f3cf88/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py index e02ed5a..cd1eded 100644 --- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py +++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py @@ -28,6 +28,7 @@ from resource_management.core import global_lock from resource_management.libraries.functions import format from resource_management.libraries.functions import get_kinit_path from resource_management.core.resources import Execute +from resource_management.core.signal_utils import TerminateStrategy from ambari_commons.os_check import OSConst from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl @@ -195,7 +196,9 @@ def execute(configurations={}, parameters={}, host_name=None): try: Execute(cmd, user=smokeuser, path=["/bin/", "/usr/bin/", "/usr/sbin/", bin_dir], - timeout=int(check_command_timeout) ) + timeout=5, + 
timeout_kill_strategy=TerminateStrategy.KILL_PROCESS_TREE, + ) total_time = time.time() - start_time
