Repository: aurora
Updated Branches:
  refs/heads/master b3f88bc9c -> 205c308c5
Fix Process log configuration handling.

Previously flagged configuration of Process logging mode would blow up
and claimed defaulting of the rotation policy did not occur.

Bugs closed: AURORA-1724

Reviewed at https://reviews.apache.org/r/49399/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/205c308c
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/205c308c
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/205c308c

Branch: refs/heads/master
Commit: 205c308c5463944b7c86c82a3e90fa66b7581d6d
Parents: b3f88bc
Author: John Sirois <[email protected]>
Authored: Fri Jul 1 13:35:11 2016 -0600
Committer: John Sirois <[email protected]>
Committed: Fri Jul 1 13:35:11 2016 -0600

----------------------------------------------------------------------
 docs/operations/configuration.md              |   9 +-
 docs/reference/configuration.md               |  33 ++-
 .../executor/bin/thermos_executor_main.py     |  21 +-
 src/main/python/apache/thermos/core/runner.py |  40 ++--
 .../python/apache/thermos/testing/runner.py   |  16 +-
 src/test/python/apache/thermos/core/BUILD     |   2 +
 .../thermos/core/test_runner_log_config.py    | 230 +++++++++++++++++++
 7 files changed, 293 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/docs/operations/configuration.md
----------------------------------------------------------------------
diff --git a/docs/operations/configuration.md b/docs/operations/configuration.md
index e332f86..0615c54 100644
--- a/docs/operations/configuration.md
+++ b/docs/operations/configuration.md
@@ -103,10 +103,10 @@ Maximum number of backups to retain before deleting the oldest backup(s).
 ## Process Logs
 
 ### Log destination
-By default, Thermos will write process stdout/stderr to log files in the sandbox. Process object configuration
-allows specifying alternate log file destinations like streamed stdout/stderr or suppression of all log output.
-Default behavior can be configured for the entire cluster with the following flag (through the `-thermos_executor_flags`
-argument to the Aurora scheduler):
+By default, Thermos will write process stdout/stderr to log files in the sandbox. Process object
+configuration allows specifying alternate log file destinations like streamed stdout/stderr or
+suppression of all log output. Default behavior can be configured for the entire cluster with the
+following flag (through the `-thermos_executor_flags` argument to the Aurora scheduler):
 
     --runner-logger-destination=both
 
@@ -130,7 +130,6 @@ reach 100 MiB in size and keep a maximum of 10 backups. If a user has provided a
 their process, it will override these default settings.
 
 
-
 ## Thermos Executor Wrapper
 
 If you need to do computation before starting the thermos executor (for example, setting a different


http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/docs/reference/configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md
index c4b1d38..64c076d 100644
--- a/docs/reference/configuration.md
+++ b/docs/reference/configuration.md
@@ -121,30 +121,27 @@ schedule.
 
 #### logger
 
-The default behavior of Thermos is to store stderr/stdout logs in files which grow unbounded.
-In the event that you have large log volume, you may want to configure Thermos to automatically rotate logs
-after they grow to a certain size, which can prevent your job from using more than its allocated
-disk space.
-
-A Logger union consists of a destination enum, a mode enum and a rotation policy.
-It's to set where the process logs should be sent using `destination`. Default
-option is `file`. Its also possible to specify `console` to get logs output
-to stdout/stderr, `none` to suppress any logs output or `both` to send logs to files and
-console output. In case of using `none` or `console` rotation attributes are ignored.
-Rotation policies only apply to loggers whose mode is `rotate`. The acceptable values
-for the LoggerMode enum are `standard` and `rotate`. The rotation policy applies to both
-stderr and stdout.
-
-By default, all processes use the `standard` LoggerMode.
+The default behavior of Thermos is to store stderr/stdout logs in files which grow unbounded.
+In the event that you have large log volume, you may want to configure Thermos to automatically
+rotate logs after they grow to a certain size, which can prevent your job from using more than its
+allocated disk space.
+
+Logger objects specify a `destination` for Process logs which is, by default, `file` - a pair of
+`stdout` and `stderr` files. Its also possible to specify `console` to get logs output to
+the Process stdout and stderr streams, `none` to suppress any logs output or `both` to send logs to
+files and console streams.
+
+The default Logger `mode` is `standard` which lets the stdout and stderr streams grow without bound.
 
  **Attribute Name**  | **Type**          | **Description**
  ------------------- | :---------------: | ---------------------------------
  **destination**     | LoggerDestination | Destination of logs. (Default: `file`)
  **mode**            | LoggerMode        | Mode of the logger. (Default: `standard`)
- **rotate**          | RotatePolicy      | An optional rotation policy.
+ **rotate**          | RotatePolicy      | An optional rotation policy. (Default: `Empty`)
 
-A RotatePolicy describes log rotation behavior for when `mode` is set to `rotate`. It is ignored
-otherwise.
+A RotatePolicy describes log rotation behavior for when `mode` is set to `rotate` and it is ignored
+otherwise. If `rotate` is `Empty` or `RotatePolicy()` when the `mode` is set to `rotate` the
+defaults below are used.
**Attribute Name** | **Type** | **Description** ------------------- | :----------: | --------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py index 203fc47..0ef3856 100644 --- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py +++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py @@ -55,9 +55,6 @@ LogOptions.set_simple(True) LogOptions.set_disk_log_level('DEBUG') LogOptions.set_log_dir(CWD) -_LOGGER_DESTINATIONS = ', '.join(LoggerDestination.VALUES) -_LOGGER_MODES = ', '.join(LoggerMode.VALUES) - app.add_option( '--announcer-ensemble', @@ -88,8 +85,7 @@ app.add_option( type=str, default=None, help='Set hostname to be announced. By default it is' - 'the --hostname argument passed into the Mesos agent.' -) + 'the --hostname argument passed into the Mesos agent.') app.add_option( '--announcer-zookeeper-auth-config', @@ -118,25 +114,22 @@ app.add_option( dest='nosetuid_health_checks', action="store_true", help='If set, the executor will not run shell health checks as job\'s role\'s user', - default=False -) + default=False) app.add_option( '--runner-logger-destination', dest='runner_logger_destination', - type=str, - default='file', - help='The logger destination [%s] to use for all processes run by thermos.' - % _LOGGER_DESTINATIONS) + choices=LoggerDestination.VALUES, + help='The logger destination %r to use for all processes run by thermos.' + % (LoggerDestination.VALUES,)) app.add_option( '--runner-logger-mode', dest='runner_logger_mode', - type=str, - default=None, - help='The logger mode [%s] to use for all processes run by thermos.' 
% _LOGGER_MODES) + choices=LoggerMode.VALUES, + help='The logger mode %r to use for all processes run by thermos.' % (LoggerMode.VALUES,)) app.add_option( http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/main/python/apache/thermos/core/runner.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py index 3ebf86e..fe971ed 100644 --- a/src/main/python/apache/thermos/core/runner.py +++ b/src/main/python/apache/thermos/core/runner.py @@ -66,11 +66,11 @@ from apache.thermos.config.loader import ( ThermosTaskValidator, ThermosTaskWrapper ) -from apache.thermos.config.schema import ThermosContext +from apache.thermos.config.schema import Logger, RotatePolicy, ThermosContext from .helper import TaskRunnerHelper from .muxer import ProcessMuxer -from .process import LoggerDestination, LoggerMode, Process +from .process import LoggerMode, Process from gen.apache.thermos.ttypes import ( ProcessState, @@ -722,6 +722,9 @@ class TaskRunner(object): rotate_log_backups=rotate_log_backups, preserve_env=self._preserve_env) + _DEFAULT_LOGGER = Logger() + _DEFAULT_ROTATION = RotatePolicy() + def _build_process_logger_args(self, process): """ Build the appropriate logging configuration based on flags + process @@ -730,27 +733,36 @@ class TaskRunner(object): If no configuration (neither flags nor process config), default to "standard" mode. 
""" - destination, mode, size, backups = None, None, None, None + + destination, mode, size, backups = (self._DEFAULT_LOGGER.destination().get(), + self._DEFAULT_LOGGER.mode().get(), + None, + None) + logger = process.logger() if logger is Empty: if self._process_logger_destination: destination = self._process_logger_destination - else: - destination = LoggerDestination.FILE - if self._process_logger_mode: - mode = self._process_logger_mode, - size = Amount(self._rotate_log_size_mb, Data.MB) - backups = self._rotate_log_backups - else: - mode = LoggerMode.STANDARD + mode = self._process_logger_mode else: destination = logger.destination().get() mode = logger.mode().get() - if mode == LoggerMode.ROTATE: + + if mode == LoggerMode.ROTATE: + size = Amount(self._DEFAULT_ROTATION.log_size().get(), Data.BYTES) + backups = self._DEFAULT_ROTATION.backups().get() + if logger is Empty: + if self._rotate_log_size_mb: + size = Amount(self._rotate_log_size_mb, Data.MB) + if self._rotate_log_backups: + backups = self._rotate_log_backups + else: rotate = logger.rotate() - size = Amount(rotate.log_size().get(), Data.BYTES) - backups = rotate.backups().get() + if rotate is not Empty: + size = Amount(rotate.log_size().get(), Data.BYTES) + backups = rotate.backups().get() + return destination, mode, size, backups def deadlocked(self, plan=None): http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/main/python/apache/thermos/testing/runner.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/testing/runner.py b/src/main/python/apache/thermos/testing/runner.py index 8b6ba73..0560891 100644 --- a/src/main/python/apache/thermos/testing/runner.py +++ b/src/main/python/apache/thermos/testing/runner.py @@ -70,10 +70,8 @@ class AngryHandler(TaskRunnerUniversalHandler): sys.exit(1) sandbox = os.path.join('%(sandbox)s', '%(task_id)s') -args = {} +args = %(extra_task_runner_args)r args['task_id'] = '%(task_id)s' -if 
%(portmap)s: - args['portmap'] = %(portmap)s args['universal_handler'] = AngryHandler runner = TaskRunner(task, '%(root)s', sandbox, **args) @@ -83,7 +81,7 @@ with open('%(state_filename)s', 'w') as fp: fp.write(thrift_serialize(runner.state)) """ - def __init__(self, task, portmap={}, success_rate=100, random_seed=31337): + def __init__(self, task, success_rate=100, random_seed=31337, **extra_task_runner_args): """ task = Thermos task portmap = port map @@ -99,7 +97,7 @@ with open('%(state_filename)s', 'w') as fp: self.tempdir = tempfile.mkdtemp() self.task_id = '%s-runner-base' % int(time.time() * 1000000) self.sandbox = os.path.join(self.tempdir, 'sandbox') - self.portmap = portmap + self.extra_task_runner_args = extra_task_runner_args self.cleaned = False self.pathspec = TaskPath(root=self.tempdir, task_id=self.task_id) self.script_filename = None @@ -130,9 +128,9 @@ with open('%(state_filename)s', 'w') as fp: 'root': self.tempdir, 'task_id': self.task_id, 'state_filename': self.state_filename, - 'portmap': repr(self.portmap), 'success_rate': self.success_rate, 'random_seed': self.random_seed + self._run_count, + 'extra_task_runner_args': self.extra_task_runner_args, }) with environment_as(PYTHONPATH=os.pathsep.join(sys.path)): @@ -193,12 +191,16 @@ with open('%(state_filename)s', 'w') as fp: class RunnerTestBase(object): @classmethod + def extra_task_runner_args(cls): + return dict(portmap=getattr(cls, 'portmap', {})) + + @classmethod def task(cls): raise NotImplementedError @classmethod def setup_class(cls): - cls.runner = Runner(cls.task(), portmap=getattr(cls, 'portmap', {})) + cls.runner = Runner(cls.task(), **cls.extra_task_runner_args()) cls.runner.run() cls.state = cls.runner.state http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/test/python/apache/thermos/core/BUILD ---------------------------------------------------------------------- diff --git a/src/test/python/apache/thermos/core/BUILD b/src/test/python/apache/thermos/core/BUILD 
index acfb79b..957a116 100644 --- a/src/test/python/apache/thermos/core/BUILD +++ b/src/test/python/apache/thermos/core/BUILD @@ -19,7 +19,9 @@ python_tests( '3rdparty/python:mock', '3rdparty/python:psutil', '3rdparty/python:twitter.common.contextutil', + '3rdparty/python:twitter.common.quantity', '3rdparty/python:twitter.common.process', + 'src/main/python/apache/thermos/config', 'src/main/python/apache/thermos/core', 'src/main/python/apache/thermos/monitoring', 'src/main/python/apache/thermos/testing', http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/test/python/apache/thermos/core/test_runner_log_config.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/thermos/core/test_runner_log_config.py b/src/test/python/apache/thermos/core/test_runner_log_config.py new file mode 100644 index 0000000..4381296 --- /dev/null +++ b/src/test/python/apache/thermos/core/test_runner_log_config.py @@ -0,0 +1,230 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import os +import struct +from collections import namedtuple + +from twitter.common.quantity import Amount, Data + +from apache.thermos.config.schema_base import Logger, Process, RotatePolicy, Task +from apache.thermos.testing.runner import RunnerTestBase + + +class LogConfig(namedtuple('LogConfig', ['destination', 'mode', 'size_mb', 'backups'])): + @classmethod + def create(cls, destination=None, mode=None, size_mb=None, backups=None): + return cls(destination=destination, mode=mode, size_mb=size_mb, backups=backups) + + +class RunnerLogConfigTestBase(RunnerTestBase): + _OPT_CONFIG_PROCESS_NAME = 'opt-log-config-process' + _CUSTOM_CONFIG_PROCESS_NAME = 'custom-log-config-process' + + @classmethod + def opt_config(cls): + return LogConfig.create() + + @classmethod + def custom_config(cls): + return None + + @classmethod + def log_size(cls): + return Amount(1, Data.BYTES) + + STDOUT = 1 + STDERR = 2 + + @classmethod + def log_fd(cls): + return cls.STDOUT + + @classmethod + def extra_task_runner_args(cls): + opt_config = cls.opt_config() + return dict(process_logger_destination=opt_config.destination, + process_logger_mode=opt_config.mode, + rotate_log_size_mb=opt_config.size_mb, + rotate_log_backups=opt_config.backups) + + @classmethod + def task(cls): + cmdline = 'head -c %d /dev/zero >&%d' % (cls.log_size().as_(Data.BYTES), cls.log_fd()) + + opt_config_process = Process(name=cls._OPT_CONFIG_PROCESS_NAME, cmdline=cmdline) + + custom_config_process = Process(name=cls._CUSTOM_CONFIG_PROCESS_NAME, cmdline=cmdline) + custom_config = cls.custom_config() + if custom_config: + logger = Logger() + if custom_config.destination: + logger = logger(destination=custom_config.destination) + if custom_config.mode: + logger = logger(mode=custom_config.mode) + if custom_config.size_mb or custom_config.backups: + rotate = RotatePolicy() + if custom_config.size_mb: + rotate = rotate(log_size=custom_config.size_mb) + if custom_config.backups: + rotate = 
rotate(backups=custom_config.backups) + logger = logger(rotate=rotate) + custom_config_process = custom_config_process(logger=logger) + + return Task(name='log-config-task', processes=[opt_config_process, custom_config_process]) + + class Assert(object): + def __init__(self, log_file_dir): + self._log_file_dir = log_file_dir + + def log_file_names(self, *names): + actual_names = os.listdir(self._log_file_dir) + assert set(actual_names) == set(names) + assert all(os.path.isfile(os.path.join(self._log_file_dir, name)) for name in names) + + def log_file(self, name): + expected_output_file = os.path.join(self._log_file_dir, name) + assert os.path.exists(expected_output_file) + with open(expected_output_file, 'rb') as fp: + actual = fp.read() + size = len(actual) + assert actual == struct.pack('B', 0) * size + return size + + def empty_log_file(self, name): + size = self.log_file(name=name) + assert 0 == size + + def _log_dir_name(self, process_name): + return os.path.join(self.state.header.log_dir, process_name, '0') + + @property + def opt_assert(self): + return self.Assert(self._log_dir_name(self._OPT_CONFIG_PROCESS_NAME)) + + @property + def custom_assert(self): + return self.Assert(self._log_dir_name(self._CUSTOM_CONFIG_PROCESS_NAME)) + + +class StandardTestBase(RunnerLogConfigTestBase): + @classmethod + def log_size(cls): + return Amount(200, Data.MB) + + def test_log_config(self): + log, empty = ('stdout', 'stderr') if self.log_fd() == self.STDOUT else ('stderr', 'stdout') + for assertions in self.opt_assert, self.custom_assert: + assertions.log_file_names(log, empty) + assertions.empty_log_file(name=empty) + + # No rotation should occur in standard mode. 
+ size = assertions.log_file(name=log) + assert size == self.log_size().as_(Data.BYTES) + + +class TestStandardStdout(StandardTestBase): + @classmethod + def log_fd(cls): + return cls.STDOUT + + +class TestStandardStderr(StandardTestBase): + @classmethod + def log_fd(cls): + return cls.STDERR + + +class RotateTestBase(RunnerLogConfigTestBase): + @classmethod + def opt_config(cls): + return LogConfig.create(mode='rotate', size_mb=1, backups=1) + + +class TestRotateUnderStdout(RotateTestBase): + def test_log_config(self): + self.opt_assert.log_file_names('stdout', 'stderr') + self.opt_assert.empty_log_file(name='stderr') + self.opt_assert.log_file(name='stdout') + + +class TestRotateUnderStderr(RotateTestBase): + @classmethod + def log_fd(cls): + return cls.STDERR + + def test_log_config(self): + self.opt_assert.log_file_names('stdout', 'stderr') + self.opt_assert.empty_log_file(name='stdout') + self.opt_assert.log_file(name='stderr') + + +class TestRotateOverStdout(RotateTestBase): + @classmethod + def log_size(cls): + return Amount(2, Data.MB) + + def test_log_config(self): + self.opt_assert.log_file_names('stdout', 'stdout.1', 'stderr') + self.opt_assert.empty_log_file(name='stderr') + self.opt_assert.log_file(name='stdout') + self.opt_assert.log_file(name='stdout.1') + + +class TestRotateOverStderr(RotateTestBase): + @classmethod + def log_size(cls): + return Amount(3, Data.MB) + + @classmethod + def log_fd(cls): + return cls.STDERR + + def test_log_config(self): + self.opt_assert.log_file_names('stdout', 'stderr', 'stderr.1') + self.opt_assert.empty_log_file(name='stdout') + self.opt_assert.log_file(name='stderr') + self.opt_assert.log_file(name='stderr.1') + + +class TestRotateDefaulted(RunnerLogConfigTestBase): + @classmethod + def opt_config(cls): + return LogConfig.create(mode='rotate') + + @classmethod + def custom_config(cls): + return LogConfig.create(mode='rotate') + + @classmethod + def log_size(cls): + # Default rotation policy is 100MiB with 5 backups 
so this guarantees a full rotation. + return Amount(700, Data.MB) + + def test_log_config(self): + for assertions in self.opt_assert, self.custom_assert: + assertions.log_file_names('stderr', + 'stdout', + 'stdout.1', + 'stdout.2', + 'stdout.3', + 'stdout.4', + 'stdout.5') + assertions.empty_log_file(name='stderr') + assertions.log_file(name='stdout') + assertions.log_file(name='stdout.1') + assertions.log_file(name='stdout.2') + assertions.log_file(name='stdout.3') + assertions.log_file(name='stdout.4') + assertions.log_file(name='stdout.5')
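The reworked `_build_process_logger_args` in runner.py sets up a precedence. A Process `Logger` overrides the executor's `--runner-logger-*` flags. In `rotate` mode, any size or backup count left unset falls back to the `RotatePolicy()` defaults, which the comment in `TestRotateDefaulted` gives as 100 MiB and 5 backups. The sketch below models that precedence in plain Python. It is not the Aurora implementation. `Logger` and `RotatePolicy` here are hypothetical namedtuples standing in for the pystachio structs, `None` stands in for `Empty`, and the sketch assumes an unset mode flag leaves the `standard` default in place.

```python
from collections import namedtuple

MIB = 1024 * 1024  # assumption: "MB" sizes are binary (MiB), as the test comment states

# Hypothetical plain-Python stand-ins for the Thermos Logger and RotatePolicy
# structs; a value of None plays the role of pystachio's Empty.
Logger = namedtuple('Logger', ['destination', 'mode', 'rotate'])
RotatePolicy = namedtuple('RotatePolicy', ['log_size_bytes', 'backups'])

DEFAULT_LOGGER = Logger(destination='file', mode='standard', rotate=None)
DEFAULT_ROTATION = RotatePolicy(log_size_bytes=100 * MIB, backups=5)


def resolve_logger_args(process_logger=None, flag_destination=None, flag_mode=None,
                        flag_size_mb=None, flag_backups=None):
    """Return (destination, mode, size_bytes, backups) for a single process."""
    destination, mode = DEFAULT_LOGGER.destination, DEFAULT_LOGGER.mode
    size, backups = None, None

    if process_logger is None:
        # No per-process Logger: the cluster-wide executor flags apply where set.
        destination = flag_destination or destination
        mode = flag_mode or mode
    else:
        # A per-process Logger takes precedence over the executor flags.
        destination, mode = process_logger.destination, process_logger.mode

    if mode == 'rotate':
        # Start from the built-in rotation defaults, then apply any overrides.
        size, backups = DEFAULT_ROTATION.log_size_bytes, DEFAULT_ROTATION.backups
        if process_logger is None:
            if flag_size_mb:
                size = flag_size_mb * MIB
            if flag_backups:
                backups = flag_backups
        elif process_logger.rotate is not None:
            size = process_logger.rotate.log_size_bytes
            backups = process_logger.rotate.backups

    return destination, mode, size, backups


print(resolve_logger_args(flag_mode='rotate'))
# ('file', 'rotate', 104857600, 5)
print(resolve_logger_args(Logger('console', 'standard', None), flag_mode='rotate'))
# ('console', 'standard', None, None)
```

The removed line `mode = self._process_logger_mode,` in runner.py ends in a comma. In Python, that binds a one-element tuple such as `('rotate',)` instead of the string `'rotate'`, so the old flag path handed a tuple on as the logger mode.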
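The rotation tests size their output relative to the configured limits. `TestRotateOverStdout` writes 2 MiB against a 1 MiB limit with 1 backup and expects `stdout` plus `stdout.1`. `TestRotateDefaulted` writes 700 MiB, which is more than the 600 MiB that the default 100 MiB live file plus 5 backups can hold, so every backup slot gets used. The following in-memory model of size-based rotation exists only to check that arithmetic. It is an assumption, not Thermos' actual rotating log writer, and its `simulate_rotation` helper and 64 KiB write size are hypothetical.

```python
MIB = 1024 * 1024


def simulate_rotation(name, total_bytes, max_size, backups, chunk=64 * 1024):
    """Model writing total_bytes to a size-rotated log; return {file_name: size}."""
    sizes = [0]  # sizes[0] is the live file; sizes[i] is backup '<name>.<i>'
    remaining = total_bytes
    while remaining > 0:
        n = min(chunk, remaining)
        if sizes[0] > 0 and sizes[0] + n > max_size:
            # Rotate: shift each file up one slot and drop anything past `backups`.
            sizes = [0] + sizes[:backups]
        sizes[0] += n
        remaining -= n
    return {(name if i == 0 else '%s.%d' % (name, i)): size for i, size in enumerate(sizes)}


print(sorted(simulate_rotation('stdout', 2 * MIB, 1 * MIB, 1)))
# ['stdout', 'stdout.1']
print(sorted(simulate_rotation('stdout', 700 * MIB, 100 * MIB, 5)))
# ['stdout', 'stdout.1', 'stdout.2', 'stdout.3', 'stdout.4', 'stdout.5']
```

Only the resulting file names are meant to line up with the tests' `log_file_names` expectations. The model makes no claim about the exact byte split Thermos produces between files.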
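The executor change replaces free-form `type=str` options with `choices=LoggerDestination.VALUES` and `choices=LoggerMode.VALUES`. As a result, an unknown destination or mode is rejected during argument parsing, and an omitted flag yields `None`. This sketch assumes `app.add_option` passes optparse-style keyword arguments through, so it uses the standard library's `optparse` directly to show the `choices` behavior. The value tuples are assumptions transcribed from the documented options, not imported from Thermos, and `build_parser` is a hypothetical helper.

```python
import optparse

# Assumed value sets, taken from the documented destinations and modes;
# the real tuples are LoggerDestination.VALUES and LoggerMode.VALUES in Thermos.
LOGGER_DESTINATIONS = ('file', 'console', 'both', 'none')
LOGGER_MODES = ('standard', 'rotate')


def build_parser():
    parser = optparse.OptionParser()
    # With `choices` set and no `type`, optparse infers type='choice' and
    # rejects any value outside the tuple; an omitted flag defaults to None.
    parser.add_option(
        '--runner-logger-destination',
        dest='runner_logger_destination',
        choices=LOGGER_DESTINATIONS,
        help='The logger destination %r to use for all processes run by thermos.'
             % (LOGGER_DESTINATIONS,))
    parser.add_option(
        '--runner-logger-mode',
        dest='runner_logger_mode',
        choices=LOGGER_MODES,
        help='The logger mode %r to use for all processes run by thermos.' % (LOGGER_MODES,))
    return parser


options, _ = build_parser().parse_args(['--runner-logger-mode=rotate'])
print(options.runner_logger_mode, options.runner_logger_destination)
# rotate None
```

Passing a value such as `--runner-logger-mode=rotating` makes optparse print a usage error and exit with status 2.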
