Repository: aurora Updated Branches: refs/heads/master acd55ff52 -> 41c71d9f6
Preserve env variables for tasks in docker. Bugs closed: AURORA-1540 Reviewed at https://reviews.apache.org/r/41201/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/41c71d9f Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/41c71d9f Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/41c71d9f Branch: refs/heads/master Commit: 41c71d9f68d3159f3e2ce847f22a47d6309c67c6 Parents: acd55ff Author: Kasisnu Singh <[email protected]> Authored: Mon Dec 28 16:12:59 2015 -0800 Committer: Bill Farner <[email protected]> Committed: Mon Dec 28 16:12:59 2015 -0800 ---------------------------------------------------------------------- NEWS | 2 ++ .../executor/bin/thermos_executor_main.py | 14 ++++++++-- .../aurora/executor/thermos_task_runner.py | 12 +++++++-- src/main/python/apache/thermos/core/process.py | 13 +++++++-- src/main/python/apache/thermos/core/runner.py | 9 +++++-- .../apache/thermos/runner/thermos_runner.py | 11 +++++++- .../aurora/executor/test_thermos_task_runner.py | 28 ++++++++++++++++++-- .../python/apache/thermos/core/test_process.py | 23 ++++++++++++++++ 8 files changed, 101 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/41c71d9f/NEWS ---------------------------------------------------------------------- diff --git a/NEWS b/NEWS index f56f1e8..394b31c 100644 --- a/NEWS +++ b/NEWS @@ -2,6 +2,8 @@ ------ - Removed the deprecated field 'ConfigGroup.instanceIds' from the API. - Upgraded Mesos to 0.25.0. +- Env variables can be passed through to task processes by passing `--preserve_env` + to thermos. 
0.11.0 ------ http://git-wip-us.apache.org/repos/asf/aurora/blob/41c71d9f/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py index 7b7ef4b..4e9b027 100644 --- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py +++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py @@ -120,6 +120,14 @@ app.add_option( help='Maximum number of rotated stdout/stderr logs emitted by the thermos runner.') +app.add_option( + "--preserve_env", + dest="preserve_env", + default=False, + action='store_true', + help="Preserve thermos runners' environment variables for the task being run.") + + # TODO(wickman) Consider just having the OSS version require pip installed # thermos_runner binaries on every machine and instead of embedding the pex # as a resource, shell out to one on the PATH. 
@@ -168,7 +176,8 @@ def initialize(options): artifact_dir=cwd_path, process_logger_mode=options.runner_logger_mode, rotate_log_size_mb=options.runner_rotate_log_size_mb, - rotate_log_backups=options.runner_rotate_log_backups + rotate_log_backups=options.runner_rotate_log_backups, + preserve_env=options.preserve_env ) thermos_runner_provider.set_role(None) @@ -184,7 +193,8 @@ def initialize(options): artifact_dir=cwd_path, process_logger_mode=options.runner_logger_mode, rotate_log_size_mb=options.runner_rotate_log_size_mb, - rotate_log_backups=options.runner_rotate_log_backups + rotate_log_backups=options.runner_rotate_log_backups, + preserve_env=options.preserve_env ) thermos_executor = AuroraExecutor( http://git-wip-us.apache.org/repos/asf/aurora/blob/41c71d9f/src/main/python/apache/aurora/executor/thermos_task_runner.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/thermos_task_runner.py b/src/main/python/apache/aurora/executor/thermos_task_runner.py index 25fcca2..c019fc9 100644 --- a/src/main/python/apache/aurora/executor/thermos_task_runner.py +++ b/src/main/python/apache/aurora/executor/thermos_task_runner.py @@ -75,7 +75,8 @@ class ThermosTaskRunner(TaskRunner): hostname=None, process_logger_mode=None, rotate_log_size_mb=None, - rotate_log_backups=None): + rotate_log_backups=None, + preserve_env=False): """ runner_pex location of the thermos_runner pex that this task runner should use task_id task_id assigned by scheduler @@ -86,6 +87,7 @@ class ThermosTaskRunner(TaskRunner): checkpoint_root the checkpoint root for the thermos runner artifact_dir scratch space for the thermos runner (basically cwd of thermos.pex) clock clock + preserve_env """ self._runner_pex = runner_pex self._task_id = task_id @@ -97,6 +99,7 @@ class ThermosTaskRunner(TaskRunner): self._root = sandbox.root self._checkpoint_root = checkpoint_root self._enable_chroot = sandbox.chrooted + self._preserve_env = 
preserve_env self._role = role self._clock = clock self._artifact_dir = artifact_dir or safe_mkdtemp() @@ -252,6 +255,8 @@ class ThermosTaskRunner(TaskRunner): '--%s=%s' % (flag, value) for flag, value in params.items() if value is not None) if self._enable_chroot: cmdline_args.extend(['--enable_chroot']) + if self._preserve_env: + cmdline_args.extend(['--preserve_env']) for name, port in self._ports.items(): cmdline_args.extend(['--port=%s:%s' % (name, port)]) return cmdline_args @@ -348,6 +353,7 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider): pex_location, checkpoint_root, artifact_dir=None, + preserve_env=False, task_runner_class=ThermosTaskRunner, max_wait=Amount(1, Time.MINUTES), preemption_wait=Amount(1, Time.MINUTES), @@ -358,6 +364,7 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider): rotate_log_backups=None): self._artifact_dir = artifact_dir or safe_mkdtemp() self._checkpoint_root = checkpoint_root + self._preserve_env = preserve_env self._clock = clock self._max_wait = max_wait self._pex_location = pex_location @@ -398,7 +405,8 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider): hostname=assigned_task.slaveHost, process_logger_mode=self._process_logger_mode, rotate_log_size_mb=self._rotate_log_size_mb, - rotate_log_backups=self._rotate_log_backups) + rotate_log_backups=self._rotate_log_backups, + preserve_env=self._preserve_env) return HttpLifecycleManager.wrap(runner, mesos_task, mesos_ports) http://git-wip-us.apache.org/repos/asf/aurora/blob/41c71d9f/src/main/python/apache/thermos/core/process.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py index 8efdfdc..8a181b0 100644 --- a/src/main/python/apache/thermos/core/process.py +++ b/src/main/python/apache/thermos/core/process.py @@ -30,6 +30,7 @@ import subprocess import sys import time from abc import abstractmethod +from copy import 
deepcopy from twitter.common import log from twitter.common.dirutil import lock_file, safe_delete, safe_mkdir, safe_open @@ -315,10 +316,12 @@ class Process(ProcessBase): Takes additional arguments: fork: the fork function to use [default: os.fork] chroot: whether or not to chroot into the sandbox [default: False] + preserve_env: whether or not to preserve env variables for the task [default: False] """ fork = kw.pop('fork', os.fork) self._use_chroot = bool(kw.pop('chroot', False)) self._rc = None + self._preserve_env = bool(kw.pop('preserve_env', False)) kw['platform'] = RealPlatform(fork=fork) ProcessBase.__init__(self, *args, **kw) if self._use_chroot and self._sandbox is None: @@ -363,12 +366,18 @@ class Process(ProcessBase): sandbox = self._sandbox if not self._use_chroot else '/' thermos_profile = os.path.join(sandbox, self.RCFILE) - env = { + + if self._preserve_env: + env = deepcopy(os.environ) + else: + env = {} + + env.update({ 'HOME': homedir if self._use_chroot else sandbox, 'LOGNAME': username, 'USER': username, 'PATH': os.environ['PATH'] - } + }) if os.path.exists(thermos_profile): env.update(BASH_ENV=thermos_profile) http://git-wip-us.apache.org/repos/asf/aurora/blob/41c71d9f/src/main/python/apache/thermos/core/runner.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py index 11c06a8..5623dce 100644 --- a/src/main/python/apache/thermos/core/runner.py +++ b/src/main/python/apache/thermos/core/runner.py @@ -419,7 +419,8 @@ class TaskRunner(object): def __init__(self, task, checkpoint_root, sandbox, log_dir=None, task_id=None, portmap=None, user=None, chroot=False, clock=time, universal_handler=None, planner_class=TaskPlanner, hostname=None, - process_logger_mode=None, rotate_log_size_mb=None, rotate_log_backups=None): + process_logger_mode=None, rotate_log_size_mb=None, rotate_log_backups=None, + preserve_env=False): """ 
required: task (config.Task) = the task to run @@ -444,6 +445,8 @@ class TaskRunner(object): process_logger_mode (string) = The type of logger to use for all processes. rotate_log_size_mb (integer) = The maximum size of the rotated stdout/stderr logs in MiB. rotate_log_backups (integer) = The maximum number of rotated stdout/stderr log backups. + preserve_env (boolean) = whether or not env variables for the runner should be in the + env for the task being run """ if not issubclass(planner_class, TaskPlanner): raise TypeError('planner_class must be a TaskPlanner.') @@ -504,6 +507,7 @@ class TaskRunner(object): self._preemption_deadline = None self._watcher = ProcessMuxer(self._pathspec) self._state = RunnerState(processes={}) + self._preserve_env = preserve_env # create runner state universal_handler = universal_handler or TaskRunnerUniversalHandler @@ -708,7 +712,8 @@ class TaskRunner(object): fork=close_ckpt_and_fork, logger_mode=logger_mode, rotate_log_size=rotate_log_size, - rotate_log_backups=rotate_log_backups) + rotate_log_backups=rotate_log_backups, + preserve_env=self._preserve_env) def _build_process_logger_args(self, process): """ http://git-wip-us.apache.org/repos/asf/aurora/blob/41c71d9f/src/main/python/apache/thermos/runner/thermos_runner.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/runner/thermos_runner.py b/src/main/python/apache/thermos/runner/thermos_runner.py index a36bd2a..3dacd45 100644 --- a/src/main/python/apache/thermos/runner/thermos_runner.py +++ b/src/main/python/apache/thermos/runner/thermos_runner.py @@ -83,6 +83,14 @@ app.add_option( app.add_option( + "--preserve_env", + dest="preserve_env", + default=False, + action='store_true', + help="Preserve thermos runners' environment variables for the task being run.") + + +app.add_option( "--port", type='string', nargs=1, @@ -193,7 +201,8 @@ def proxy_main(args, opts): hostname=opts.hostname, 
process_logger_mode=opts.process_logger_mode, rotate_log_size_mb=opts.rotate_log_size_mb, - rotate_log_backups=opts.rotate_log_backups + rotate_log_backups=opts.rotate_log_backups, + preserve_env=opts.preserve_env ) for sig in (signal.SIGUSR1, signal.SIGUSR2): http://git-wip-us.apache.org/repos/asf/aurora/blob/41c71d9f/src/test/python/apache/aurora/executor/test_thermos_task_runner.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/test_thermos_task_runner.py b/src/test/python/apache/aurora/executor/test_thermos_task_runner.py index 789d2bf..1b92667 100644 --- a/src/test/python/apache/aurora/executor/test_thermos_task_runner.py +++ b/src/test/python/apache/aurora/executor/test_thermos_task_runner.py @@ -85,7 +85,7 @@ class TestThermosTaskRunnerIntegration(object): print('Saved thermos executor at %s' % cls.PEX_PATH) @contextlib.contextmanager - def yield_runner(self, runner_class, portmap=None, clock=time, **bindings): + def yield_runner(self, runner_class, portmap=None, clock=time, preserve_env=False, **bindings): with contextlib.nested(temporary_dir(), temporary_dir()) as (td1, td2): sandbox = DirectorySandbox(td1) checkpoint_root = td2 @@ -101,15 +101,18 @@ class TestThermosTaskRunnerIntegration(object): clock=clock, sandbox=sandbox, checkpoint_root=checkpoint_root, + preserve_env=preserve_env, ) yield task_runner - def yield_sleepy(self, runner_class, sleep, exit_code, portmap={}, clock=time): + def yield_sleepy(self, runner_class, sleep, exit_code, portmap={}, clock=time, + preserve_env=False): return self.yield_runner( runner_class, portmap=portmap, clock=clock, + preserve_env=preserve_env, command='sleep {{__sleep}} && exit {{__exit_code}}', __sleep=sleep, __exit_code=exit_code) @@ -324,3 +327,24 @@ class TestThermosTaskRunnerIntegration(object): status = task_runner.compute_status() assert 'killed by signal 9' in status.reason assert status.status is mesos_pb2.TASK_KILLED + + def 
test_thermos_preserve_env(self): + with self.yield_sleepy( + ThermosTaskRunner, + preserve_env=True, + sleep=0, + exit_code=0) as task_runner: + + task_runner.start() + task_runner.forked.wait() + + self.run_to_completion(task_runner) + + assert task_runner.status is not None + assert task_runner.status.status == mesos_pb2.TASK_FINISHED + + # no-op + task_runner.stop() + + assert task_runner.status is not None + assert task_runner.status.status == mesos_pb2.TASK_FINISHED http://git-wip-us.apache.org/repos/asf/aurora/blob/41c71d9f/src/test/python/apache/thermos/core/test_process.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/thermos/core/test_process.py b/src/test/python/apache/thermos/core/test_process.py index 261371d..da4c494 100644 --- a/src/test/python/apache/thermos/core/test_process.py +++ b/src/test/python/apache/thermos/core/test_process.py @@ -267,3 +267,26 @@ def assert_log_content(taskpath, log_name, expected_content): def assert_log_dne(taskpath, log_name): log = taskpath.with_filename(log_name).getpath('process_logdir') assert not os.path.exists(log) + + +@mock.patch.dict('os.environ', values={'PATH': 'SOME_PATH', 'TEST': 'A_TEST_VAR'}, clear=True) +def test_preserve_env(*mocks): + + scenarios = [ + ('PATH', True, 'SOME_PATH'), + ('TEST', True, 'A_TEST_VAR'), + ('PATH', False, 'SOME_PATH'), + ('TEST', False, ''), + ] + + for var, preserve, expectation in scenarios: + with temporary_dir() as td: + taskpath = make_taskpath(td) + sandbox = setup_sandbox(td, taskpath) + + p = TestProcess('process', 'echo $' + var, 0, taskpath, sandbox, preserve_env=preserve) + p.start() + rc = wait_for_rc(taskpath.getpath('process_checkpoint')) + + assert rc == 0 + assert_log_content(taskpath, 'stdout', expectation + '\n')
