Repository: aurora Updated Branches: refs/heads/master 2988a7e01 -> 5cad046fc
A few executor fixes for filesystem isolation: - Add an option to skip the groupadd/useradd calls into the task's filesystem. - Mount any configured volumes into the task's filesystem. - Clean up http server script used by appc e2e tests. - Properly support CWD and .thermos_profile. Reviewed at https://reviews.apache.org/r/51298/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/5cad046f Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/5cad046f Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/5cad046f Branch: refs/heads/master Commit: 5cad046fc0f0c4bb79a4563cfcff0442b7bf8383 Parents: 2988a7e Author: Joshua Cohen <[email protected]> Authored: Fri Aug 26 12:49:25 2016 -0500 Committer: Joshua Cohen <[email protected]> Committed: Fri Aug 26 12:49:25 2016 -0500 ---------------------------------------------------------------------- .../apache/aurora/executor/aurora_executor.py | 38 +++++--- .../executor/bin/thermos_executor_main.py | 37 +++++++- .../apache/aurora/executor/common/sandbox.py | 97 +++++++++++++------- .../aurora/executor/thermos_task_runner.py | 11 +-- src/main/python/apache/thermos/core/process.py | 84 ++++++++++++----- src/main/python/apache/thermos/core/runner.py | 8 +- .../apache/thermos/runner/thermos_runner.py | 14 ++- .../aurora/executor/common/test_sandbox.py | 84 ++++++++++++++++- .../aurora/executor/test_thermos_executor.py | 14 +-- .../python/apache/thermos/core/test_process.py | 8 +- .../apache/aurora/e2e/http/http_example.aurora | 26 ++++-- src/test/sh/org/apache/aurora/e2e/run-server.sh | 4 +- .../sh/org/apache/aurora/e2e/test_end_to_end.sh | 7 ++ 13 files changed, 327 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/5cad046f/src/main/python/apache/aurora/executor/aurora_executor.py ---------------------------------------------------------------------- 
diff --git a/src/main/python/apache/aurora/executor/aurora_executor.py b/src/main/python/apache/aurora/executor/aurora_executor.py index dde19a6..ce5ef68 100644 --- a/src/main/python/apache/aurora/executor/aurora_executor.py +++ b/src/main/python/apache/aurora/executor/aurora_executor.py @@ -42,12 +42,15 @@ class AuroraExecutor(ExecutorBase, Observable): STOP_TIMEOUT = Amount(2, Time.MINUTES) STOP_WAIT = Amount(5, Time.SECONDS) - def __init__(self, - runner_provider, - status_manager_class=StatusManager, - sandbox_provider=DefaultSandboxProvider(), - status_providers=(), - clock=time): + def __init__( + self, + runner_provider, + status_manager_class=StatusManager, + sandbox_provider=DefaultSandboxProvider(), + status_providers=(), + clock=time, + no_sandbox_create_user=False, + sandbox_mount_point=None): ExecutorBase.__init__(self) if not isinstance(runner_provider, TaskRunnerProvider): @@ -62,6 +65,8 @@ class AuroraExecutor(ExecutorBase, Observable): self._status_manager_class = status_manager_class self._sandbox = None self._sandbox_provider = sandbox_provider + self._no_sandbox_create_user = no_sandbox_create_user + self._sandbox_mount_point = sandbox_mount_point self._kill_manager = KillManager() # Events that are exposed for interested entities self.runner_aborted = threading.Event() @@ -81,7 +86,7 @@ class AuroraExecutor(ExecutorBase, Observable): self.send_update(driver, self._task_id, status, msg) defer(driver.stop, delay=self.STOP_WAIT) - def _run(self, driver, assigned_task): + def _run(self, driver, assigned_task, mounted_volume_paths): """ Commence running a Task. 
- Initialize the sandbox @@ -91,7 +96,7 @@ class AuroraExecutor(ExecutorBase, Observable): """ self.send_update(driver, self._task_id, mesos_pb2.TASK_STARTING, 'Initializing sandbox.') - if not self._initialize_sandbox(driver, assigned_task): + if not self._initialize_sandbox(driver, assigned_task, mounted_volume_paths): return # start the process on a separate thread and give the message processing thread back @@ -118,8 +123,12 @@ class AuroraExecutor(ExecutorBase, Observable): log.error(traceback.format_exc()) self._die(driver, mesos_pb2.TASK_FAILED, "Internal error") - def _initialize_sandbox(self, driver, assigned_task): - self._sandbox = self._sandbox_provider.from_assigned_task(assigned_task) + def _initialize_sandbox(self, driver, assigned_task, mounted_volume_paths): + self._sandbox = self._sandbox_provider.from_assigned_task( + assigned_task, + no_create_user=self._no_sandbox_create_user, + mounted_volume_paths=mounted_volume_paths, + sandbox_mount_point=self._sandbox_mount_point) self.sandbox_initialized.set() try: propagate_deadline(self._sandbox.create, timeout=self.SANDBOX_INITIALIZATION_TIMEOUT) @@ -230,6 +239,13 @@ class AuroraExecutor(ExecutorBase, Observable): log.fatal(traceback.format_exc()) return None + @classmethod + def extract_mount_paths_from_task(cls, task): + if task.executor and task.executor.container: + return [v.container_path for v in task.executor.container.volumes] + + return None + """ Mesos Executor API methods follow """ def launchTask(self, driver, task): @@ -261,7 +277,7 @@ class AuroraExecutor(ExecutorBase, Observable): defer(driver.stop, delay=self.STOP_WAIT) return - defer(lambda: self._run(driver, assigned_task)) + defer(lambda: self._run(driver, assigned_task, self.extract_mount_paths_from_task(task))) def killTask(self, driver, task_id): """ http://git-wip-us.apache.org/repos/asf/aurora/blob/5cad046f/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py 
---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py index 65a495d..5211f28 100644 --- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py +++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py @@ -100,7 +100,34 @@ app.add_option( type=str, help='The path to the mesos-containerizer executable that will be used to isolate the task''s ' 'filesystem when using a filesystem image. Note: this path should match the value of the ' - 'Mesos Agent''s -launcher_dir flag.') + 'Mesos Agent''s -launcher_dir flag.', + default='/usr/libexec/mesos/mesos-containerizer') + + +app.add_option( + '--no-create-user', + dest='no_create_user', + action='store_true', + help='If set, the executor will not attempt to create the task''s user/group under the ' + 'filesystem image (only applicable when launching a task with a filesystem image).', + default=False) + + +# Ideally we'd just be able to use the value of the MESOS_SANDBOX environment variable to get this +# directly from Mesos. Unfortunately, our method of isolating the task's filesystem does not involve +# setting a ContainerInfo on the task, but instead mounts the task's filesystem as a Volume with an +# Image set. In practice this means the value of MESOS_SANDBOX matches the value of the +# MESOS_DIRECTORY environment variable. +app.add_option( + '--sandbox-mount-point', + dest='sandbox_mount_point', + type=str, + help='The path under the task''s filesystem where the sandbox directory should be mounted ' + '(only applicable when launching a task with a filesystem image). 
Note: for ' + 'consistency, this path should match the value of the Mesos Agent''s ' + '-sandbox_directory flag.', + default='/mnt/mesos/sandbox') + app.add_option( '--execute-as-user', @@ -223,7 +250,9 @@ def initialize(options): thermos_executor = AuroraExecutor( runner_provider=thermos_runner_provider, status_providers=status_providers, - sandbox_provider=UserOverrideDirectorySandboxProvider(options.execute_as_user) + sandbox_provider=UserOverrideDirectorySandboxProvider(options.execute_as_user), + no_sandbox_create_user=options.no_create_user, + sandbox_mount_point=options.sandbox_mount_point ) else: thermos_runner_provider = DefaultThermosTaskRunnerProvider( @@ -240,7 +269,9 @@ def initialize(options): thermos_executor = AuroraExecutor( runner_provider=thermos_runner_provider, - status_providers=status_providers + status_providers=status_providers, + no_sandbox_create_user=options.no_create_user, + sandbox_mount_point=options.sandbox_mount_point ) return thermos_executor http://git-wip-us.apache.org/repos/asf/aurora/blob/5cad046f/src/main/python/apache/aurora/executor/common/sandbox.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/common/sandbox.py b/src/main/python/apache/aurora/executor/common/sandbox.py index 5f091af..cedcab6 100644 --- a/src/main/python/apache/aurora/executor/common/sandbox.py +++ b/src/main/python/apache/aurora/executor/common/sandbox.py @@ -33,7 +33,11 @@ class SandboxInterface(Interface): @abstractproperty def root(self): - """Return the root path of the sandbox.""" + """Return the root path of the sandbox within the host filesystem.""" + + @abstractproperty + def container_root(self): + """Return the root path of the sandbox as it's visible to the running task.""" @abstractproperty def chrooted(self): @@ -67,26 +71,27 @@ class SandboxProvider(Interface): class DefaultSandboxProvider(SandboxProvider): SANDBOX_NAME = 'sandbox' + MESOS_DIRECTORY_ENV_VARIABLE = 
'MESOS_DIRECTORY' + + def from_assigned_task(self, assigned_task, **kwargs): + sandbox_root = os.path.join(os.environ[self.MESOS_DIRECTORY_ENV_VARIABLE], self.SANDBOX_NAME) - def from_assigned_task(self, assigned_task): container = assigned_task.task.container if container.docker: - return DockerDirectorySandbox(self.SANDBOX_NAME) + return DockerDirectorySandbox(sandbox_root, **kwargs) elif container.mesos and container.mesos.image: - return FileSystemImageSandbox(self.SANDBOX_NAME, self._get_sandbox_user(assigned_task)) + return FileSystemImageSandbox( + sandbox_root, + user=self._get_sandbox_user(assigned_task), + **kwargs) else: - return DirectorySandbox( - os.path.abspath(self.SANDBOX_NAME), - self._get_sandbox_user(assigned_task)) + return DirectorySandbox(sandbox_root, user=self._get_sandbox_user(assigned_task), **kwargs) class DirectorySandbox(SandboxInterface): """ Basic sandbox implementation using a directory on the filesystem """ - MESOS_DIRECTORY_ENV_VARIABLE = 'MESOS_DIRECTORY' - MESOS_SANDBOX_ENV_VARIABLE = 'MESOS_SANDBOX' - - def __init__(self, root, user=getpass.getuser()): + def __init__(self, root, user=getpass.getuser(), **kwargs): self._root = root self._user = user @@ -95,6 +100,10 @@ class DirectorySandbox(SandboxInterface): return self._root @property + def container_root(self): + return self.root + + @property def chrooted(self): return False @@ -144,10 +153,13 @@ class DirectorySandbox(SandboxInterface): class DockerDirectorySandbox(DirectorySandbox): """ A sandbox implementation that configures the sandbox correctly for docker containers. 
""" - def __init__(self, sandbox_name): - self._mesos_host_sandbox = os.environ[self.MESOS_DIRECTORY_ENV_VARIABLE] - self._root = os.path.join(self._mesos_host_sandbox, sandbox_name) - super(DockerDirectorySandbox, self).__init__(self._root, user=None) + def __init__(self, root, **kwargs): + self._mesos_host_sandbox = os.environ[DefaultSandboxProvider.MESOS_DIRECTORY_ENV_VARIABLE] + + # remove the user value from kwargs if it was set. + kwargs.pop('user', None) + + super(DockerDirectorySandbox, self).__init__(root, user=None, **kwargs) def _create_symlinks(self): # This sets up the container to have a similar directory structure to the host. @@ -160,7 +172,7 @@ class DockerDirectorySandbox(DirectorySandbox): mesos_host_sandbox_root = os.path.dirname(self._mesos_host_sandbox) try: safe_mkdir(mesos_host_sandbox_root) - os.symlink(os.environ[self.MESOS_SANDBOX_ENV_VARIABLE], self._mesos_host_sandbox) + os.symlink(os.environ['MESOS_SANDBOX'], self._mesos_host_sandbox) except (IOError, OSError) as e: raise self.CreationError('Failed to create the sandbox root: %s' % e) @@ -178,11 +190,20 @@ class FileSystemImageSandbox(DirectorySandbox): # returncode from a `useradd` or `groupadd` call indicating that the uid/gid already exists. 
_USER_OR_GROUP_ID_EXISTS = 4 - def __init__(self, root, user=None): + def __init__(self, root, **kwargs): self._task_fs_root = os.path.join( - os.environ[self.MESOS_DIRECTORY_ENV_VARIABLE], + os.environ[DefaultSandboxProvider.MESOS_DIRECTORY_ENV_VARIABLE], TASK_FILESYSTEM_MOUNT_POINT) - super(FileSystemImageSandbox, self).__init__(root, user=user) + + self._no_create_user = kwargs.pop('no_create_user', False) + self._mounted_volume_paths = kwargs.pop('mounted_volume_paths', None) + self._sandbox_mount_point = kwargs.pop('sandbox_mount_point', None) + + if self._sandbox_mount_point is None: + raise self.Error( + 'Failed to initialize FileSystemImageSandbox: no value specified for sandbox_mount_point') + + super(FileSystemImageSandbox, self).__init__(root, **kwargs) def _create_user_and_group_in_taskfs(self): if self._user: @@ -218,26 +239,36 @@ class FileSystemImageSandbox(DirectorySandbox): else: raise self.CreationError('Failed to create user in sandbox for task image: %s' % e) - def _mount_mesos_directory_into_taskfs(self): - mesos_directory = os.environ[self.MESOS_DIRECTORY_ENV_VARIABLE] - mount_path = os.path.join(self._task_fs_root, mesos_directory[1:]) + def _mount_paths(self): + def do_mount(source, destination): + safe_mkdir(destination) + log.info('Mounting %s into task filesystem at %s.' 
% (source, destination)) + subprocess.check_call([ + 'mount', + '--bind', + source, + destination]) - log.debug('Mounting mesos directory (%s) into task filesystem at %s' % ( - mesos_directory, - mount_path)) + if self._mounted_volume_paths is not None: + for container_path in self._mounted_volume_paths: + if container_path != TASK_FILESYSTEM_MOUNT_POINT: + target = container_path.lstrip('/') + do_mount(container_path, os.path.join(self._task_fs_root, target)) - safe_mkdir(mount_path) - subprocess.check_call([ - 'mount', - '--bind', - mesos_directory, - mount_path]) + do_mount(self.root, os.path.join(self._task_fs_root, self._sandbox_mount_point.lstrip('/'))) + + @property + def container_root(self): + return self._sandbox_mount_point @property def is_filesystem_image(self): return True def create(self): - self._create_user_and_group_in_taskfs() - self._mount_mesos_directory_into_taskfs() + if not self._no_create_user: + self._create_user_and_group_in_taskfs() + super(FileSystemImageSandbox, self).create() + + self._mount_paths() http://git-wip-us.apache.org/repos/asf/aurora/blob/5cad046f/src/main/python/apache/aurora/executor/thermos_task_runner.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/thermos_task_runner.py b/src/main/python/apache/aurora/executor/thermos_task_runner.py index 1d713ca..efa51e1 100644 --- a/src/main/python/apache/aurora/executor/thermos_task_runner.py +++ b/src/main/python/apache/aurora/executor/thermos_task_runner.py @@ -100,7 +100,6 @@ class ThermosTaskRunner(TaskRunner): self._monitor = None self._status = None self._ports = portmap - self._root = sandbox.root self._sandbox = sandbox self._checkpoint_root = checkpoint_root self._enable_chroot = sandbox.chrooted @@ -239,14 +238,11 @@ class ThermosTaskRunner(TaskRunner): log.error('Could not quitquitquit runner: %s' % e) def _cmdline(self): - host_sandbox = None - if os.environ.get('MESOS_DIRECTORY'): - 
host_sandbox = os.path.join(os.environ.get('MESOS_DIRECTORY'), 'sandbox') - params = dict(log_dir=LogOptions.log_dir(), log_to_disk='DEBUG', checkpoint_root=self._checkpoint_root, - sandbox=host_sandbox or self._root, + sandbox=self._sandbox.root, + container_sandbox=self._sandbox.container_root, task_id=self._task_id, thermos_json=self._task_filename, hostname=self._hostname, @@ -266,8 +262,7 @@ class ThermosTaskRunner(TaskRunner): if self._preserve_env: cmdline_args.extend(['--preserve_env']) if self._sandbox.is_filesystem_image: - cmdline_args.extend( - ['--mesos_containerizer_path=%s' % self._mesos_containerizer_path]) + cmdline_args.extend(['--mesos_containerizer_path=%s' % self._mesos_containerizer_path]) for name, port in self._ports.items(): cmdline_args.extend(['--port=%s:%s' % (name, port)]) return cmdline_args http://git-wip-us.apache.org/repos/asf/aurora/blob/5cad046f/src/main/python/apache/thermos/core/process.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py index a296fa7..78e7d78 100644 --- a/src/main/python/apache/thermos/core/process.py +++ b/src/main/python/apache/thermos/core/process.py @@ -25,6 +25,7 @@ import grp import os import pwd import select +import shlex import signal import subprocess import sys @@ -97,7 +98,7 @@ class ProcessBase(object): def __init__(self, name, cmdline, sequence, pathspec, sandbox_dir, user=None, platform=None, logger_destination=LoggerDestination.FILE, logger_mode=LoggerMode.STANDARD, - rotate_log_size=None, rotate_log_backups=None, mesos_containerizer_path=None): + rotate_log_size=None, rotate_log_backups=None): """ required: name = name of the process @@ -115,8 +116,6 @@ class ProcessBase(object): logger_mode = The type of logger to use for the process. rotate_log_size = The maximum size of the rotated stdout/stderr logs. 
rotate_log_backups = The maximum number of rotated stdout/stderr log backups. - mesos_containerizer_path = The path to the mesos-containerizer binary to be used for task - filesystem isolation. """ self._name = name self._cmdline = cmdline @@ -137,7 +136,6 @@ class ProcessBase(object): self._logger_mode = logger_mode self._rotate_log_size = rotate_log_size self._rotate_log_backups = rotate_log_backups - self._mesos_containerizer_path = mesos_containerizer_path if not LoggerDestination.is_valid(self._logger_destination): raise ValueError("Logger destination %s is invalid." % self._logger_destination) @@ -186,16 +184,6 @@ class ProcessBase(object): coordinator_pid=self._pid) def cmdline(self): - if self._mesos_containerizer_path is not None: - return ('%s launch ' - '--unshare_namespace_mnt ' - '--rootfs=%s ' - '--user=%s ' - '--command=\'{"shell":true,"value":"%s"}\'' % ( - self._mesos_containerizer_path, - os.path.join(os.environ['MESOS_DIRECTORY'], TASK_FILESYSTEM_MOUNT_POINT), - self._user, - self._cmdline.replace('"', '\\"'))) return self._cmdline def name(self): @@ -353,11 +341,20 @@ class Process(ProcessBase): fork: the fork function to use [default: os.fork] chroot: whether or not to chroot into the sandbox [default: False] preserve_env: whether or not to preserve env variables for the task [default: False] + mesos_containerizer_path: The path to the mesos-containerizer binary to be used for task + filesystem isolation. + container_sandbox: If running in an isolated filesystem, the path within that filesystem + where the sandbox is mounted. 
""" fork = kw.pop('fork', os.fork) self._use_chroot = bool(kw.pop('chroot', False)) self._rc = None self._preserve_env = bool(kw.pop('preserve_env', False)) + self._mesos_containerizer_path = kw.pop('mesos_containerizer_path', None) + self._container_sandbox = kw.pop('container_sandbox', None) + + if self._mesos_containerizer_path is not None and self._container_sandbox is None: + raise self.UnspecifiedSandbox('If using mesos-containerizer, container_sandbox must be set.') kw['platform'] = RealPlatform(fork=fork) ProcessBase.__init__(self, *args, **kw) @@ -382,6 +379,35 @@ class Process(ProcessBase): os.setgid(gid) os.setuid(uid) + def wrapped_cmdline(self, cwd): + cmdline = self.cmdline() + + # If mesos-containerizer is not set, we only need to wrap the cmdline in a bash invocation. + if self._mesos_containerizer_path is None: + return ['/bin/bash', '-c', cmdline] + + # We're going to embed this in JSON, so we must escape quotes and newlines. + cmdline = cmdline.replace('"', '\\"').replace('\n', '\\n') + + # We must wrap the command in single quotes otherwise the shell that executes + # mesos-containerizer will expand any bash variables in the cmdline. Escaping single quotes in + # bash is hard: https://github.com/koalaman/shellcheck/wiki/SC1003. 
+ bash_wrapper = "/bin/bash -c '\\''%s'\\''" + + wrapped = ('%s launch ' + '--unshare_namespace_mnt ' + '--working_directory=%s ' + '--rootfs=%s ' + '--user=%s ' + '--command=\'{"shell":true,"value":"%s"}\'' % ( + self._mesos_containerizer_path, + cwd, + os.path.join(os.environ['MESOS_DIRECTORY'], TASK_FILESYSTEM_MOUNT_POINT), + self._user, + bash_wrapper % cmdline)) + + return shlex.split(wrapped) + def execute(self): """Perform final initialization and launch target process commandline in a subprocess.""" @@ -406,14 +432,15 @@ class Process(ProcessBase): start_time = self._platform.clock().time() if not self._sandbox: - cwd = sandbox = os.getcwd() + cwd = subprocess_cwd = sandbox = os.getcwd() else: - if self._use_chroot or taskfs_isolated: - sandbox = '/' - cwd = self._sandbox if taskfs_isolated else sandbox + if self._use_chroot: + cwd = subprocess_cwd = sandbox = '/' + elif taskfs_isolated: + cwd = homedir = sandbox = self._container_sandbox + subprocess_cwd = self._sandbox else: - cwd = sandbox = self._sandbox - homedir = sandbox + cwd = subprocess_cwd = homedir = sandbox = self._sandbox thermos_profile = os.path.join(sandbox, self.RCFILE) @@ -429,12 +456,23 @@ class Process(ProcessBase): 'PATH': os.environ['PATH'] }) - if os.path.exists(thermos_profile): + wrapped_cmdline = self.wrapped_cmdline(cwd) + log.debug('Wrapped cmdline: %s' % wrapped_cmdline) + + + real_thermos_profile_path = os.path.join( + os.environ['MESOS_DIRECTORY'], + TASK_FILESYSTEM_MOUNT_POINT, + thermos_profile.lstrip('/')) if taskfs_isolated else thermos_profile + + if os.path.exists(real_thermos_profile_path): env.update(BASH_ENV=thermos_profile) + + log.debug('ENV is: %s' % env) subprocess_args = { - 'args': ["/bin/bash", "-c", self.cmdline()], + 'args': wrapped_cmdline, 'close_fds': self.FD_CLOEXEC, - 'cwd': cwd, + 'cwd': subprocess_cwd, 'env': env, 'pathspec': self._pathspec } http://git-wip-us.apache.org/repos/asf/aurora/blob/5cad046f/src/main/python/apache/thermos/core/runner.py 
---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py index dcfc190..7b9013d 100644 --- a/src/main/python/apache/thermos/core/runner.py +++ b/src/main/python/apache/thermos/core/runner.py @@ -421,7 +421,7 @@ class TaskRunner(object): universal_handler=None, planner_class=TaskPlanner, hostname=None, process_logger_destination=None, process_logger_mode=None, rotate_log_size_mb=None, rotate_log_backups=None, - preserve_env=False, mesos_containerizer_path=None): + preserve_env=False, mesos_containerizer_path=None, container_sandbox=None): """ required: task (config.Task) = the task to run @@ -451,6 +451,8 @@ class TaskRunner(object): env for the task being run mesos_containerizer_path = the path to the mesos-containerizer executable that will be used to isolate the task's filesystem (if using a filesystem image). + container_sandbox = the path within the isolated filesystem where the task's sandbox is + mounted. 
""" if not issubclass(planner_class, TaskPlanner): raise TypeError('planner_class must be a TaskPlanner.') @@ -503,6 +505,7 @@ class TaskRunner(object): process_filter=lambda proc: proc.final().get() is True) self._chroot = chroot self._sandbox = sandbox + self._container_sandbox = container_sandbox self._terminal_state = None self._ckpt = None self._process_map = dict((p.name().get(), p) for p in self._task.processes()) @@ -724,7 +727,8 @@ class TaskRunner(object): rotate_log_size=rotate_log_size, rotate_log_backups=rotate_log_backups, preserve_env=self._preserve_env, - mesos_containerizer_path=self._mesos_containerizer_path) + mesos_containerizer_path=self._mesos_containerizer_path, + container_sandbox=self._container_sandbox) _DEFAULT_LOGGER = Logger() _DEFAULT_ROTATION = RotatePolicy() http://git-wip-us.apache.org/repos/asf/aurora/blob/5cad046f/src/main/python/apache/thermos/runner/thermos_runner.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/runner/thermos_runner.py b/src/main/python/apache/thermos/runner/thermos_runner.py index 441bacd..847f51e 100644 --- a/src/main/python/apache/thermos/runner/thermos_runner.py +++ b/src/main/python/apache/thermos/runner/thermos_runner.py @@ -47,7 +47,16 @@ app.add_option( dest="sandbox", metavar="PATH", default=None, - help="the sandbox in which this task should run") + help="The path on the host filesystem to the sandbox in which this task should run.") + + +app.add_option( + '--container_sandbox', + dest='container_sandbox', + type=str, + default=None, + help='If running in an isolated filesystem, the path within that filesystem where the sandbox ' + 'is mounted.') app.add_option( @@ -221,7 +230,8 @@ def proxy_main(args, opts): rotate_log_size_mb=opts.rotate_log_size_mb, rotate_log_backups=opts.rotate_log_backups, preserve_env=opts.preserve_env, - mesos_containerizer_path=opts.mesos_containerizer_path) + 
mesos_containerizer_path=opts.mesos_containerizer_path, + container_sandbox=opts.container_sandbox) for sig in (signal.SIGUSR1, signal.SIGUSR2): signal.signal(sig, functools.partial(runner_teardown, task_runner)) http://git-wip-us.apache.org/repos/asf/aurora/blob/5cad046f/src/test/python/apache/aurora/executor/common/test_sandbox.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/common/test_sandbox.py b/src/test/python/apache/aurora/executor/common/test_sandbox.py index ce989b1..b30a5bc 100644 --- a/src/test/python/apache/aurora/executor/common/test_sandbox.py +++ b/src/test/python/apache/aurora/executor/common/test_sandbox.py @@ -119,8 +119,8 @@ def test_docker_directory_sandbox_create_ioerror(makedirs): makedirs.side_effect = IOError('Disk is borked') with mock.patch.dict('os.environ', { - DockerDirectorySandbox.MESOS_DIRECTORY_ENV_VARIABLE: 'some-directory', - DockerDirectorySandbox.MESOS_SANDBOX_ENV_VARIABLE: 'some-sandbox' + 'MESOS_DIRECTORY': 'some-directory', + 'MESOS_SANDBOX': 'some-sandbox' }): with temporary_dir() as d: real_path = os.path.join(d, 'sandbox') @@ -173,18 +173,92 @@ def assert_create_user_and_group(mock_check_call, gid_exists, uid_exists): 'get_user_and_group', return_value=(mock_pwent, mock_grent)): - sandbox = FileSystemImageSandbox(os.path.join(d, 'sandbox'), user='someuser') + sandbox = FileSystemImageSandbox( + os.path.join(d, 'sandbox'), + user='someuser', + sandbox_mount_point='/some/path') sandbox._create_user_and_group_in_taskfs() assert len(mock_check_call.mock_calls) == 2 + +MOCK_MESOS_DIRECTORY = '/some/path' + + @mock.patch('subprocess.check_call') -@mock.patch.dict(os.environ, {'MESOS_DIRECTORY': '/some/path'}) +@mock.patch.dict(os.environ, {'MESOS_DIRECTORY': MOCK_MESOS_DIRECTORY}) def test_uid_exists(mock_check_call): assert_create_user_and_group(mock_check_call, False, True) @mock.patch('subprocess.check_call') -@mock.patch.dict(os.environ, 
{'MESOS_DIRECTORY': '/some/path'}) +@mock.patch.dict(os.environ, {'MESOS_DIRECTORY': MOCK_MESOS_DIRECTORY}) def test_gid_exists(mock_check_call): assert_create_user_and_group(mock_check_call, True, False) + + +@mock.patch('subprocess.check_call') +@mock.patch('apache.aurora.executor.common.sandbox.safe_mkdir') +@mock.patch.dict(os.environ, {'MESOS_DIRECTORY': MOCK_MESOS_DIRECTORY}) +def test_filesystem_sandbox_mounts_paths(mock_safe_mkdir, mock_check_call): + sandbox_mount_point = '/some/mount/point' + sandbox_directory = os.path.join(MOCK_MESOS_DIRECTORY, 'sandbox') + + sandbox = FileSystemImageSandbox( + sandbox_directory, + user='someuser', + no_create_user=True, + mounted_volume_paths=['/some/container/path', '/some/other/container/path'], + sandbox_mount_point=sandbox_mount_point) + + sandbox._mount_paths() + + task_fs_path = os.path.join(MOCK_MESOS_DIRECTORY, 'taskfs') + # we should have mounted both of the paths we passed in as well as the sandbox directory itself. + assert mock_check_call.mock_calls == [ + mock.call([ + 'mount', + '--bind', + '/some/container/path', + os.path.join(task_fs_path, 'some/container/path') + ]), + mock.call([ + 'mount', + '--bind', + '/some/other/container/path', + os.path.join(task_fs_path, 'some/other/container/path') + ]), + mock.call([ + 'mount', + '--bind', + sandbox_directory, + os.path.join(task_fs_path, sandbox_mount_point[1:]) + ]) + ] + +@mock.patch('subprocess.check_call') +@mock.patch('apache.aurora.executor.common.sandbox.safe_mkdir') +@mock.patch.dict(os.environ, {'MESOS_DIRECTORY': MOCK_MESOS_DIRECTORY}) +def test_filesystem_sandbox_no_volumes(mock_safe_mkdir, mock_check_call): + sandbox_mount_point = '/some/mount/point' + sandbox_directory = os.path.join(MOCK_MESOS_DIRECTORY, 'sandbox'), + + sandbox = FileSystemImageSandbox( + sandbox_directory, + user='someuser', + no_create_user=True, + mounted_volume_paths=None, + sandbox_mount_point=sandbox_mount_point) + + sandbox._mount_paths() + + 
task_fs_path = os.path.join(MOCK_MESOS_DIRECTORY, 'taskfs') + + assert mock_check_call.mock_calls == [ + mock.call([ + 'mount', + '--bind', + sandbox_directory, + os.path.join(task_fs_path, sandbox_mount_point[1:]) + ]) + ] http://git-wip-us.apache.org/repos/asf/aurora/blob/5cad046f/src/test/python/apache/aurora/executor/test_thermos_executor.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/test_thermos_executor.py b/src/test/python/apache/aurora/executor/test_thermos_executor.py index 06601df..0bfe9e9 100644 --- a/src/test/python/apache/aurora/executor/test_thermos_executor.py +++ b/src/test/python/apache/aurora/executor/test_thermos_executor.py @@ -74,8 +74,8 @@ class FastStatusManager(StatusManager): class DefaultTestSandboxProvider(SandboxProvider): - def from_assigned_task(self, assigned_task): - return DirectorySandbox(safe_mkdtemp()) + def from_assigned_task(self, assigned_task, **kwargs): + return DirectorySandbox(safe_mkdtemp(), **kwargs) class FailingStartingTaskRunner(ThermosTaskRunner): @@ -84,7 +84,7 @@ class FailingStartingTaskRunner(ThermosTaskRunner): class FailingSandbox(DirectorySandbox): - def __init__(self, root, exception_type): + def __init__(self, root, exception_type, **kwargs): self._exception_type = exception_type super(FailingSandbox, self).__init__(root) @@ -96,8 +96,8 @@ class FailingSandboxProvider(SandboxProvider): def __init__(self, exception_type=DirectorySandbox.CreationError): self._exception_type = exception_type - def from_assigned_task(self, assigned_task): - return FailingSandbox(safe_mkdtemp(), exception_type=self._exception_type) + def from_assigned_task(self, assigned_task, **kwargs): + return FailingSandbox(safe_mkdtemp(), exception_type=self._exception_type, **kwargs) class SlowSandbox(DirectorySandbox): @@ -115,8 +115,8 @@ class SlowSandbox(DirectorySandbox): class SlowSandboxProvider(SandboxProvider): - def from_assigned_task(self, 
assigned_task): - return SlowSandbox(safe_mkdtemp()) + def from_assigned_task(self, assigned_task, **kwargs): + return SlowSandbox(safe_mkdtemp(), **kwargs) class ProxyDriver(object): http://git-wip-us.apache.org/repos/asf/aurora/blob/5cad046f/src/test/python/apache/thermos/core/test_process.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/thermos/core/test_process.py b/src/test/python/apache/thermos/core/test_process.py index 759f783..49f52d9 100644 --- a/src/test/python/apache/thermos/core/test_process.py +++ b/src/test/python/apache/thermos/core/test_process.py @@ -125,7 +125,8 @@ def test_simple_process_filesystem_isolator(): 0, taskpath, sandbox, - mesos_containerizer_path=test_isolator_path) + mesos_containerizer_path=test_isolator_path, + container_sandbox=sandbox) p.start() rc = wait_for_rc(taskpath.getpath('process_checkpoint')) @@ -133,8 +134,9 @@ def test_simple_process_filesystem_isolator(): assert_log_content( taskpath, 'stdout', - 'launch --unshare_namespace_mnt --rootfs=/some/path/taskfs --user=None ' - '--command={"shell":true,"value":"echo hello world"}\n') + 'launch --unshare_namespace_mnt --working_directory=%s --rootfs=/some/path/taskfs --user=None ' + '--command={"shell":true,"value":"/bin/bash -c \'echo hello world\'"}\n' % ( + sandbox)) @mock.patch('os.chown') http://git-wip-us.apache.org/repos/asf/aurora/blob/5cad046f/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora ---------------------------------------------------------------------- diff --git a/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora b/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora index b69ddf1..042424d 100644 --- a/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora +++ b/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora @@ -42,20 +42,34 @@ stage_server = Process( cmdline = '{{profile.cmd}}' ) -test_task = Task( +setup_env = Process( + name = 'setup_env', + 
cmdline='''cat <<EOF > .thermos_profile +export IT_WORKED=hello +EOF''' +) + +read_env = Process( + name = 'read_env', + cmdline = 'echo "$IT_WORKED"' +) + +test_task = SequentialTask( name = 'http_example', resources = Resources(cpu=0.5, ram=32*MB, disk=64*MB, gpu='{{profile.gpu}}'), - processes = [echo_ports, stage_server, run_server], - constraints = order(echo_ports, stage_server, run_server)) + processes = [setup_env, read_env, echo_ports, stage_server, run_server] +) -no_python_task = Task( +no_python_task = SequentialTask( name = 'http_example_no_python', resources = Resources(cpu=0.4, ram=32*MB, disk=64*MB), processes = [ + setup_env, + read_env, echo_ports, Process(name='run_server', cmdline='run-server.sh {{thermos.ports[http]}}'), - ], - constraints = order(echo_ports, run_server)) + ] +) update_config = UpdateConfig(watch_secs=10, batch_size=2) health_check_config = HealthCheckConfig(initial_interval_secs=5, interval_secs=1) http://git-wip-us.apache.org/repos/asf/aurora/blob/5cad046f/src/test/sh/org/apache/aurora/e2e/run-server.sh ---------------------------------------------------------------------- diff --git a/src/test/sh/org/apache/aurora/e2e/run-server.sh b/src/test/sh/org/apache/aurora/e2e/run-server.sh index 7693988..1fe0909 100755 --- a/src/test/sh/org/apache/aurora/e2e/run-server.sh +++ b/src/test/sh/org/apache/aurora/e2e/run-server.sh @@ -1,7 +1,7 @@ -#!/bin/sh +#!/bin/bash echo "Starting up server..." while true do - echo -e "HTTP/1.1 200 OK\n\n Hello from a filesystem image" | nc -l "$1" + echo -e "HTTP/1.1 200 OK\r\n\r\nHello from a filesystem image." 
| nc -l "$1" done http://git-wip-us.apache.org/repos/asf/aurora/blob/5cad046f/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh ---------------------------------------------------------------------- diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh index 0404d0e..1a668dd 100755 --- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh +++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh @@ -317,6 +317,12 @@ test_discovery_info() { fi } +test_thermos_profile() { + read_env_output=$(aurora task ssh $_jobkey/0 --command='tail -1 .logs/read_env/0/stdout' |tr -d '\r\n' 2>/dev/null) + echo "$read_env_output" + [[ "$read_env_output" = "hello" ]] +} + test_http_example() { local _cluster=$1 _role=$2 _env=$3 local _base_config=$4 _updated_config=$5 @@ -335,6 +341,7 @@ test_http_example() { test_scheduler_ui $_role $_env $_job test_observer_ui $_cluster $_role $_job test_discovery_info $_task_id_prefix $_discovery_name + test_thermos_profile $_jobkey test_restart $_jobkey test_update $_jobkey $_updated_config $_cluster $_bind_parameters test_update_fail $_jobkey $_base_config $_cluster $_bad_healthcheck_config $_bind_parameters
