Repository: aurora Updated Branches: refs/heads/master 593233857 -> 783baaefb
Ensure shell health checkers running for tasks running under an isolated filesystem are run within that filesystem. Reviewed at https://reviews.apache.org/r/51899/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/783baaef Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/783baaef Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/783baaef Branch: refs/heads/master Commit: 783baaefb9a814ca01fad78181fe3df3de5b34af Parents: 5932338 Author: Joshua Cohen <[email protected]> Authored: Thu Sep 15 13:48:05 2016 -0500 Committer: Joshua Cohen <[email protected]> Committed: Thu Sep 15 13:48:05 2016 -0500 ---------------------------------------------------------------------- .../apache/aurora/common/health_check/shell.py | 29 ++++++++-- .../executor/bin/thermos_executor_main.py | 4 +- .../aurora/executor/common/health_checker.py | 41 +++++++++---- src/main/python/apache/thermos/common/BUILD | 1 + .../apache/thermos/common/process_util.py | 44 ++++++++++++++ src/main/python/apache/thermos/core/process.py | 25 ++------ .../aurora/common/health_check/test_shell.py | 21 ++++++- .../executor/common/test_health_checker.py | 61 +++++++++++++++++++- .../apache/aurora/e2e/http/http_example.aurora | 12 +++- 9 files changed, 194 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/main/python/apache/aurora/common/health_check/shell.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/common/health_check/shell.py b/src/main/python/apache/aurora/common/health_check/shell.py index 3575082..6ac9021 100644 --- a/src/main/python/apache/aurora/common/health_check/shell.py +++ b/src/main/python/apache/aurora/common/health_check/shell.py @@ -24,19 +24,40 @@ else: import subprocess +class
WrappedCalledProcessError(subprocess.CalledProcessError): + """ + Wraps a CalledProcessError but overrides the command so that in the event it was run through an + isolator, the original command is exposed to the user, rather than the isolated value. + """ + + def __init__(self, original_command, error): + self.cmd = original_command + self.returncode = error.returncode + self.output = error.output + + class ShellHealthCheck(object): - def __init__(self, cmd, preexec_fn=None, timeout_secs=None): + def __init__( + self, + cmd, + preexec_fn=None, + timeout_secs=None, + wrapper_fn=None): + """ - Initialize with the commmand we would like to call. + Initialize with the command we would like to call. :param cmd: Command to execute that is expected to have a 0 return code on success. :type cmd: str :param preexec_fn: Callable to invoke just before the child shell process is executed. :type preexec_fn: callable :param timeout_secs: Timeout in seconds. :type timeout_secs: int + :param wrapper_fn: Callable to invoke that wraps the shell command for filesystem isolation. + :type wrapper_fn: callable """ - self._cmd = cmd + self._original_cmd = cmd + self._cmd = cmd if wrapper_fn is None else wrapper_fn(cmd) self._preexec_fn = preexec_fn self._timeout_secs = timeout_secs @@ -56,7 +77,7 @@ class ShellHealthCheck(object): return True, None except subprocess.CalledProcessError as reason: # The command didn't return a 0 so provide reason for failure. - return False, str(reason) + return False, str(WrappedCalledProcessError(self._original_cmd, reason)) except subprocess.TimeoutExpired: return False, 'Health check timed out.' 
except OSError as e: http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py index 5211f28..c6c0898 100644 --- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py +++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py @@ -218,7 +218,9 @@ def initialize(options): # status providers: status_providers = [ - HealthCheckerProvider(nosetuid_health_checks=options.nosetuid_health_checks), + HealthCheckerProvider( + nosetuid_health_checks=options.nosetuid_health_checks, + mesos_containerizer_path=options.mesos_containerizer_path), ResourceManagerProvider(checkpoint_root=checkpoint_root) ] http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/main/python/apache/aurora/executor/common/health_checker.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/common/health_checker.py b/src/main/python/apache/aurora/executor/common/health_checker.py index 5fc845e..3c7c09d 100644 --- a/src/main/python/apache/aurora/executor/common/health_checker.py +++ b/src/main/python/apache/aurora/executor/common/health_checker.py @@ -12,7 +12,7 @@ # limitations under the License. 
# -import os.path +import os import pwd import threading import time @@ -27,6 +27,7 @@ from twitter.common.metrics import LambdaGauge from apache.aurora.common.health_check.http_signaler import HttpSignaler from apache.aurora.common.health_check.shell import ShellHealthCheck from apache.aurora.config.schema.base import MesosContext +from apache.thermos.common.process_util import wrap_with_mesos_containerizer from apache.thermos.config.schema import ThermosContext from .status_checker import StatusChecker, StatusCheckerProvider, StatusResult @@ -208,8 +209,9 @@ class HealthChecker(StatusChecker): class HealthCheckerProvider(StatusCheckerProvider): - def __init__(self, nosetuid_health_checks=False): - self.nosetuid_health_checks = nosetuid_health_checks + def __init__(self, nosetuid_health_checks=False, mesos_containerizer_path=None): + self._nosetuid_health_checks = nosetuid_health_checks + self._mesos_containerizer_path = mesos_containerizer_path @staticmethod def interpolate_cmd(task, cmd): @@ -241,24 +243,41 @@ class HealthCheckerProvider(StatusCheckerProvider): timeout_secs = health_check_config.get('timeout_secs') if SHELL_HEALTH_CHECK in health_checker: shell_command = health_checker.get(SHELL_HEALTH_CHECK, {}).get('shell_command') - # Filling in variables eg thermos.ports[http] that could have been passed in as part of + + # Filling in variables e.g. thermos.ports[http] that could have been passed in as part of # shell_command. interpolated_command = HealthCheckerProvider.interpolate_cmd( task=assigned_task, - cmd=shell_command - ) - # If we do not want user which is job's role to execute the health shell check - # --nosetuid-health-checks should be passed in as an argument to the executor. + cmd=shell_command) + + # If we do not want the health check to execute as the user from the job's role + # --nosetuid-health-checks should be passed as an argument to the executor. 
demote_to_job_role_user = None - if not self.nosetuid_health_checks: + if not self._nosetuid_health_checks and not sandbox.is_filesystem_image: pw_entry = pwd.getpwnam(assigned_task.task.job.role) def demote_to_job_role_user(): os.setgid(pw_entry.pw_gid) os.setuid(pw_entry.pw_uid) - shell_signaler = ShellHealthCheck(cmd=interpolated_command, + # If the task is executing in an isolated filesystem we'll want to wrap the health check + # command within a mesos-containerizer invocation so that it's executed within that + # filesystem. + wrapper = None + if sandbox.is_filesystem_image: + health_check_user = (os.getusername() if self._nosetuid_health_checks + else assigned_task.task.job.role) + def wrapper(cmd): + return wrap_with_mesos_containerizer( + cmd, + health_check_user, + sandbox.container_root, + self._mesos_containerizer_path) + + shell_signaler = ShellHealthCheck( + cmd=interpolated_command, preexec_fn=demote_to_job_role_user, - timeout_secs=timeout_secs) + timeout_secs=timeout_secs, + wrapper_fn=wrapper) a_health_checker = lambda: shell_signaler() else: portmap = resolve_ports(mesos_task, assigned_task.assignedPorts) http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/main/python/apache/thermos/common/BUILD ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/common/BUILD b/src/main/python/apache/thermos/common/BUILD index 879b812..0adabbf 100644 --- a/src/main/python/apache/thermos/common/BUILD +++ b/src/main/python/apache/thermos/common/BUILD @@ -22,6 +22,7 @@ python_library( '3rdparty/python:pystachio', '3rdparty/python:twitter.common.log', '3rdparty/python:twitter.common.recordio', + 'api/src/main/thrift/org/apache/aurora/gen', 'api/src/main/thrift/org/apache/thermos' ], provides = setup_py( http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/main/python/apache/thermos/common/process_util.py ---------------------------------------------------------------------- diff 
--git a/src/main/python/apache/thermos/common/process_util.py b/src/main/python/apache/thermos/common/process_util.py new file mode 100644 index 0000000..abd2c0e --- /dev/null +++ b/src/main/python/apache/thermos/common/process_util.py @@ -0,0 +1,44 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os + +from gen.apache.aurora.api.constants import TASK_FILESYSTEM_MOUNT_POINT + + +def wrap_with_mesos_containerizer(cmdline, user, cwd, mesos_containerizer_path): + # We're going to embed this in JSON, so we must escape quotes and newlines. + cmdline = cmdline.replace('"', '\\"').replace('\n', '\\n') + + # We must wrap the command in single quotes otherwise the shell that executes + # mesos-containerizer will expand any bash variables in the cmdline. Escaping single quotes in + # bash is hard: https://github.com/koalaman/shellcheck/wiki/SC1003. + bash_wrapper = "/bin/bash -c '\\''%s'\\''" + + # The shell: true below shouldn't be necessary. Since we're just invoking bash anyway, using it + # results in a process like: `sh -c /bin/bash -c ...`, however in my testing no combination of + # shell: false and splitting the bash/cmdline args across value/arguments produced an invocation + # that actually worked. That said, it *should* be possible. + # TODO(jcohen): Investigate setting shell:false further.
+ return ('%s launch ' + '--unshare_namespace_mnt ' + '--working_directory=%s ' + '--rootfs=%s ' + '--user=%s ' + '--command=\'{"shell":true,"value":"%s"}\'' % ( + mesos_containerizer_path, + cwd, + os.path.join(os.environ['MESOS_DIRECTORY'], TASK_FILESYSTEM_MOUNT_POINT), + user, + bash_wrapper % cmdline)) http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/main/python/apache/thermos/core/process.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py index 2134d4f..3ec43e2 100644 --- a/src/main/python/apache/thermos/core/process.py +++ b/src/main/python/apache/thermos/core/process.py @@ -39,6 +39,8 @@ from twitter.common.lang import Interface from twitter.common.quantity import Amount, Data, Time from twitter.common.recordio import ThriftRecordReader, ThriftRecordWriter +from apache.thermos.common.process_util import wrap_with_mesos_containerizer + from gen.apache.aurora.api.constants import TASK_FILESYSTEM_MOUNT_POINT from gen.apache.thermos.ttypes import ProcessState, ProcessStatus, RunnerCkpt @@ -386,27 +388,8 @@ class Process(ProcessBase): if self._mesos_containerizer_path is None: return ['/bin/bash', '-c', cmdline] - # We're going to embed this in JSON, so we must escape quotes and newlines. - cmdline = cmdline.replace('"', '\\"').replace('\n', '\\n') - - # We must wrap the command in single quotes otherwise the shell that executes - # mesos-containerizer will expand any bash variables in the cmdline. Escaping single quotes in - # bash is hard: https://github.com/koalaman/shellcheck/wiki/SC1003. 
- bash_wrapper = "/bin/bash -c '\\''%s'\\''" - - wrapped = ('%s launch ' - '--unshare_namespace_mnt ' - '--working_directory=%s ' - '--rootfs=%s ' - '--user=%s ' - '--command=\'{"shell":true,"value":"%s"}\'' % ( - self._mesos_containerizer_path, - cwd, - os.path.join(os.environ['MESOS_DIRECTORY'], TASK_FILESYSTEM_MOUNT_POINT), - self._user, - bash_wrapper % cmdline)) - - return shlex.split(wrapped) + return shlex.split( + wrap_with_mesos_containerizer(cmdline, self._user, cwd, self._mesos_containerizer_path)) def execute(self): """Perform final initialization and launch target process commandline in a subprocess.""" http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/test/python/apache/aurora/common/health_check/test_shell.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/common/health_check/test_shell.py b/src/test/python/apache/aurora/common/health_check/test_shell.py index 011464c..4f02878 100644 --- a/src/test/python/apache/aurora/common/health_check/test_shell.py +++ b/src/test/python/apache/aurora/common/health_check/test_shell.py @@ -41,12 +41,27 @@ class TestHealthChecker(unittest.TestCase): @mock.patch('subprocess32.check_call', autospec=True) def test_health_check_failed(self, mock_popen): + cmd = 'failed' # Fail due to command returning a non-0 exit status. 
- mock_popen.side_effect = subprocess.CalledProcessError(1, 'failed') + mock_popen.side_effect = subprocess.CalledProcessError(1, cmd) - shell = ShellHealthCheck('cmd', timeout_secs=30) + shell = ShellHealthCheck(cmd, timeout_secs=30) success, msg = shell() - mock_popen.assert_called_once_with('cmd', shell=True, timeout=30, preexec_fn=mock.ANY) + mock_popen.assert_called_once_with(cmd, shell=True, timeout=30, preexec_fn=mock.ANY) + + self.assertFalse(success) + self.assertEqual(msg, "Command 'failed' returned non-zero exit status 1") + + @mock.patch('subprocess32.check_call', autospec=True) + def test_health_check_failed_with_wrapper(self, mock_popen): + cmd = 'failed' + mock_popen.side_effect = subprocess.CalledProcessError(1, cmd) + + shell = ShellHealthCheck(cmd, timeout_secs=30, wrapper_fn=lambda c: 'wrapped: %s' % c) + success, msg = shell() + self.assertEqual( + mock_popen.mock_calls, + [mock.call('wrapped: %s' % cmd, shell=True, timeout=30, preexec_fn=mock.ANY)]) self.assertFalse(success) self.assertEqual(msg, "Command 'failed' returned non-zero exit status 1") http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/test/python/apache/aurora/executor/common/test_health_checker.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/common/test_health_checker.py b/src/test/python/apache/aurora/executor/common/test_health_checker.py index bb6ea69..da0c56c 100644 --- a/src/test/python/apache/aurora/executor/common/test_health_checker.py +++ b/src/test/python/apache/aurora/executor/common/test_health_checker.py @@ -275,7 +275,12 @@ class TestHealthCheckerProvider(unittest.TestCase): execconfig_data = json.loads(assigned_task.task.executorConfig.data) assert execconfig_data[ 'health_check_config']['health_checker']['shell']['shell_command'] == 'failed command' - health_checker = HealthCheckerProvider().from_assigned_task(assigned_task, None) + + mock_sandbox = 
mock.Mock(spec_set=SandboxInterface) + type(mock_sandbox).root = mock.PropertyMock(return_value='/some/path') + type(mock_sandbox).is_filesystem_image = mock.PropertyMock(return_value=False) + + health_checker = HealthCheckerProvider().from_assigned_task(assigned_task, mock_sandbox) assert health_checker.threaded_health_checker.interval == interval_secs assert health_checker.threaded_health_checker.initial_interval == initial_interval_secs hct_max_fail = health_checker.threaded_health_checker.max_consecutive_failures @@ -309,8 +314,14 @@ class TestHealthCheckerProvider(unittest.TestCase): execconfig_data = json.loads(assigned_task.task.executorConfig.data) assert execconfig_data[ 'health_check_config']['health_checker']['shell']['shell_command'] == 'failed command' + + mock_sandbox = mock.Mock(spec_set=SandboxInterface) + type(mock_sandbox).root = mock.PropertyMock(return_value='/some/path') + type(mock_sandbox).is_filesystem_image = mock.PropertyMock(return_value=False) + health_checker = HealthCheckerProvider(nosetuid_health_checks=True).from_assigned_task( - assigned_task, None) + assigned_task, + mock_sandbox) assert health_checker.threaded_health_checker.interval == interval_secs assert health_checker.threaded_health_checker.initial_interval == initial_interval_secs hct_max_fail = health_checker.threaded_health_checker.max_consecutive_failures @@ -318,6 +329,52 @@ class TestHealthCheckerProvider(unittest.TestCase): # Should not be trying to access role's user info. 
assert not mock_getpwnam.called + @mock.patch('pwd.getpwnam') + def test_from_assigned_task_shell_filesystem_image(self, mock_getpwnam): + interval_secs = 17 + initial_interval_secs = 3 + max_consecutive_failures = 2 + timeout_secs = 5 + shell_config = ShellHealthChecker(shell_command='failed command') + task_config = TaskConfig( + job=JobKey(role='role', environment='env', name='name'), + executorConfig=ExecutorConfig( + name='thermos-generic', + data=MESOS_JOB( + task=HELLO_WORLD, + health_check_config=HealthCheckConfig( + health_checker=HealthCheckerConfig(shell=shell_config), + interval_secs=interval_secs, + initial_interval_secs=initial_interval_secs, + max_consecutive_failures=max_consecutive_failures, + timeout_secs=timeout_secs, + ) + ).json_dumps() + ) + ) + assigned_task = AssignedTask(task=task_config, instanceId=1, assignedPorts={'foo': 9001}) + execconfig_data = json.loads(assigned_task.task.executorConfig.data) + assert execconfig_data[ + 'health_check_config']['health_checker']['shell']['shell_command'] == 'failed command' + + mock_sandbox = mock.Mock(spec_set=SandboxInterface) + type(mock_sandbox).root = mock.PropertyMock(return_value='/some/path') + type(mock_sandbox).is_filesystem_image = mock.PropertyMock(return_value=True) + + with mock.patch('apache.aurora.executor.common.health_checker.ShellHealthCheck') as mock_shell: + HealthCheckerProvider( + nosetuid_health_checks=False, + mesos_containerizer_path='/some/path/mesos-containerizer').from_assigned_task( + assigned_task, + mock_sandbox) + + class NotNone(object): + def __eq__(self, other): + return other is not None + + assert mock_shell.mock_calls == [ + mock.call(cmd='failed command', wrapper_fn=NotNone(), preexec_fn=None, timeout_secs=5.0)] + def test_interpolate_cmd(self): """Making sure thermos.ports[foo] gets correctly substituted with assignedPorts info.""" interval_secs = 17 
http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora ---------------------------------------------------------------------- diff --git a/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora b/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora index 290627f..c71fb81 100644 --- a/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora +++ b/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora @@ -73,6 +73,12 @@ no_python_task = SequentialTask( update_config = UpdateConfig(watch_secs=10, batch_size=2) health_check_config = HealthCheckConfig(initial_interval_secs=5, interval_secs=1) +shell_health_check_config = HealthCheckConfig( + health_checker = HealthCheckerConfig( + shell = ShellHealthChecker(shell_command='stat /usr/local/bin/run-server.sh')), + initial_interval_secs=5, + interval_secs=1, +) job = Service( cluster = 'devcluster', @@ -106,11 +112,13 @@ jobs = [ job( name = 'http_example_unified_appc', container = Mesos(image=AppcImage(name='http_example_netcat', image_id='{{appc_image_id}}')), - task = no_python_task + task = no_python_task, + health_check_config=shell_health_check_config ).bind(profile=DefaultProfile()), job( name = 'http_example_unified_docker', container = Mesos(image=DockerImage(name='http_example_netcat', tag='latest')), - task = no_python_task + task = no_python_task, + health_check_config=shell_health_check_config ).bind(profile=DefaultProfile()) ]
