Repository: aurora Updated Branches: refs/heads/master b6d23afdf -> c66a9eeef
Ensure final processes are executed when ephemeral daemon processes exist. Bugs closed: AURORA-1642 Reviewed at https://reviews.apache.org/r/45115/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c66a9eee Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c66a9eee Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c66a9eee Branch: refs/heads/master Commit: c66a9eeef97ebb0be367f76a3992ef9f17ebbde0 Parents: b6d23af Author: Amol Deshmukh <[email protected]> Authored: Tue Mar 22 17:22:40 2016 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Tue Mar 22 17:22:40 2016 -0700 ---------------------------------------------------------------------- src/main/python/apache/thermos/core/process.py | 95 ++++++++++++-------- .../python/apache/thermos/core/test_process.py | 26 +++--- .../e2e/ephemeral_daemon_with_final.aurora | 47 ++++++++++ .../sh/org/apache/aurora/e2e/test_end_to_end.sh | 47 +++++++++- 4 files changed, 165 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/c66a9eee/src/main/python/apache/thermos/core/process.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py index f147af7..1791b5f 100644 --- a/src/main/python/apache/thermos/core/process.py +++ b/src/main/python/apache/thermos/core/process.py @@ -417,8 +417,13 @@ class Process(ProcessBase): mode=self._logger_mode, rotate_log_size=self._rotate_log_size, rotate_log_backups=self._rotate_log_backups) - stdout, stderr = log_destination_resolver.get_handlers() - executor = PipedSubprocessExecutor(stdout=stdout, + stdout, stderr, handlers_are_files = log_destination_resolver.get_handlers() + if handlers_are_files: + executor = SubprocessExecutor(stdout=stdout, + stderr=stderr, + **subprocess_args) + else: + executor = PipedSubprocessExecutor(stdout=stdout, stderr=stderr, **subprocess_args) @@ -486,9 +491,35 @@ class SubprocessExecutorBase(object): raise NotImplementedError() +class SubprocessExecutor(SubprocessExecutorBase): + """ + Basic implementation of a SubprocessExecutor that writes stderr/stdout to specified output files. + """ + + def __init__(self, args, close_fds, cwd, env, pathspec, stdout=None, stderr=None): + """ + See SubprocessExecutorBase.__init__ + + Takes additional arguments: + stdout = Destination handler for stdout output. Default is /dev/null. + stderr = Destination handler for stderr output. Default is /dev/null. + """ + super(SubprocessExecutor, self).__init__(args, close_fds, cwd, env, pathspec) + self._stderr = stderr + self._stdout = stdout + + def start(self): + self._popen = self._start_subprocess(self._stderr, self._stdout) + return self._popen.pid + + def wait(self): + return self._popen.wait() + + class PipedSubprocessExecutor(SubprocessExecutorBase): """ - Implementation of SubprocessExecutorBase that passes logs to provided destinations + Implementation of SubprocessExecutorBase that uses pipes to poll the pipes to output streams and + copies them to the specified destinations. """ READ_BUFFER_SIZE = 2 ** 16 @@ -537,7 +568,7 @@ class PipedSubprocessExecutor(SubprocessExecutorBase): class LogDestinationResolver(object): """ - Resolves correct stdout/stderr destinations based on process configuration + Resolves correct stdout/stderr destinations based on process configuration. """ STDOUT = 'stdout' @@ -568,11 +599,20 @@ class LogDestinationResolver(object): """ Creates stdout/stderr handler by provided configuration """ - return self._get_handler(self.STDOUT), self._get_handler(self.STDERR) + return (self._get_handler(self.STDOUT), + self._get_handler(self.STDERR), + self._handlers_are_files()) + + def _handlers_are_files(self): + """ + Returns True if both the handlers are standard file objects. + """ + return (self._destination == LoggerDestination.CONSOLE or + (self._destination == LoggerDestination.FILE and self._mode == LoggerMode.STANDARD)) def _get_handler(self, name): """ - Constructs correct handler by provided configuration + Constructs correct handler or file object based on the provided configuration. """ # On no destination write logs to /dev/null @@ -581,7 +621,7 @@ class LogDestinationResolver(object): # Streamed logs to predefined outputs if self._destination == LoggerDestination.CONSOLE: - return self._get_stream(name) + return sys.stdout if name == self.STDOUT else sys.stderr # Streaming AND file logs are required if self._destination == LoggerDestination.BOTH: @@ -592,7 +632,7 @@ class LogDestinationResolver(object): def _get_file(self, name): if self._mode == LoggerMode.STANDARD: - return FileHandler(self._get_log_path(name)) + return safe_open(self._get_log_path(name), mode='a') if self._mode == LoggerMode.ROTATE: log_size = int(self._rotate_log_size.as_(Data.BYTES)) return RotatingFileHandler(self._get_log_path(name), @@ -612,19 +652,25 @@ class LogDestinationResolver(object): return self._pathspec.with_filename(log_name).getpath('process_logdir') -class FileHandler(object): +class RotatingFileHandler(object): """ - Base file handler. + File handler that implements max size/rotation. """ - def __init__(self, filename, mode='w'): + def __init__(self, filename, max_bytes, max_backups, mode='w'): """ required: - filename = The file name. + filename = The file name. + max_bytes = The maximum size of an individual log file. + max_backups = The maximum number of log file backups to create. optional: mode = Mode to open the file in. """ + if max_bytes > 0 and max_backups <= 0: + raise ValueError('A positive value for max_backups must be specified if max_bytes > 0.') + self._max_bytes = max_bytes + self._max_backups = max_backups self.file = safe_open(filename, mode=mode) self.filename = filename self.mode = mode @@ -638,31 +684,6 @@ class FileHandler(object): def write(self, b): self.file.write(b) self.file.flush() - - -class RotatingFileHandler(FileHandler): - """ - File handler that implements max size/rotation. - """ - - def __init__(self, filename, max_bytes, max_backups, mode='w'): - """ - required: - filename = The file name. - max_bytes = The maximum size of an individual log file. - max_backups = The maximum number of log file backups to create. - - optional: - mode = Mode to open the file in. - """ - if max_bytes > 0 and max_backups <= 0: - raise ValueError('A positive value for max_backups must be specified if max_bytes > 0.') - self._max_bytes = max_bytes - self._max_backups = max_backups - super(RotatingFileHandler, self).__init__(filename, mode) - - def write(self, b): - super(RotatingFileHandler, self).write(b) if self.should_rollover(): self.rollover() http://git-wip-us.apache.org/repos/asf/aurora/blob/c66a9eee/src/test/python/apache/thermos/core/test_process.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/thermos/core/test_process.py b/src/test/python/apache/thermos/core/test_process.py index c339c91..77f644c 100644 --- a/src/test/python/apache/thermos/core/test_process.py +++ b/src/test/python/apache/thermos/core/test_process.py @@ -29,7 +29,6 @@ from twitter.common.recordio import ThriftRecordReader from apache.thermos.common.path import TaskPath from apache.thermos.core.process import ( - FileHandler, LogDestinationResolver, LoggerDestination, LoggerMode, @@ -326,29 +325,30 @@ def test_resolver_none_output(): with temporary_dir() as td: taskpath = make_taskpath(td) r = LogDestinationResolver(taskpath, destination=LoggerDestination.NONE) - stdout, stderr = r.get_handlers() + stdout, stderr, handlers_are_files = r.get_handlers() assert type(stdout) == StreamHandler assert type(stderr) == StreamHandler + assert not handlers_are_files def test_resolver_console_output(): with temporary_dir() as td: taskpath = make_taskpath(td) r = LogDestinationResolver(taskpath, destination=LoggerDestination.CONSOLE) - stdout, stderr = r.get_handlers() - assert type(stdout) == StreamHandler - assert type(stderr) == StreamHandler - assert stdout._stream == sys.stdout - assert stderr._stream == sys.stderr + stdout, stderr, handlers_are_files = r.get_handlers() + assert stdout == sys.stdout + assert stderr == sys.stderr + assert handlers_are_files def test_resolver_file_output(): with temporary_dir() as td: taskpath = make_taskpath(td) r = LogDestinationResolver(taskpath, destination=LoggerDestination.FILE) - stdout, stderr = r.get_handlers() - assert type(stdout) == FileHandler - assert type(stderr) == FileHandler + stdout, stderr, handlers_are_files = r.get_handlers() + assert type(stdout) == file + assert type(stderr) == file + assert handlers_are_files assert_log_file_exists(taskpath, 'stdout') assert_log_file_exists(taskpath, 'stderr') @@ -357,9 +357,10 @@ def test_resolver_both_output(): with temporary_dir() as td: taskpath = make_taskpath(td) r = LogDestinationResolver(taskpath, destination=LoggerDestination.BOTH) - stdout, stderr = r.get_handlers() + stdout, stderr, handlers_are_files = r.get_handlers() assert type(stdout) == TeeHandler assert type(stderr) == TeeHandler + assert not handlers_are_files assert_log_file_exists(taskpath, 'stdout') assert_log_file_exists(taskpath, 'stderr') @@ -371,9 +372,10 @@ def test_resolver_both_with_rotation_output(): mode=LoggerMode.ROTATE, rotate_log_size=Amount(70, Data.BYTES), rotate_log_backups=2) - stdout, stderr = r.get_handlers() + stdout, stderr, handlers_are_files = r.get_handlers() assert type(stdout) == TeeHandler assert type(stderr) == TeeHandler + assert not handlers_are_files assert_log_file_exists(taskpath, 'stdout') assert_log_file_exists(taskpath, 'stderr') http://git-wip-us.apache.org/repos/asf/aurora/blob/c66a9eee/src/test/sh/org/apache/aurora/e2e/ephemeral_daemon_with_final.aurora ---------------------------------------------------------------------- diff --git a/src/test/sh/org/apache/aurora/e2e/ephemeral_daemon_with_final.aurora b/src/test/sh/org/apache/aurora/e2e/ephemeral_daemon_with_final.aurora new file mode 100644 index 0000000..6158f28 --- /dev/null +++ b/src/test/sh/org/apache/aurora/e2e/ephemeral_daemon_with_final.aurora @@ -0,0 +1,47 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import getpass + +ephemeral_daemon_process = Process( + name = 'ephemeral_daemon', + daemon = True, + ephemeral = True, + cmdline = 'echo "ephemeral daemon started"; sleep 3600') + +main_process = Process( + name = 'main', + cmdline = 'while [[ ! -e {{stop_file}} ]]; do sleep 1; done; echo "main OK"') + +final_process = Process( + name = 'final', + final = True, + cmdline = 'rm {{stop_file}}; echo "final OK"') + +test_task = Task( + name = 'ephemeral_daemon_with_final', + resources = Resources(cpu=0.4, ram=32*MB, disk=64*MB), + processes = [ephemeral_daemon_process, main_process, final_process]) + +job = Job( + cluster = 'devcluster', + task = test_task, + role = getpass.getuser(), + environment = 'test', + contact = '{{role}}@localhost', +) + +jobs = [ + job(name = 'ephemeral_daemon_with_final') +] http://git-wip-us.apache.org/repos/asf/aurora/blob/c66a9eee/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh ---------------------------------------------------------------------- diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh index b469f9b..e1c12bb 100755 --- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh +++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh @@ -55,6 +55,23 @@ check_url_live() { [[ $(curl -sL -w '%{http_code}' $1 -o /dev/null) == 200 ]] } +test_file_removed() { + local _file=$1 + local _success=0 + for i in $(seq 1 10); do + if [[ ! -e $_file ]]; then + _success=1 + break + fi + sleep 1 + done + + if [[ "$_success" -ne "1" ]]; then + echo "File was not removed." + exit 1 + fi +} + test_version() { # The version number is written to stderr, making it necessary to redirect the output. [[ $(aurora --version 2>&1) = $(cat /vagrant/.auroraversion) ]] @@ -79,8 +96,10 @@ test_inspect() { test_create() { local _jobkey=$1 _config=$2 + shift; shift + local _extra_args="${@}" - aurora job create $_jobkey $_config + aurora job create $_jobkey $_config $_extra_args } test_job_status() { @@ -288,6 +307,20 @@ test_admin() { aurora_admin get_scheduler $_cluster | grep ":8081" } +test_ephemeral_daemon_with_final() { + local _cluster=$1 _role=$2 _env=$3 _job=$4 _config=$5 + local _jobkey="$_cluster/$_role/$_env/$_job" + local _stop_file=$(mktemp) + local _extra_args="--bind stop_file=$_stop_file" + rm $_stop_file + + test_create $_jobkey $_config $_extra_args + test_observer_ui $_cluster $_role $_job + test_job_status $_cluster $_role $_env $_job + touch $_stop_file # Stops 'main_process'. + test_file_removed $_stop_file # Removed by 'final_process'. +} + restore_netrc() { mv ~/.netrc.bak ~/.netrc >/dev/null 2>&1 || true } @@ -323,6 +356,8 @@ TEST_JOB_DOCKER=http_example_docker TEST_CONFIG_FILE=$EXAMPLE_DIR/http_example.aurora TEST_CONFIG_UPDATED_FILE=$EXAMPLE_DIR/http_example_updated.aurora TEST_BAD_HEALTHCHECK_CONFIG_UPDATED_FILE=$EXAMPLE_DIR/http_example_bad_healthcheck.aurora +TEST_EPHEMERAL_DAEMON_WITH_FINAL_JOB=ephemeral_daemon_with_final +TEST_EPHEMERAL_DAEMON_WITH_FINAL_CONFIG_FILE=$TEST_ROOT/ephemeral_daemon_with_final.aurora BASE_ARGS=( $TEST_CLUSTER @@ -341,6 +376,14 @@ TEST_JOB_DOCKER_ARGS=("${BASE_ARGS[@]}" "$TEST_JOB_DOCKER") TEST_ADMIN_ARGS=($TEST_CLUSTER) +TEST_JOB_EPHEMERAL_DAEMON_WITH_FINAL_ARGS=( + $TEST_CLUSTER + $TEST_ROLE + $TEST_ENV + $TEST_EPHEMERAL_DAEMON_WITH_FINAL_JOB + $TEST_EPHEMERAL_DAEMON_WITH_FINAL_CONFIG_FILE +) + trap collect_result EXIT aurorabuild all @@ -357,6 +400,8 @@ test_http_example "${TEST_JOB_DOCKER_ARGS[@]}" test_admin "${TEST_ADMIN_ARGS[@]}" test_basic_auth_unauthenticated "${TEST_JOB_ARGS[@]}" +test_ephemeral_daemon_with_final "${TEST_JOB_EPHEMERAL_DAEMON_WITH_FINAL_ARGS[@]}" + /vagrant/src/test/sh/org/apache/aurora/e2e/test_kerberos_end_to_end.sh /vagrant/src/test/sh/org/apache/aurora/e2e/test_bypass_leader_redirect_end_to_end.sh RETCODE=0
