Repository: aurora Updated Branches: refs/heads/master c0fdcdcc6 -> 3c33f663f
Implements log rotation in the Thermos runner. Bugs closed: AURORA-95 Reviewed at https://reviews.apache.org/r/30695/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3c33f663 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3c33f663 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3c33f663 Branch: refs/heads/master Commit: 3c33f663f70c9ef9026a35e3ca2e2b9360719b06 Parents: c0fdcdc Author: George Sirois <[email protected]> Authored: Thu Dec 17 14:02:59 2015 -0800 Committer: Bill Farner <[email protected]> Committed: Thu Dec 17 14:02:59 2015 -0800 ---------------------------------------------------------------------- NEWS | 3 + docs/configuration-reference.md | 38 +++ docs/deploying-aurora-scheduler.md | 18 +- .../executor/bin/thermos_executor_main.py | 34 ++- .../aurora/executor/thermos_task_runner.py | 29 +- .../python/apache/thermos/config/schema_base.py | 28 +- src/main/python/apache/thermos/core/process.py | 300 +++++++++++++++++-- src/main/python/apache/thermos/core/runner.py | 49 ++- .../apache/thermos/runner/thermos_runner.py | 27 ++ .../python/apache/thermos/core/test_process.py | 92 +++++- 10 files changed, 574 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/NEWS ---------------------------------------------------------------------- diff --git a/NEWS b/NEWS index 8b5a1a1..066925e 100644 --- a/NEWS +++ b/NEWS @@ -16,6 +16,9 @@ - Added support for taking in an executor configuration in JSON via a command line argument `--custom_executor_config` which will override all other the command line arguments and default values pertaining to the executor. +- Log rotation has been added to the thermos runner. See the configuration reference for details + on how to configure rotation per-process. 
Command line options may also be passed through the + scheduler in order to configure the global default behavior. 0.10.0 ------ http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/docs/configuration-reference.md ---------------------------------------------------------------------- diff --git a/docs/configuration-reference.md b/docs/configuration-reference.md index 12c8021..cf63cfa 100644 --- a/docs/configuration-reference.md +++ b/docs/configuration-reference.md @@ -12,6 +12,7 @@ Aurora + Thermos Configuration Reference - [ephemeral](#ephemeral) - [min_duration](#min_duration) - [final](#final) + - [logger](#logger) - [Task Schema](#task-schema) - [Task Object](#task-object) - [name](#name-1) @@ -72,6 +73,7 @@ behavior with its optional attributes. Remember, Processes are handled by Thermo **ephemeral** | Boolean | When True, this is an ephemeral process. (Default: False) **min_duration** | Integer | Minimum duration between process restarts in seconds. (Default: 15) **final** | Boolean | When True, this process is a finalizing one that should run last. (Default: False) + **logger** | Logger | Struct defining the log behavior for the process. (Default: Empty) #### name @@ -144,6 +146,42 @@ vice-versa, however finalizing processes may depend upon other finalizing processes and otherwise run as a typical process schedule. +#### logger + +The default behavior of Thermos is to allow stderr/stdout logs to grow unbounded. In the event +that you have large log volume, you may want to configure Thermos to automatically rotate logs +after they grow to a certain size, which can prevent your job from using more than its allocated +disk space. + +A Logger union consists of a mode enum and a rotation policy. Rotation policies only apply to +loggers whose mode is `rotate`. The acceptable values for the LoggerMode enum are `standard` +and `rotate`. The rotation policy applies to both stderr and stdout. + +By default, all processes use the `standard` LoggerMode. 
+ + **Attribute Name** | **Type** | **Description** + ------------------- | :----------: | --------------------------------- + **mode** | LoggerMode | Mode of the logger. (Required) + **rotate** | RotatePolicy | An optional rotation policy. + +A RotatePolicy describes log rotation behavior for when `mode` is set to `rotate`. It is ignored +otherwise. + + **Attribute Name** | **Type** | **Description** + ------------------- | :----------: | --------------------------------- + **log_size** | Integer | Maximum size (in bytes) of an individual log file. (Default: 100 MiB) + **backups** | Integer | The maximum number of backups to retain. (Default: 5) + +An example process configuration is as follows: + + process = Process( + name='process', + logger=Logger( + mode=LoggerMode('rotate'), + rotate=RotatePolicy(log_size=5*MB, backups=5) + ) + ) + Task Schema =========== http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/docs/deploying-aurora-scheduler.md ---------------------------------------------------------------------- diff --git a/docs/deploying-aurora-scheduler.md b/docs/deploying-aurora-scheduler.md index 29fafca..caaf42b 100644 --- a/docs/deploying-aurora-scheduler.md +++ b/docs/deploying-aurora-scheduler.md @@ -15,6 +15,7 @@ machines. This guide helps you get the scheduler set up and troubleshoot some c - [Considerations for running jobs in docker](#considerations-for-running-jobs-in-docker) - [Security Considerations](#security-considerations) - [Configuring Resource Oversubscription](#configuring-resource-oversubscription) + - [Process Log Rotation](#process-log-rotation) - [Running Aurora](#running-aurora) - [Maintaining an Aurora Installation](#maintaining-an-aurora-installation) - [Monitoring](#monitoring) @@ -166,13 +167,28 @@ script does not access resources outside of the sandbox, as when the script is r docker container those resources will not exist. 
A scheduler flag, `-global_container_mounts` allows mounting paths from the host (i.e., the slave) -into all containers on that host. The format is a comma seperated list of host_path:container_path[:mode] +into all containers on that host. The format is a comma separated list of host_path:container_path[:mode] tuples. For example `-global_container_mounts=/opt/secret_keys_dir:/mnt/secret_keys_dir:ro` mounts `/opt/secret_keys_dir` from the slaves into all launched containers. Valid modes are `ro` and `rw`. In order to correctly execute processes inside a job, the docker container must have python 2.7 installed. +### Process Log Rotation +By default, Thermos will not rotate the stdout/stderr logs from child processes and they will grow +without bound. An individual user may change this behavior via configuration on the Process object, +but it may also be desirable to change the default configuration for the entire cluster. +In order to enable rotation by default, the following flags can be applied to Thermos (through the +-thermos_executor_flags argument to the Aurora scheduler): + + --runner-logger-mode=rotate + --runner-rotate-log-size-mb=100 + --runner-rotate-log-backups=10 + +In the above example, each instance of the Thermos runner will rotate stderr/stdout logs once they +reach 100 MiB in size and keep a maximum of 10 backups. If a user has provided a custom setting for +their process, it will override these default settings. 
+ ## Running Aurora Configure a supervisor like [Monit](http://mmonit.com/monit/) or [supervisord](http://supervisord.org/) to run the created `scheduler.sh` file and restart it http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py index 0d02dc1..7b7ef4b 100644 --- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py +++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py @@ -28,6 +28,7 @@ import traceback from twitter.common import app, log from twitter.common.log.options import LogOptions +from apache.aurora.config.schema.base import LoggerMode from apache.aurora.executor.aurora_executor import AuroraExecutor from apache.aurora.executor.common.announcer import DefaultAnnouncerCheckerProvider from apache.aurora.executor.common.executor_timeout import ExecutorTimeout @@ -54,6 +55,8 @@ LogOptions.set_simple(True) LogOptions.set_disk_log_level('DEBUG') LogOptions.set_log_dir(CWD) +_LOGGER_TYPES = ', '.join(LoggerMode.VALUES) + app.add_option( '--announcer-enable', @@ -96,6 +99,27 @@ app.add_option( default=False) +app.add_option( + '--runner-logger-mode', + dest='runner_logger_mode', + type=str, + default=None, + help='The type of logger [%s] to use for all processes run by thermos.' 
% _LOGGER_TYPES) + +app.add_option( + '--runner-rotate-log-size-mb', + dest='runner_rotate_log_size_mb', + type=int, + help='Maximum size of the rotated stdout/stderr logs emitted by the thermos runner in MiB.') + + +app.add_option( + '--runner-rotate-log-backups', + dest='runner_rotate_log_backups', + type=int, + help='Maximum number of rotated stdout/stderr logs emitted by the thermos runner.') + + # TODO(wickman) Consider just having the OSS version require pip installed # thermos_runner binaries on every machine and instead of embedding the pex # as a resource, shell out to one on the PATH. @@ -141,7 +165,10 @@ def initialize(options): thermos_runner_provider = UserOverrideThermosTaskRunnerProvider( dump_runner_pex(), checkpoint_root, - artifact_dir=cwd_path + artifact_dir=cwd_path, + process_logger_mode=options.runner_logger_mode, + rotate_log_size_mb=options.runner_rotate_log_size_mb, + rotate_log_backups=options.runner_rotate_log_backups ) thermos_runner_provider.set_role(None) @@ -154,7 +181,10 @@ def initialize(options): thermos_runner_provider = DefaultThermosTaskRunnerProvider( dump_runner_pex(), checkpoint_root, - artifact_dir=cwd_path + artifact_dir=cwd_path, + process_logger_mode=options.runner_logger_mode, + rotate_log_size_mb=options.runner_rotate_log_size_mb, + rotate_log_backups=options.runner_rotate_log_backups ) thermos_executor = AuroraExecutor( http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/main/python/apache/aurora/executor/thermos_task_runner.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/thermos_task_runner.py b/src/main/python/apache/aurora/executor/thermos_task_runner.py index 14e8b4b..25fcca2 100644 --- a/src/main/python/apache/aurora/executor/thermos_task_runner.py +++ b/src/main/python/apache/aurora/executor/thermos_task_runner.py @@ -72,7 +72,10 @@ class ThermosTaskRunner(TaskRunner): checkpoint_root, artifact_dir=None, clock=time, - 
hostname=None): + hostname=None, + process_logger_mode=None, + rotate_log_size_mb=None, + rotate_log_backups=None): """ runner_pex location of the thermos_runner pex that this task runner should use task_id task_id assigned by scheduler @@ -98,6 +101,9 @@ class ThermosTaskRunner(TaskRunner): self._clock = clock self._artifact_dir = artifact_dir or safe_mkdtemp() self._hostname = hostname or socket.gethostname() + self._process_logger_mode = process_logger_mode + self._rotate_log_size_mb = rotate_log_size_mb + self._rotate_log_backups = rotate_log_backups # wait events self._dead = threading.Event() @@ -233,13 +239,17 @@ class ThermosTaskRunner(TaskRunner): sandbox=host_sandbox or self._root, task_id=self._task_id, thermos_json=self._task_filename, - hostname=self._hostname) + hostname=self._hostname, + process_logger_mode=self._process_logger_mode, + rotate_log_size_mb=self._rotate_log_size_mb, + rotate_log_backups=self._rotate_log_backups) if getpass.getuser() == 'root' and self._role: params.update(setuid=self._role) cmdline_args = [sys.executable, self._runner_pex] - cmdline_args.extend('--%s=%s' % (flag, value) for flag, value in params.items()) + cmdline_args.extend( + '--%s=%s' % (flag, value) for flag, value in params.items() if value is not None) if self._enable_chroot: cmdline_args.extend(['--enable_chroot']) for name, port in self._ports.items(): @@ -342,7 +352,10 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider): max_wait=Amount(1, Time.MINUTES), preemption_wait=Amount(1, Time.MINUTES), poll_interval=Amount(500, Time.MILLISECONDS), - clock=time): + clock=time, + process_logger_mode=None, + rotate_log_size_mb=None, + rotate_log_backups=None): self._artifact_dir = artifact_dir or safe_mkdtemp() self._checkpoint_root = checkpoint_root self._clock = clock @@ -351,6 +364,9 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider): self._poll_interval = poll_interval self._preemption_wait = preemption_wait self._task_runner_class = 
task_runner_class + self._process_logger_mode = process_logger_mode + self._rotate_log_size_mb = rotate_log_size_mb + self._rotate_log_backups = rotate_log_backups def _get_role(self, assigned_task): return None if assigned_task.task.container.docker else assigned_task.task.job.role @@ -379,7 +395,10 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider): self._checkpoint_root, artifact_dir=self._artifact_dir, clock=self._clock, - hostname=assigned_task.slaveHost) + hostname=assigned_task.slaveHost, + process_logger_mode=self._process_logger_mode, + rotate_log_size_mb=self._rotate_log_size_mb, + rotate_log_backups=self._rotate_log_backups) return HttpLifecycleManager.wrap(runner, mesos_task, mesos_ports) http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/main/python/apache/thermos/config/schema_base.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/config/schema_base.py b/src/main/python/apache/thermos/config/schema_base.py index f9143cc..5552108 100644 --- a/src/main/python/apache/thermos/config/schema_base.py +++ b/src/main/python/apache/thermos/config/schema_base.py @@ -14,7 +14,19 @@ # checkstyle: noqa -from pystachio import Boolean, Default, Empty, Float, Integer, List, Map, Required, String, Struct +from pystachio import ( + Boolean, + Default, + Empty, + Enum, + Float, + Integer, + List, + Map, + Required, + String, + Struct +) # Define constants for resources BYTES = 1 @@ -45,6 +57,19 @@ class Constraint(Struct): order = List(String) +class RotatePolicy(Struct): + log_size = Default(Integer, 100*MB) + backups = Default(Integer, 5) + + +LoggerMode = Enum('standard', 'rotate') + + +class Logger(Struct): + mode = Required(LoggerMode) + rotate = RotatePolicy + + class Process(Struct): cmdline = Required(String) name = Required(String) @@ -60,6 +85,7 @@ class Process(Struct): min_duration = Default(Integer, 5) # integer seconds final = Default(Boolean, False) # if this 
process should be a finalizing process # that should always be run after regular processes + logger = Default(Logger, Empty) class Task(Struct): http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/main/python/apache/thermos/core/process.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py index f214bcc..8efdfdc 100644 --- a/src/main/python/apache/thermos/core/process.py +++ b/src/main/python/apache/thermos/core/process.py @@ -20,9 +20,11 @@ commandline in a subprocess of its own. """ +import errno import grp import os import pwd +import select import signal import subprocess import sys @@ -30,9 +32,9 @@ import time from abc import abstractmethod from twitter.common import log -from twitter.common.dirutil import lock_file, safe_mkdir, safe_open +from twitter.common.dirutil import lock_file, safe_delete, safe_mkdir, safe_open from twitter.common.lang import Interface -from twitter.common.quantity import Amount, Time +from twitter.common.quantity import Amount, Data, Time from twitter.common.recordio import ThriftRecordReader, ThriftRecordWriter from gen.apache.thermos.ttypes import ProcessState, ProcessStatus, RunnerCkpt @@ -54,6 +56,17 @@ class Platform(Interface): pass +class LoggerMode(object): + STANDARD = 'standard' + ROTATE = 'rotate' + + _ALL_MODES = [STANDARD, ROTATE] + + @staticmethod + def is_valid(mode): + return mode in LoggerMode._ALL_MODES + + class ProcessBase(object): """ Encapsulate a running process for a task. 
@@ -67,7 +80,8 @@ class ProcessBase(object): CONTROL_WAIT_CHECK_INTERVAL = Amount(100, Time.MILLISECONDS) MAXIMUM_CONTROL_WAIT = Amount(1, Time.MINUTES) - def __init__(self, name, cmdline, sequence, pathspec, sandbox_dir, user=None, platform=None): + def __init__(self, name, cmdline, sequence, pathspec, sandbox_dir, user=None, platform=None, + logger_mode=LoggerMode.STANDARD, rotate_log_size=None, rotate_log_backups=None): """ required: name = name of the process @@ -78,8 +92,12 @@ class ProcessBase(object): platform = Platform providing fork, clock, getpid optional: - user = the user to run as (if unspecified, will default to current user.) - if specified to a user that is not the current user, you must have root access + user = the user to run as (if unspecified, will default to current user.) + if specified to a user that is not the current user, you must have root + access + logger_mode = The type of logger to use for the process. + rotate_log_size = The maximum size of the rotated stdout/stderr logs. + rotate_log_backups = The maximum number of rotated stdout/stderr log backups. """ self._name = name self._cmdline = cmdline @@ -90,14 +108,24 @@ class ProcessBase(object): safe_mkdir(self._sandbox) self._pid = None self._fork_time = None - self._stdout = None - self._stderr = None self._user = user self._ckpt = None self._ckpt_head = -1 if platform is None: raise ValueError("Platform must be specified") self._platform = platform + self._logger_mode = logger_mode + self._rotate_log_size = rotate_log_size + self._rotate_log_backups = rotate_log_backups + + if not LoggerMode.is_valid(self._logger_mode): + raise ValueError("Logger mode %s is invalid." 
% self._logger_mode) + + if self._logger_mode == LoggerMode.ROTATE: + if self._rotate_log_size.as_(Data.BYTES) <= 0: + raise ValueError('Log size cannot be less than one byte.') + if self._rotate_log_backups <= 0: + raise ValueError('Log backups cannot be less than one.') def _log(self, msg): log.debug('[process:%5s=%s]: %s' % (self._pid, self.name(), msg)) @@ -151,6 +179,9 @@ class ProcessBase(object): def ckpt_file(self): return self._pathspec.getpath('process_checkpoint') + def process_logdir(self): + return self._pathspec.getpath('process_logdir') + def _setup_ckpt(self): """Set up the checkpoint: must be run on the parent.""" self._log('initializing checkpoint file: %s' % self.ckpt_file()) @@ -207,11 +238,9 @@ class ProcessBase(object): raise self.PermissionError('Must be root to run processes as other users!') self._fork_time = self._platform.clock().time() self._setup_ckpt() - self._stdout = safe_open(self._pathspec.with_filename('stdout').getpath('process_logdir'), "a") - self._stderr = safe_open(self._pathspec.with_filename('stderr').getpath('process_logdir'), "a") - uid, gid = user.pw_uid, user.pw_gid - os.chown(self._stdout.name, uid, gid) - os.chown(self._stderr.name, uid, gid) + # Since the forked process is responsible for creating log files, it needs to own the log dir. 
+ safe_mkdir(self.process_logdir()) + os.chown(self.process_logdir(), user.pw_uid, user.pw_gid) def _finalize_fork(self): self._write_initial_update() @@ -315,10 +344,6 @@ class Process(ProcessBase): def execute(self): """Perform final initialization and launch target process commandline in a subprocess.""" - if not self._stderr: - raise RuntimeError('self._stderr not set up!') - if not self._stdout: - raise RuntimeError('self._stdout not set up!') user, _ = self._getpwuid() username, homedir = user.pw_name, user.pw_dir @@ -348,19 +373,32 @@ class Process(ProcessBase): if os.path.exists(thermos_profile): env.update(BASH_ENV=thermos_profile) - self._popen = subprocess.Popen(["/bin/bash", "-c", self.cmdline()], - stderr=self._stderr, - stdout=self._stdout, - close_fds=self.FD_CLOEXEC, - cwd=sandbox, - env=env) + subprocess_args = { + 'args': ["/bin/bash", "-c", self.cmdline()], + 'close_fds': self.FD_CLOEXEC, + 'cwd': sandbox, + 'env': env, + 'pathspec': self._pathspec + } + + if self._logger_mode == LoggerMode.ROTATE: + log_size = int(self._rotate_log_size.as_(Data.BYTES)) + self._log('Starting subprocess with log rotation. Size: %s, Backups: %s' % ( + log_size, self._rotate_log_backups)) + executor = LogRotatingSubprocessExecutor(max_bytes=log_size, + max_backups=self._rotate_log_backups, + **subprocess_args) + else: + self._log('Starting subprocess with no log rotation.') + executor = SubprocessExecutor(**subprocess_args) + + pid = executor.start() self._write_process_update(state=ProcessState.RUNNING, - pid=self._popen.pid, + pid=pid, start_time=start_time) - # wait for job to finish - rc = self._popen.wait() + rc = executor.wait() # indicate that we have finished/failed if rc < 0: @@ -378,3 +416,215 @@ class Process(ProcessBase): def finish(self): self._log('Coordinator exiting.') sys.exit(0) + + +class SubprocessExecutorBase(object): + """ + Encapsulate execution of a subprocess. 
+ """ + + def __init__(self, args, close_fds, cwd, env, pathspec): + """ + required: + args = The arguments to pass to the subprocess. + close_fds = Close file descriptors argument to Popen. + cwd = The current working directory. + env = Environment variables to be passed to the subprocess. + pathspec = TaskPath object for synthesizing path names. + """ + self._args = args + self._close_fds = close_fds + self._cwd = cwd + self._env = env + self._pathspec = pathspec + self._popen = None + + def _get_log_path(self, log_name): + return self._pathspec.with_filename(log_name).getpath('process_logdir') + + def _start_subprocess(self, stderr, stdout): + return subprocess.Popen(self._args, + stderr=stderr, + stdout=stdout, + close_fds=self._close_fds, + cwd=self._cwd, + env=self._env) + + def start(self): + """Start the subprocess and immediately return the resulting pid.""" + raise NotImplementedError() + + def wait(self): + """Wait for the subprocess to finish executing and return the return code.""" + raise NotImplementedError() + + +class SubprocessExecutor(SubprocessExecutorBase): + """ + Basic implementation of a SubprocessExecutor that writes stderr/stdout unconstrained log files. + """ + + def __init__(self, args, close_fds, cwd, env, pathspec): + """See SubprocessExecutorBase.__init__""" + self._stderr = None + self._stdout = None + super(SubprocessExecutor, self).__init__(args, close_fds, cwd, env, pathspec) + + def start(self): + self._stderr = safe_open(self._get_log_path('stderr'), 'a') + self._stdout = safe_open(self._get_log_path('stdout'), 'a') + + self._popen = self._start_subprocess(self._stderr, self._stdout) + return self._popen.pid + + def wait(self): + return self._popen.wait() + + +class LogRotatingSubprocessExecutor(SubprocessExecutorBase): + """ + Implementation of a SubprocessExecutor that implements log rotation for stderr/stdout. 
+ """ + + READ_BUFFER_SIZE = 2 ** 16 + + def __init__(self, args, close_fds, cwd, env, pathspec, max_bytes, max_backups): + """ + See SubprocessExecutorBase.__init__ + + Takes additional arguments: + max_bytes = The maximum size of an individual log file. + max_backups = The maximum number of log file backups to create. + """ + self._max_bytes = max_bytes + self._max_backups = max_backups + self._stderr = None + self._stdout = None + super(LogRotatingSubprocessExecutor, self).__init__(args, close_fds, cwd, env, pathspec) + + def start(self): + self._stderr = RotatingFileHandler(self._get_log_path('stderr'), + self._max_bytes, + self._max_backups) + self._stdout = RotatingFileHandler(self._get_log_path('stdout'), + self._max_bytes, + self._max_backups) + + self._popen = self._start_subprocess(subprocess.PIPE, subprocess.PIPE) + return self._popen.pid + + def wait(self): + stdout = self._popen.stdout.fileno() + stderr = self._popen.stderr.fileno() + pipes = { + stderr: self._stderr, + stdout: self._stdout + } + + rc = None + # Read until there is a return code AND both of the pipes have reached EOF. + while rc is None or pipes: + rc = self._popen.poll() + + read_results, _, _ = select.select(pipes.keys(), [], [], 1) + for fd in read_results: + handler = pipes[fd] + buf = os.read(fd, self.READ_BUFFER_SIZE) + + if len(buf) == 0: + del pipes[fd] + else: + handler.write(buf) + + return rc + + +class FileHandler(object): + """ + Base file handler. + """ + + def __init__(self, filename, mode='w'): + """ + required: + filename = The file name. + + optional: + mode = Mode to open the file in. + """ + self.file = safe_open(filename, mode=mode) + self.filename = filename + self.mode = mode + self.closed = False + + def close(self): + if not self.closed: + self.file.close() + self.closed = True + + def write(self, b): + self.file.write(b) + self.file.flush() + + +class RotatingFileHandler(FileHandler): + """ + File handler that implements max size/rotation. 
+ """ + + def __init__(self, filename, max_bytes, max_backups, mode='w'): + """ + required: + filename = The file name. + max_bytes = The maximum size of an individual log file. + max_backups = The maximum number of log file backups to create. + + optional: + mode = Mode to open the file in. + """ + if max_bytes > 0 and max_backups <= 0: + raise ValueError('A positive value for max_backups must be specified if max_bytes > 0.') + self._max_bytes = max_bytes + self._max_backups = max_backups + super(RotatingFileHandler, self).__init__(filename, mode) + + def write(self, b): + super(RotatingFileHandler, self).write(b) + if self.should_rollover(): + self.rollover() + + def swap_files(self, src, tgt): + if os.path.exists(tgt): + safe_delete(tgt) + + try: + os.rename(src, tgt) + except OSError as e: + if e.errno != errno.ENOENT: + raise + + def make_indexed_filename(self, index): + return '%s.%d' % (self.filename, index) + + def should_rollover(self): + if self._max_bytes <= 0 or self._max_backups <= 0: + return False + + if self.file.tell() >= self._max_bytes: + return True + + return False + + def rollover(self): + """ + Perform the rollover of the log. 
+ """ + self.file.close() + for i in range(self._max_backups - 1, 0, -1): + src = self.make_indexed_filename(i) + tgt = self.make_indexed_filename(i + 1) + if os.path.exists(src): + self.swap_files(src, tgt) + + self.swap_files(self.filename, self.make_indexed_filename(1)) + self.file = safe_open(self.filename, mode='w') http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/main/python/apache/thermos/core/runner.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py index f949f27..11c06a8 100644 --- a/src/main/python/apache/thermos/core/runner.py +++ b/src/main/python/apache/thermos/core/runner.py @@ -47,10 +47,10 @@ import time import traceback from contextlib import contextmanager -from pystachio import Environment +from pystachio import Empty, Environment from twitter.common import log from twitter.common.dirutil import safe_mkdir -from twitter.common.quantity import Amount, Time +from twitter.common.quantity import Amount, Data, Time from twitter.common.recordio import ThriftRecordReader from apache.thermos.common.ckpt import ( @@ -70,7 +70,7 @@ from apache.thermos.config.schema import ThermosContext from .helper import TaskRunnerHelper from .muxer import ProcessMuxer -from .process import Process +from .process import LoggerMode, Process from gen.apache.thermos.ttypes import ( ProcessState, @@ -418,7 +418,8 @@ class TaskRunner(object): def __init__(self, task, checkpoint_root, sandbox, log_dir=None, task_id=None, portmap=None, user=None, chroot=False, clock=time, - universal_handler=None, planner_class=TaskPlanner, hostname=None): + universal_handler=None, planner_class=TaskPlanner, hostname=None, + process_logger_mode=None, rotate_log_size_mb=None, rotate_log_backups=None): """ required: task (config.Task) = the task to run @@ -440,6 +441,9 @@ class TaskRunner(object): universal_handler = checkpoint record handler (only used 
for testing) planner_class (TaskPlanner class) = TaskPlanner class to use for constructing the task planning policy. + process_logger_mode (string) = The type of logger to use for all processes. + rotate_log_size_mb (integer) = The maximum size of the rotated stdout/stderr logs in MiB. + rotate_log_backups (integer) = The maximum number of rotated stdout/stderr log backups. """ if not issubclass(planner_class, TaskPlanner): raise TypeError('planner_class must be a TaskPlanner.') @@ -462,6 +466,9 @@ class TaskRunner(object): self._portmap = portmap or {} self._launch_time = launch_time self._log_dir = log_dir or os.path.join(sandbox, '.logs') + self._process_logger_mode = process_logger_mode + self._rotate_log_size_mb = rotate_log_size_mb + self._rotate_log_backups = rotate_log_backups self._pathspec = TaskPath(root=checkpoint_root, task_id=self._task_id, log_dir=self._log_dir) self._hostname = hostname or socket.gethostname() try: @@ -687,6 +694,9 @@ class TaskRunner(object): if pid == 0 and self._ckpt is not None: self._ckpt.close() return pid + + logger_mode, rotate_log_size, rotate_log_backups = self._build_process_logger_args(process) + return Process( process.name().get(), process.cmdline().get(), @@ -695,7 +705,36 @@ class TaskRunner(object): self._sandbox, self._user, chroot=self._chroot, - fork=close_ckpt_and_fork) + fork=close_ckpt_and_fork, + logger_mode=logger_mode, + rotate_log_size=rotate_log_size, + rotate_log_backups=rotate_log_backups) + + def _build_process_logger_args(self, process): + """ + Build the appropriate logging configuration based on flags + process + configuration settings. + + If no configuration (neither flags nor process config), default to + "standard" mode. 
+ """ + logger = process.logger() + if logger is Empty: + if self._process_logger_mode: + return ( + self._process_logger_mode, + Amount(self._rotate_log_size_mb, Data.MB), + self._rotate_log_backups + ) + else: + return LoggerMode.STANDARD, None, None + else: + mode = logger.mode().get() + if mode == LoggerMode.ROTATE: + rotate = logger.rotate() + return mode, Amount(rotate.log_size().get(), Data.BYTES), rotate.backups().get() + else: + return mode, None, None def deadlocked(self, plan=None): """Check whether a plan is deadlocked, i.e. there are no running/runnable processes, and the http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/main/python/apache/thermos/runner/thermos_runner.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/runner/thermos_runner.py b/src/main/python/apache/thermos/runner/thermos_runner.py index bd8cf7f..a36bd2a 100644 --- a/src/main/python/apache/thermos/runner/thermos_runner.py +++ b/src/main/python/apache/thermos/runner/thermos_runner.py @@ -102,6 +102,30 @@ app.add_option( "the locally-resolved hostname.") +app.add_option( + '--process_logger_mode', + dest='process_logger_mode', + type=str, + default=None, + help='The type of logger to use for all processes run by thermos.') + + +app.add_option( + '--rotate_log_size_mb', + dest='rotate_log_size_mb', + type=int, + default=None, + help='Maximum size of the rotated stdout/stderr logs emitted by the thermos runner in MiB.') + + +app.add_option( + '--rotate_log_backups', + dest='rotate_log_backups', + type=int, + default=None, + help='Maximum number of rotated stdout/stderr logs emitted by the thermos runner.') + + def get_task_from_options(opts): tasks = ThermosConfigLoader.load_json(opts.thermos_json) if len(tasks.tasks()) == 0: @@ -167,6 +191,9 @@ def proxy_main(args, opts): chroot=opts.chroot, planner_class=CappedTaskPlanner, hostname=opts.hostname, + process_logger_mode=opts.process_logger_mode, + 
rotate_log_size_mb=opts.rotate_log_size_mb, + rotate_log_backups=opts.rotate_log_backups ) for sig in (signal.SIGUSR1, signal.SIGUSR2): http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/test/python/apache/thermos/core/test_process.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/thermos/core/test_process.py b/src/test/python/apache/thermos/core/test_process.py index 5e6ad2f..261371d 100644 --- a/src/test/python/apache/thermos/core/test_process.py +++ b/src/test/python/apache/thermos/core/test_process.py @@ -22,10 +22,11 @@ import mock import pytest from twitter.common.contextutil import temporary_dir from twitter.common.dirutil import safe_mkdir +from twitter.common.quantity import Amount, Data from twitter.common.recordio import ThriftRecordReader from apache.thermos.common.path import TaskPath -from apache.thermos.core.process import Process +from apache.thermos.core.process import LoggerMode, LogRotatingSubprocessExecutor, Process from gen.apache.thermos.ttypes import RunnerCkpt @@ -86,10 +87,7 @@ def test_simple_process(): rc = wait_for_rc(taskpath.getpath('process_checkpoint')) assert rc == 0 - stdout = taskpath.with_filename('stdout').getpath('process_logdir') - assert os.path.exists(stdout) - with open(stdout, 'r') as fp: - assert fp.read() == 'hello world\n' + assert_log_content(taskpath, 'stdout', 'hello world\n') @mock.patch('os.chown') @@ -185,3 +183,87 @@ def test_cloexec(): assert run_with_class(TestWithoutCloexec) == 0 assert run_with_class(TestProcess) != 0 + + +STDERR = 'for i in {1..31};do echo "stderr" 1>&2; done;' +STDOUT = 'for i in {1..31};do echo "stdout";done;' + + +def test_log_standard(): + with temporary_dir() as td: + taskpath = make_taskpath(td) + sandbox = setup_sandbox(td, taskpath) + + script = STDERR + STDOUT + p = TestProcess('process', script, 0, taskpath, sandbox) + p.start() + + rc = wait_for_rc(taskpath.getpath('process_checkpoint')) + assert rc == 0 + 
assert_log_content(taskpath, 'stdout', 'stdout\n' * 31) + assert_log_content(taskpath, 'stderr', 'stderr\n' * 31) + + +def test_log_rotation(): + # During testing, read one byte at a time to make the file sizes deterministic. + LogRotatingSubprocessExecutor.READ_BUFFER_SIZE = 1 + + def assert_stderr(taskpath, solo=True): + if solo: + assert_log_content(taskpath, 'stdout', '') + + assert_log_content(taskpath, 'stderr', 'stderr\n') + assert_log_content(taskpath, 'stderr.1', 'stderr\n' * 10) + assert_log_content(taskpath, 'stderr.2', 'stderr\n' * 10) + assert_log_dne(taskpath, 'stderr.3') + + def assert_stdout(taskpath, solo=True): + if solo: + assert_log_content(taskpath, 'stderr', '') + + assert_log_content(taskpath, 'stdout', 'stdout\n') + assert_log_content(taskpath, 'stdout.1', 'stdout\n' * 10) + assert_log_content(taskpath, 'stdout.2', 'stdout\n' * 10) + assert_log_dne(taskpath, 'stdout.3') + + def assert_both(taskpath): + assert_stderr(taskpath, solo=False) + assert_stdout(taskpath, solo=False) + + scenarios = [ + (STDERR + STDOUT, assert_both), + (STDERR, assert_stderr), + (STDOUT, assert_stdout) + ] + + for script, assertion in scenarios: + with temporary_dir() as td: + taskpath = make_taskpath(td) + sandbox = setup_sandbox(td, taskpath) + + p = TestProcess( + 'process', + script, + 0, + taskpath, + sandbox, + logger_mode=LoggerMode.ROTATE, + rotate_log_size=Amount(70, Data.BYTES), + rotate_log_backups=2) + p.start() + + rc = wait_for_rc(taskpath.getpath('process_checkpoint')) + assert rc == 0 + assertion(taskpath) + + +def assert_log_content(taskpath, log_name, expected_content): + log = taskpath.with_filename(log_name).getpath('process_logdir') + assert os.path.exists(log) + with open(log, 'r') as fp: + assert fp.read() == expected_content + + +def assert_log_dne(taskpath, log_name): + log = taskpath.with_filename(log_name).getpath('process_logdir') + assert not os.path.exists(log)
