Removing GC executor code. Bugs closed: AURORA-1333
Reviewed at https://reviews.apache.org/r/35813/

Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/56bb1e69
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/56bb1e69
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/56bb1e69

Branch: refs/heads/master
Commit: 56bb1e693db4312892f425ab56108ac3e6e00086
Parents: d977aa4
Author: Maxim Khutornenko <[email protected]>
Authored: Thu Jun 25 13:49:15 2015 -0700
Committer: Maxim Khutornenko <[email protected]>
Committed: Thu Jun 25 13:49:15 2015 -0700

----------------------------------------------------------------------
 NEWS | 1 +
 api/src/main/thrift/org/apache/aurora/gen/BUILD | 1 -
 .../org/apache/aurora/gen/internal_rpc.thrift | 28 -
 debian/aurora-executor.install | 1 -
 debian/rules | 1 -
 docs/test-resource-generation.md | 5 +-
 examples/vagrant/aurorabuild.sh | 1 -
 src/main/python/apache/aurora/executor/BUILD | 31 -
 .../python/apache/aurora/executor/bin/BUILD | 29 -
 .../aurora/executor/bin/gc_executor_main.py | 77 ---
 .../apache/aurora/executor/gc_executor.py | 574 ----------------
 .../python/apache/thermos/cli/commands/BUILD | 12 -
 .../python/apache/thermos/cli/commands/gc.py | 105 ---
 src/main/python/apache/thermos/cli/main.py | 2 -
 src/main/python/apache/thermos/core/BUILD | 13 -
 src/main/python/apache/thermos/core/helper.py | 18 -
 .../python/apache/thermos/core/inspector.py | 115 ----
 src/main/python/apache/thermos/monitoring/BUILD | 14 -
 .../python/apache/thermos/monitoring/garbage.py | 198 ------
 src/test/python/apache/aurora/executor/BUILD | 19 -
 .../python/apache/aurora/executor/bin/BUILD | 10 -
 .../bin/test_gc_executor_entry_point.py | 40 --
 .../apache/aurora/executor/test_gc_executor.py | 656 -------------------
 .../apache/thermos/cli/commands/test_import.py | 2 -
 src/test/python/apache/thermos/monitoring/BUILD | 14 -
 .../apache/thermos/monitoring/test_garbage.py | 90 ---
 26 files changed, 3 insertions(+), 2054 deletions(-)
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/NEWS ---------------------------------------------------------------------- diff --git a/NEWS b/NEWS index a17f0e7..1a0fb48 100644 --- a/NEWS +++ b/NEWS @@ -2,3 +2,4 @@ ----- - Now requires JRE 8 or greater. +- GC executor is fully replaced by the task state reconciliation (AURORA-1047). http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/api/src/main/thrift/org/apache/aurora/gen/BUILD ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/BUILD b/api/src/main/thrift/org/apache/aurora/gen/BUILD index fe3f83b..d196fef 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/BUILD +++ b/api/src/main/thrift/org/apache/aurora/gen/BUILD @@ -20,7 +20,6 @@ python_thrift_library( name = 'py-thrift', sources = [ 'api.thrift', - 'internal_rpc.thrift', ], ) http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/api/src/main/thrift/org/apache/aurora/gen/internal_rpc.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/internal_rpc.thrift b/api/src/main/thrift/org/apache/aurora/gen/internal_rpc.thrift deleted file mode 100644 index a2c230f..0000000 --- a/api/src/main/thrift/org/apache/aurora/gen/internal_rpc.thrift +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace java org.apache.aurora.gen.comm -namespace py gen.apache.aurora.comm - -include "api.thrift" - -// Thrift interface to define the communication between the scheduler and executor. - -// Message sent from the scheduler to the executor, indicating that some -// task history associated with the host may have been purged, and the -// executor should only retain tasks associated with the provided tasks IDs. -struct AdjustRetainedTasks { - 2: map<string, api.ScheduleStatus> retainedTasks // All tasks that the executor should - // retain, and their statuses. -} http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/debian/aurora-executor.install ---------------------------------------------------------------------- diff --git a/debian/aurora-executor.install b/debian/aurora-executor.install index 8efb130..5d0d1f7 100644 --- a/debian/aurora-executor.install +++ b/debian/aurora-executor.install @@ -1,4 +1,3 @@ -dist/gc_executor.pex /usr/share/aurora/bin dist/thermos_executor.pex /usr/share/aurora/bin dist/thermos_observer.pex /usr/share/aurora/bin dist/thermos.pex /usr/share/aurora/bin http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/debian/rules ---------------------------------------------------------------------- diff --git a/debian/rules b/debian/rules index 6ba18ce..db0f14e 100755 --- a/debian/rules +++ b/debian/rules @@ -33,7 +33,6 @@ __pants_build: mkdir -p third_party $(pants) binary src/main/python/apache/aurora/admin:kaurora_admin $(pants) binary src/main/python/apache/aurora/client/cli:kaurora - $(pants) binary src/main/python/apache/aurora/executor/bin:gc_executor $(pants) binary src/main/python/apache/aurora/executor/bin:thermos_executor $(pants) binary src/main/python/apache/thermos/cli/bin:thermos $(pants) binary src/main/python/apache/thermos/bin:thermos_runner 
http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/docs/test-resource-generation.md ---------------------------------------------------------------------- diff --git a/docs/test-resource-generation.md b/docs/test-resource-generation.md index 335586d..e78e742 100644 --- a/docs/test-resource-generation.md +++ b/docs/test-resource-generation.md @@ -4,9 +4,8 @@ The Aurora source repository and distributions contain several [binary files](../src/test/resources/org/apache/thermos/root/checkpoints) to qualify the backwards-compatibility of thermos with checkpoint data. Since -thermos persists state to disk, to be read by other components (the GC executor -and the thermos observer), it is important that we have tests that prevent -regressions affecting the ability to parse previously-written data. +thermos persists state to disk, to be read by the thermos observer), it is important that we have +tests that prevent regressions affecting the ability to parse previously-written data. ## Generating test files The files included represent persisted checkpoints that exercise different http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/examples/vagrant/aurorabuild.sh ---------------------------------------------------------------------- diff --git a/examples/vagrant/aurorabuild.sh b/examples/vagrant/aurorabuild.sh index 5eb171c..fbaa6ae 100755 --- a/examples/vagrant/aurorabuild.sh +++ b/examples/vagrant/aurorabuild.sh @@ -58,7 +58,6 @@ function build_scheduler { } function build_executor { - ./pants binary src/main/python/apache/aurora/executor/bin:gc_executor ./pants binary src/main/python/apache/aurora/executor/bin:thermos_executor ./pants binary src/main/python/apache/thermos/bin:thermos_runner http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/aurora/executor/BUILD ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/BUILD 
b/src/main/python/apache/aurora/executor/BUILD index cbb2f5f..52be02b 100644 --- a/src/main/python/apache/aurora/executor/BUILD +++ b/src/main/python/apache/aurora/executor/BUILD @@ -95,36 +95,6 @@ python_library( ] ) - -python_library( - name = 'gc_executor', - sources = ['gc_executor.py'], - dependencies = [ - ':executor_base', - '3rdparty/python:mesos.interface', - # To prevent an alpha version of protobuf from being pulled down by pants we - # specify protobuf here. See AURORA-1128 for more details. - '3rdparty/python:protobuf', - '3rdparty/python:psutil', - '3rdparty/python:twitter.common.collections', - '3rdparty/python:twitter.common.exceptions', - '3rdparty/python:twitter.common.log', - '3rdparty/python:twitter.common.metrics', - '3rdparty/python:twitter.common.quantity', - 'src/main/python/apache/thermos/common:ckpt', - 'src/main/python/apache/thermos/common:path', - 'src/main/python/apache/thermos/core:helper', - 'src/main/python/apache/thermos/core:inspector', - 'src/main/python/apache/thermos/monitoring:detector', - 'src/main/python/apache/thermos/monitoring:garbage', - 'src/main/python/apache/aurora/config:schema', - 'src/main/python/apache/aurora/executor/common:executor_detector', - 'src/main/python/apache/aurora/executor/common:sandbox', - 'api/src/main/thrift/org/apache/aurora/gen:py-thrift', - 'api/src/main/thrift/org/apache/thermos:py-thrift', - ] -) - python_library( name = 'executor-packaged', dependencies = [ @@ -140,7 +110,6 @@ python_library( name = 'apache.aurora.executor', version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().upper(), ).with_binaries( - gc_executor = 'src/main/python/apache/aurora/executor/bin:gc_executor', thermos_executor = 'src/main/python/apache/aurora/executor/bin:thermos_executor', ) ) http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/aurora/executor/bin/BUILD ---------------------------------------------------------------------- diff --git 
a/src/main/python/apache/aurora/executor/bin/BUILD b/src/main/python/apache/aurora/executor/bin/BUILD index 0fbb0f8..b23429a 100644 --- a/src/main/python/apache/aurora/executor/bin/BUILD +++ b/src/main/python/apache/aurora/executor/bin/BUILD @@ -45,32 +45,3 @@ python_binary( ':thermos_executor_source', ] ) - -python_library( - name = 'gc_executor_source', - sources = ['gc_executor_main.py'], - dependencies = [ - # To prevent an alpha version of protobuf from being pulled down by pants we - # specify protobuf here. See AURORA-1128 for more details. - '3rdparty/python:protobuf', - '3rdparty/python:twitter.common.app', - '3rdparty/python:twitter.common.log', - '3rdparty/python:twitter.common.metrics', - 'src/main/python/apache/thermos/common:constants', - 'src/main/python/apache/thermos/monitoring:detector', - 'src/main/python/apache/aurora/executor/common:executor_detector', - 'src/main/python/apache/aurora/executor/common:path_detector', - 'src/main/python/apache/aurora/executor:executor_vars', - 'src/main/python/apache/aurora/executor:gc_executor', - ] -) - -python_binary( - name = 'gc_executor', - entry_point = 'apache.aurora.executor.bin.gc_executor_main:proxy_main', - always_write_cache = True, - dependencies = [ - '3rdparty/python:mesos.native', - ':gc_executor_source', - ] -) http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/aurora/executor/bin/gc_executor_main.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/bin/gc_executor_main.py b/src/main/python/apache/aurora/executor/bin/gc_executor_main.py deleted file mode 100644 index 8093717..0000000 --- a/src/main/python/apache/aurora/executor/bin/gc_executor_main.py +++ /dev/null @@ -1,77 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Command-line entry point to the Thermos GC executor - -This module wraps the Thermos GC executor into an executable suitable for launching by a Mesos -slave. - -""" - -try: - from mesos.native import MesosExecutorDriver -except ImportError: - MesosExecutorDriver = None - -from twitter.common import app, log -from twitter.common.log.options import LogOptions -from twitter.common.metrics.sampler import DiskMetricWriter - -from apache.aurora.executor.common.executor_detector import ExecutorDetector -from apache.aurora.executor.common.path_detector import MesosPathDetector -from apache.aurora.executor.gc_executor import ThermosGCExecutor -from apache.thermos.common.constants import DEFAULT_CHECKPOINT_ROOT -from apache.thermos.monitoring.detector import ChainedPathDetector, FixedPathDetector - -app.configure(debug=True) - - -# locate logs locally in executor sandbox -LogOptions.set_simple(True) -LogOptions.set_disk_log_level('DEBUG') -LogOptions.set_log_dir(ExecutorDetector.LOG_PATH) - - -def initialize(): - path_detector = ChainedPathDetector( - FixedPathDetector(DEFAULT_CHECKPOINT_ROOT), - MesosPathDetector(), - ) - - # Create executor stub - thermos_gc_executor = ThermosGCExecutor(path_detector) - - # Create metrics collector - metric_writer = DiskMetricWriter(thermos_gc_executor.metrics, ExecutorDetector.VARS_PATH) - - # Create driver stub - driver = MesosExecutorDriver(thermos_gc_executor) - - return thermos_gc_executor, metric_writer, driver - - -def proxy_main(): - def main(): - if MesosExecutorDriver is None: - app.error('Could not load 
MesosExecutorDriver!') - - thermos_gc_executor, metric_writer, driver = initialize() - - thermos_gc_executor.start() - metric_writer.start() - driver.run() - - log.info('MesosExecutorDriver.run() has finished.') - - app.main() http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/aurora/executor/gc_executor.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/executor/gc_executor.py b/src/main/python/apache/aurora/executor/gc_executor.py deleted file mode 100644 index d4392fa..0000000 --- a/src/main/python/apache/aurora/executor/gc_executor.py +++ /dev/null @@ -1,574 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Thermos garbage-collection (GC) executor - -This module containts the Thermos GC executor, responsible for garbage collecting old tasks and -reconciling task states with the Mesos scheduler. It is intended to be run periodically on Mesos -slaves utilising the Thermos executor. 
- -""" - -import os -import threading -import time -from collections import namedtuple - -import psutil -from mesos.interface import mesos_pb2 -from thrift.TSerialization import deserialize as thrift_deserialize -from twitter.common.collections import OrderedDict -from twitter.common.exceptions import ExceptionalThread -from twitter.common.metrics import Observable -from twitter.common.metrics.gauge import AtomicGauge -from twitter.common.quantity import Amount, Time - -from apache.thermos.common.ckpt import CheckpointDispatcher -from apache.thermos.common.path import TaskPath -from apache.thermos.core.helper import TaskKiller -from apache.thermos.core.inspector import CheckpointInspector -from apache.thermos.monitoring.detector import PathDetector, TaskDetector -from apache.thermos.monitoring.garbage import TaskGarbageCollector - -from .common.executor_detector import ExecutorDetector -from .common.sandbox import DirectorySandbox -from .executor_base import ExecutorBase - -from gen.apache.aurora.api.constants import TERMINAL_STATES -from gen.apache.aurora.api.ttypes import ScheduleStatus -from gen.apache.aurora.comm.ttypes import AdjustRetainedTasks -from gen.apache.thermos.ttypes import TaskState - -THERMOS_TO_TWITTER_STATES = { - TaskState.ACTIVE: ScheduleStatus.RUNNING, - TaskState.CLEANING: ScheduleStatus.RUNNING, - TaskState.FINALIZING: ScheduleStatus.RUNNING, - TaskState.SUCCESS: ScheduleStatus.FINISHED, - TaskState.FAILED: ScheduleStatus.FAILED, - TaskState.KILLED: ScheduleStatus.KILLED, - TaskState.LOST: ScheduleStatus.LOST, -} - - -THERMOS_TO_MESOS_STATES = { - TaskState.ACTIVE: mesos_pb2.TASK_RUNNING, - TaskState.SUCCESS: mesos_pb2.TASK_FINISHED, - TaskState.FAILED: mesos_pb2.TASK_FAILED, - TaskState.KILLED: mesos_pb2.TASK_KILLED, - TaskState.LOST: mesos_pb2.TASK_LOST, -} - - -# RootedTask is a (checkpoint_root, task_id) tuple. Before, checkpoint_root was assumed to be a -# globally defined location e.g. '/var/run/thermos'. 
We are trying to move checkpoints into -# sandboxes, which mean that each task will have its own checkpoint_root, so we can no longer rely -# upon a single invariant checkpoint root passed into the GC executor constructor. -RootedTask = namedtuple('RootedTask', 'checkpoint_root task_id') - - -class ThermosGCExecutor(ExecutorBase, ExceptionalThread, Observable): - """ - Thermos GC Executor, responsible for: - - garbage collecting old tasks to make sure they don't clutter up the system - - state reconciliation with the scheduler (in case it thinks we're running - something we're not or vice versa.) - """ - MAX_PID_TIME_DRIFT = Amount(10, Time.SECONDS) - MAX_CHECKPOINT_TIME_DRIFT = Amount(1, Time.HOURS) # maximum runner disconnection time - - # how old a task must be before we're willing to kill it, assuming that there could be - # slight races in the following scenario: - # launch gc with retained_tasks={t1, t2, t3} - # launch task t4 - MINIMUM_KILL_AGE = Amount(10, Time.MINUTES) - - # wait time between checking for new GC events from the slave and/or cleaning orphaned tasks - POLL_WAIT = Amount(5, Time.MINUTES) - - # maximum lifetime of this executor. this is to prevent older GC executor binaries from - # running forever - MAXIMUM_EXECUTOR_LIFETIME = Amount(1, Time.DAYS) - - PERSISTENCE_WAIT = Amount(5, Time.SECONDS) - - def __init__(self, - path_detector, - verbose=True, - task_killer=TaskKiller, - executor_detector=ExecutorDetector, - task_garbage_collector=TaskGarbageCollector, - clock=time): - ExecutorBase.__init__(self) - ExceptionalThread.__init__(self) - self.daemon = True - self._stop_event = threading.Event() - # mapping of task_id => (TaskInfo, AdjustRetainedTasks), in the order in - # which they were received via a launchTask. - self._gc_task_queue = OrderedDict() - # cache the ExecutorDriver provided by the slave, so we can use it out - # of band from slave-initiated callbacks. 
This should be supplied by - # ExecutorBase.registered() when the executor first registers with the - # slave. - self._driver = None - self._slave_id = None # cache the slave ID provided by the slave - self._task_id = None # the task_id currently being executed by the ThermosGCExecutor, if any - self._start_time = None # the start time of a task currently being executed, if any - self._executor_detector = executor_detector() - self._collector_class = task_garbage_collector - self._clock = clock - self._task_killer = task_killer - if not isinstance(path_detector, PathDetector): - raise TypeError('ThermosGCExecutor expects a path_detector of type PathDetector, got %s' % - type(path_detector)) - self._path_detector = path_detector - self._dropped_tasks = AtomicGauge('dropped_tasks') - self.metrics.register(self._dropped_tasks) - - def _runner_ckpt(self, task): - """Return the runner checkpoint file for a given task. - - :param task: An instance of a task to retrieve checkpoint path - :type task: :class:`RootedTask` instance - """ - return TaskPath(root=task.checkpoint_root, task_id=task.task_id).getpath('runner_checkpoint') - - def _terminate_task(self, task, kill=True): - """Terminate a task using the associated task killer. Returns a boolean indicating success.""" - killer = self._task_killer(task.task_id, task.checkpoint_root) - self.log('Terminating %s...' 
% task.task_id) - runner_terminate = killer.kill if kill else killer.lose - try: - runner_terminate(force=True) - return True - except Exception as e: - self.log('Could not terminate: %s' % e) - return False - - def partition_tasks(self): - """Return active/finished tasks as discovered from the checkpoint roots.""" - active_tasks, finished_tasks = set(), set() - - for checkpoint_root in self._path_detector.get_paths(): - detector = TaskDetector(root=checkpoint_root) - - active_tasks.update(RootedTask(checkpoint_root, task_id) - for _, task_id in detector.get_task_ids(state='active')) - finished_tasks.update(RootedTask(checkpoint_root, task_id) - for _, task_id in detector.get_task_ids(state='finished')) - - return active_tasks, finished_tasks - - def get_states(self, task): - """Returns the (timestamp, status) tuples of the task or [] if could not replay.""" - statuses = CheckpointDispatcher.iter_statuses(self._runner_ckpt(task)) - try: - return [(state.timestamp_ms / 1000.0, state.state) for state in statuses] - except CheckpointDispatcher.ErrorRecoveringState: - return [] - - def get_sandbox(self, task): - """Returns the sandbox of the task, or None if it has not yet been initialized.""" - try: - for update in CheckpointDispatcher.iter_updates(self._runner_ckpt(task)): - if update.runner_header and update.runner_header.sandbox: - return update.runner_header.sandbox - except CheckpointDispatcher.ErrorRecoveringState: - return None - - def maybe_terminate_unknown_task(self, task): - """Terminate a task if we believe the scheduler doesn't know about it. - - It's possible for the scheduler to queue a GC and launch a task afterwards, in which - case we may see actively running tasks that the scheduler did not report in the - AdjustRetainedTasks message. 
- - Returns: - boolean indicating whether the task was terminated - """ - states = self.get_states(task) - if states: - task_start_time, _ = states[0] - if self._start_time - task_start_time > self.MINIMUM_KILL_AGE.as_(Time.SECONDS): - return self._terminate_task(task) - return False - - def should_gc_task(self, task): - """Check if a possibly-corrupt task should be locally GCed - - A task should be GCed if its checkpoint stream appears to be corrupted and the kill age - threshold is exceeded. - - Returns: - set, containing the task if it should be marked for local GC, or empty otherwise - """ - runner_ckpt = self._runner_ckpt(task) - if not os.path.exists(runner_ckpt): - return set() - latest_update = os.path.getmtime(runner_ckpt) - if self._start_time - latest_update > self.MINIMUM_KILL_AGE.as_(Time.SECONDS): - self.log('Got corrupt checkpoint file for %s - marking for local GC' % task.task_id) - return set([task]) - else: - self.log('Checkpoint file unreadable, but not yet beyond MINIMUM_KILL_AGE threshold') - return set() - - def reconcile_states(self, driver, retained_tasks): - """Reconcile states that the scheduler thinks tasks are in vs what they really are in. - - Local vs Scheduler => Action - =================================== - ACTIVE ACTIVE => no-op - ACTIVE ASSIGNED => no-op - ACTIVE TERMINAL => maybe kill task* - ACTIVE !EXISTS => maybe kill task* - TERMINAL ACTIVE => send actual status** - TERMINAL ASSIGNED => send actual status** - TERMINAL TERMINAL => no-op - TERMINAL !EXISTS => gc locally - !EXISTS ACTIVE => send LOST** - !EXISTS ASSIGNED => no-op - !EXISTS TERMINAL => gc remotely - - * - Only kill if this does not appear to be a race condition. 
- ** - These appear to have no effect - - Side effecting operations: - ACTIVE | (TERMINAL / !EXISTS) => maybe kill - TERMINAL | !EXISTS => delete - !EXISTS | TERMINAL => delete - - Returns tuple of (local_gc, remote_gc, updates), where: - local_gc - set of RootedTasks to be GCed locally - remote_gc - set of task_ids to be deleted on the scheduler - updates - dictionary of updates sent to the scheduler (task_id: ScheduleStatus) - """ - def partition(rt): - active, assigned, finished = set(), set(), set() - for task, schedule_status in rt.items(): - if schedule_status in TERMINAL_STATES: - finished.add(task) - elif schedule_status == ScheduleStatus.ASSIGNED: - assigned.add(task) - else: - active.add(task) - return active, assigned, finished - - local_active, local_finished = self.partition_tasks() - sched_active, sched_assigned, sched_finished = partition(retained_tasks) - local_tasks = local_active | local_finished - sched_tasks = sched_active | sched_assigned | sched_finished - - self.log('Told to retain the following task ids:') - for task_id, schedule_status in retained_tasks.items(): - self.log(' => %s as %s' % - (task_id, ScheduleStatus._VALUES_TO_NAMES.get(schedule_status, 'UNKNOWN'))) - - self.log('Local active tasks:') - for task in local_active: - self.log(' => %s' % task.task_id) - - self.log('Local finished tasks:') - for task in local_finished: - self.log(' => %s' % task.task_id) - - local_gc, remote_gc_ids = set(), set() - updates = {} - - for task in local_active: - if task.task_id not in (sched_active | sched_assigned): - self.log('Inspecting task %s for termination.' % task.task_id) - if not self.maybe_terminate_unknown_task(task): - local_gc.update(self.should_gc_task(task)) - - for task in local_finished: - if task.task_id not in sched_tasks: - self.log('Queueing task %s for local deletion.' % task.task_id) - local_gc.add(task) - if task.task_id in (sched_active | sched_assigned): - self.log('Task %s finished but scheduler thinks active/assigned.' 
% task.task_id) - states = self.get_states(task) - if states: - _, last_state = states[-1] - updates[task.task_id] = THERMOS_TO_TWITTER_STATES.get(last_state, ScheduleStatus.LOST) - self.send_update( - driver, - task.task_id, - THERMOS_TO_MESOS_STATES.get(last_state, mesos_pb2.TASK_LOST), - 'Task finish detected by GC executor.') - else: - local_gc.update(self.should_gc_task(task)) - - local_task_ids = set(task.task_id for task in local_tasks) - - for task_id in sched_finished: - if task_id not in local_task_ids: - self.log('Queueing task %s for remote deletion.' % task_id) - remote_gc_ids.add(task_id) - - for task_id in sched_active: - if task_id not in local_task_ids: - self.log('Know nothing about task %s, telling scheduler of LOSS.' % task_id) - updates[task_id] = ScheduleStatus.LOST - self.send_update( - driver, task_id, mesos_pb2.TASK_LOST, 'GC executor found no trace of task.') - - for task_id in sched_assigned: - if task_id not in local_task_ids: - self.log('Know nothing about task %s, but scheduler says ASSIGNED - passing' % task_id) - - return local_gc, remote_gc_ids, updates - - def clean_orphans(self, driver): - """Inspect checkpoints for trees that have been kill -9'ed but not properly cleaned up.""" - self.log('Checking for orphaned tasks') - active_tasks, _ = self.partition_tasks() - updates = {} - - def is_our_process(process, uid, timestamp): - if process.uids().real != uid: - return False - estimated_start_time = self._clock.time() - process.create_time() - return abs(timestamp - estimated_start_time) < self.MAX_PID_TIME_DRIFT.as_(Time.SECONDS) - - for task in active_tasks: - inspector = CheckpointInspector(task.checkpoint_root) - - self.log('Inspecting running task: %s' % task.task_id) - inspection = inspector.inspect(task.task_id) - if not inspection: - self.log(' - Error inspecting task runner') - continue - latest_runner = inspection.runner_processes[-1] - # Assume that it has not yet started? 
- if not latest_runner: - self.log(' - Task has no registered runners.') - continue - runner_pid, runner_uid, timestamp_ms = latest_runner - try: - runner_process = psutil.Process(runner_pid) - if is_our_process(runner_process, runner_uid, timestamp_ms / 1000.0): - self.log(' - Runner appears healthy.') - continue - except psutil.NoSuchProcess: - # Runner is dead - pass - except psutil.Error as err: - self.log(' - Error sampling runner process [pid=%s]: %s' % (runner_pid, err)) - continue - try: - latest_update = os.path.getmtime(self._runner_ckpt(task)) - except (IOError, OSError) as err: - self.log(' - Error accessing runner ckpt: %s' % err) - continue - if self._clock.time() - latest_update < self.MAX_CHECKPOINT_TIME_DRIFT.as_(Time.SECONDS): - self.log(' - Runner is dead but under LOST threshold.') - continue - self.log(' - Runner is dead but beyond LOST threshold: %.1fs' % ( - self._clock.time() - latest_update)) - if self._terminate_task(task, kill=False): - updates[task.task_id] = ScheduleStatus.LOST - self.send_update( - driver, task.task_id, mesos_pb2.TASK_LOST, 'GC executor detected failed task runner.') - - return updates - - def _erase_sandbox(self, task): - header_sandbox = self.get_sandbox(task) - directory_sandbox = DirectorySandbox(header_sandbox) if header_sandbox else None - if directory_sandbox and directory_sandbox.exists(): - self.log('Destroying DirectorySandbox for %s' % task.task_id) - try: - directory_sandbox.destroy() - except DirectorySandbox.Error as e: - self.log('Failed to destroy DirectorySandbox: %s' % e) - else: - self.log('Found no sandboxes for %s' % task.task_id) - - def _gc(self, task): - """Erase the sandbox, logs and metadata of the given task.""" - - self.log('Erasing sandbox for %s' % task.task_id) - self._erase_sandbox(task) - - collector = self._collector_class(task.checkpoint_root, task.task_id) - - self.log('Erasing logs for %s' % task.task_id) - collector.erase_logs() - - self.log('Erasing metadata for %s' % 
task.task_id) - collector.erase_metadata() - - def garbage_collect(self, force_delete=frozenset()): - """Garbage collect tasks on the system no longer active or in the supplied force_delete. - - Return a set of task_ids representing the tasks that were garbage collected. - """ - active_tasks, finished_tasks = self.partition_tasks() - retained_executors = set(iter(self.linked_executors)) - self.log('Executor sandboxes retained by Mesos:') - if retained_executors: - for r_e in sorted(retained_executors): - self.log(' %s' % r_e) - else: - self.log(' None') - for task in active_tasks: - if task.task_id not in retained_executors: - self.log('ERROR: Active task %s had its executor sandbox pulled.' % task.task_id) - gc_tasks = set() - for task in finished_tasks: - if task.task_id not in retained_executors: - gc_tasks.add(task) - gc_tasks.update(force_delete) - for gc_task in gc_tasks: - self._gc(gc_task) - return set(task.task_id for task in gc_tasks) - - @property - def linked_executors(self): - """Generator yielding the executor sandboxes detected on the system.""" - thermos_executor_prefix = 'thermos-' - for executor in self._executor_detector: - # It's possible for just the 'latest' symlink to be present but no run directories. - # This indicates that the task has been fully garbage collected. - if executor.executor_id.startswith(thermos_executor_prefix) and executor.run != 'latest': - yield executor.executor_id[len(thermos_executor_prefix):] - - def _run_gc(self, task, retain_tasks, retain_start): - """ - Reconcile the set of tasks to retain (provided by the scheduler) with the current state of - executors on this system. Garbage collect tasks/executors as appropriate. - - Not re-entrant! Previous executions must complete (and clear self._task_id) before this can be - invoked. - - Potentially blocking (e.g. 
on I/O) in self.garbage_collect() - - Args: - task: TaskInfo provided by the slave - retain_tasks: mapping of task_id => ScheduleStatus, describing what the scheduler thinks is - running on this system - retain_start: the time at which the retain_tasks message is effective -- this means that - tasks started after the retain_tasks message is effective are skipped - until future GC runs. - """ - task_id = task.task_id.value - if self._task_id is not None: - raise RuntimeError('_run_gc() called [task_id=%s], but already running [task_id=%s]' - % (task_id, self._task_id)) - self._task_id = task_id - self.log('Launching garbage collection [task_id=%s]' % task_id) - self._start_time = retain_start - local_gc, _, _ = self.reconcile_states(self._driver, retain_tasks) - self.garbage_collect(local_gc) - self.send_update( - self._driver, task.task_id.value, mesos_pb2.TASK_FINISHED, 'Garbage collection finished.') - self.log('Garbage collection complete [task_id=%s]' % task_id) - self._task_id = self._start_time = None - - def run(self): - """Main GC executor event loop. - - Periodically perform state reconciliation with the set of tasks provided - by the slave, and garbage collect orphaned tasks on the system. 
- """ - run_start = self._clock.time() - - def should_terminate(): - now = self._clock.time() - if now > run_start + self.MAXIMUM_EXECUTOR_LIFETIME.as_(Time.SECONDS): - return True - return self._stop_event.is_set() - - while not should_terminate(): - try: - _, (task, retain_tasks, retain_start) = self._gc_task_queue.popitem(0) - self._run_gc(task, retain_tasks, retain_start) - except KeyError: # no enqueued GC tasks - pass - if self._driver is not None: - self.clean_orphans(self._driver) - # TODO(wickman) This should be polling with self._clock - self._stop_event.wait(self.POLL_WAIT.as_(Time.SECONDS)) - - # shutdown - if self._driver is not None: - try: - prev_task_id, _ = self._gc_task_queue.popitem(0) - except KeyError: # no enqueued GC tasks - pass - else: - self.send_update(self._driver, prev_task_id, mesos_pb2.TASK_FINISHED, - 'Garbage collection skipped - GC executor shutting down') - # TODO(jon) Remove this once external MESOS-243 is resolved. - self.log('Sleeping briefly to mitigate https://issues.apache.org/jira/browse/MESOS-243') - self.log('Clock is %r, time is %s' % (self._clock, self._clock.time())) - self._clock.sleep(self.PERSISTENCE_WAIT.as_(Time.SECONDS)) - self.log('Finished sleeping.') - - self._driver.stop() - - """ Mesos Executor API methods follow """ - - def launchTask(self, driver, task): - """Queue a new garbage collection run, and drop any currently-enqueued runs.""" - if self._slave_id is None: - self._slave_id = task.slave_id.value - task_id = task.task_id.value - self.log('launchTask() got task_id: %s' % task_id) - if self._stop_event.is_set(): - self.log('=> Executor is shutting down - ignoring task %s' % task_id) - self.send_update( - self._driver, task_id, mesos_pb2.TASK_FAILED, 'GC Executor is shutting down.') - return - elif task_id == self._task_id: - self.log('=> GC with task_id %s currently running - ignoring' % task_id) - return - elif task_id in self._gc_task_queue: - self.log('=> Already have task_id %s queued - ignoring' % 
task_id) - return - try: - art = thrift_deserialize(AdjustRetainedTasks(), task.data) - except Exception as err: - self.log('Error deserializing task: %s' % err) - self.send_update( - self._driver, task_id, mesos_pb2.TASK_FAILED, 'Deserialization of GC task failed') - return - try: - prev_task_id, _ = self._gc_task_queue.popitem(0) - except KeyError: # no enqueued GC tasks - reset counter - self._dropped_tasks.write(0) - else: - self.log('=> Dropping previously queued GC with task_id %s' % prev_task_id) - self._dropped_tasks.increment() - self.log('=> Updating scheduler') - self.send_update(self._driver, prev_task_id, mesos_pb2.TASK_FINISHED, - 'Garbage collection skipped - GC executor received another task') - self.log('=> Adding %s to GC queue' % task_id) - self._gc_task_queue[task_id] = (task, art.retainedTasks, self._clock.time()) - - def killTask(self, driver, task_id): - """Remove the specified task from the queue, if it's not yet run. Otherwise, no-op.""" - self.log('killTask() got task_id: %s' % task_id) - task = self._gc_task_queue.pop(task_id, None) - if task is not None: - self.log('=> Removed %s from queued GC tasks' % task_id) - elif task_id == self._task_id: - self.log('=> GC with task_id %s currently running - ignoring' % task_id) - else: - self.log('=> Unknown task_id %s - ignoring' % task_id) - - def shutdown(self, driver): - """Trigger the Executor to shut down as soon as the current GC run is finished.""" - self.log('shutdown() called - setting stop event') - self._stop_event.set() http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/cli/commands/BUILD ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/cli/commands/BUILD b/src/main/python/apache/thermos/cli/commands/BUILD index 1dae8c9..552eeb4 100644 --- a/src/main/python/apache/thermos/cli/commands/BUILD +++ b/src/main/python/apache/thermos/cli/commands/BUILD @@ -15,7 +15,6 @@ python_library( 
name = 'commands', dependencies = [ - ':gc', ':help', ':inspect', ':kill', @@ -28,17 +27,6 @@ python_library( ) python_library( - name = 'gc', - sources = ['gc.py'], - dependencies = [ - '3rdparty/python:twitter.common.app', - '3rdparty/python:twitter.common.quantity', - 'src/main/python/apache/thermos/cli:common', - 'src/main/python/apache/thermos/monitoring:garbage', - ] -) - -python_library( name = 'help', sources = ['help.py'], dependencies = [ http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/cli/commands/gc.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/cli/commands/gc.py b/src/main/python/apache/thermos/cli/commands/gc.py deleted file mode 100644 index 23d9ff4..0000000 --- a/src/main/python/apache/thermos/cli/commands/gc.py +++ /dev/null @@ -1,105 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -from __future__ import print_function - -from twitter.common import app -from twitter.common.quantity.parse_simple import parse_data, parse_time - -from apache.thermos.cli.common import get_path_detector, tasks_from_re -from apache.thermos.monitoring.garbage import GarbageCollectionPolicy, TaskGarbageCollector - - -def set_keep(option, opt_str, value, parser): - setattr(parser.values, option.dest, opt_str.startswith('--keep')) - - -@app.command -@app.command_option("--max_age", metavar="AGE", default=None, dest='max_age', - help="Max age in human readable form, e.g. 2d5h or 7200s") -@app.command_option("--max_tasks", metavar="NUM", default=None, dest='max_tasks', - help="Max number of tasks to keep.") -@app.command_option("--max_space", metavar="SPACE", default=None, dest='max_space', - help="Max space to allow for tasks, e.g. 20G.") -@app.command_option("--keep-logs", "--delete-logs", - metavar="PATH", default=True, - action='callback', callback=set_keep, dest='keep_logs', - help="Keep logs.") -@app.command_option("--keep-metadata", "--delete-metadata", - metavar="PATH", default=True, - action='callback', callback=set_keep, dest='keep_metadata', - help="Keep metadata.") -@app.command_option("--force", default=False, action='store_true', dest='force', - help="Perform garbage collection without confirmation") -@app.command_option("--dryrun", default=False, action='store_true', dest='dryrun', - help="Don't actually run garbage collection.") -def gc(args, options): - """Garbage collect task(s) and task metadata. - - Usage: thermos gc [options] [task_id1 task_id2 ...] - - If tasks specified, restrict garbage collection to only those tasks, - otherwise all tasks are considered. The optional constraints are still - honored. 
- """ - print('Analyzing root at %s' % options.root) - gc_options = {} - if options.max_age is not None: - gc_options['max_age'] = parse_time(options.max_age) - if options.max_space is not None: - gc_options['max_space'] = parse_data(options.max_space) - if options.max_tasks is not None: - gc_options['max_tasks'] = int(options.max_tasks) - gc_options.update(include_metadata=not options.keep_metadata, - include_logs=not options.keep_logs, - verbose=True, - logger=print) - if args: - gc_tasks = list(tasks_from_re(args, state='finished')) - else: - print('No task ids specified, using default collector.') - gc_tasks = [(task.checkpoint_root, task.task_id) - for task in GarbageCollectionPolicy(get_path_detector(), **gc_options).run()] - - if not gc_tasks: - print('No tasks to garbage collect. Exiting') - return - - def maybe(function, *args): - if options.dryrun: - print(' would run %s%r' % (function.__name__, args)) - else: - function(*args) - - value = 'y' - if not options.force: - value = raw_input("Continue [y/N]? 
") or 'N' - if value.lower() == 'y': - print('Running gc...') - - for checkpoint_root, task_id in gc_tasks: - tgc = TaskGarbageCollector(checkpoint_root, task_id) - print(' Task %s ' % task_id, end='') - print('data (%s) ' % ('keeping' if options.keep_data else 'deleting'), end='') - print('logs (%s) ' % ('keeping' if options.keep_logs else 'deleting'), end='') - print('metadata (%s) ' % ('keeping' if options.keep_metadata else 'deleting')) - if not options.keep_data: - maybe(tgc.erase_data) - if not options.keep_logs: - maybe(tgc.erase_logs) - if not options.keep_metadata: - maybe(tgc.erase_metadata) - print('done.') - else: - print('Cancelling gc.') http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/cli/main.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/cli/main.py b/src/main/python/apache/thermos/cli/main.py index f20f612..bb51353 100644 --- a/src/main/python/apache/thermos/cli/main.py +++ b/src/main/python/apache/thermos/cli/main.py @@ -20,7 +20,6 @@ from .common import clear_path_detectors, register_path_detector def register_commands(app): from apache.thermos.cli.common import generate_usage from apache.thermos.cli.commands import ( - gc as gc_command, help as help_command, inspect as inspect_command, kill as kill_command, @@ -32,7 +31,6 @@ def register_commands(app): ) app.register_commands_from( - gc_command, help_command, inspect_command, kill_command, http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/core/BUILD ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/BUILD b/src/main/python/apache/thermos/core/BUILD index efb68e8..d47b7a2 100644 --- a/src/main/python/apache/thermos/core/BUILD +++ b/src/main/python/apache/thermos/core/BUILD @@ -30,18 +30,6 @@ python_library( ) python_library( - name = 'inspector', - sources = ['inspector.py'], 
- dependencies = [ - ':muxer', - '3rdparty/python:twitter.common.recordio', - 'src/main/python/apache/thermos/common:ckpt', - 'src/main/python/apache/thermos/common:path', - 'api/src/main/thrift/org/apache/thermos:py-thrift', - ] -) - -python_library( name = 'muxer', sources = ['muxer.py'], dependencies = [ @@ -87,7 +75,6 @@ python_library( python_library( name = 'core', dependencies = [ - ':inspector', ':runner', # covering libs http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/core/helper.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/helper.py b/src/main/python/apache/thermos/core/helper.py index 8cd3294..dda40ed 100644 --- a/src/main/python/apache/thermos/core/helper.py +++ b/src/main/python/apache/thermos/core/helper.py @@ -30,24 +30,6 @@ from apache.thermos.common.path import TaskPath from gen.apache.thermos.ttypes import ProcessState, ProcessStatus, RunnerCkpt, TaskState, TaskStatus -class TaskKiller(object): - """ - Task killing interface. 
- """ - - def __init__(self, task_id, checkpoint_root): - self._task_id = task_id - self._checkpoint_root = checkpoint_root - - def kill(self, force=True): - TaskRunnerHelper.kill(self._task_id, self._checkpoint_root, force=force, - terminal_status=TaskState.KILLED) - - def lose(self, force=True): - TaskRunnerHelper.kill(self._task_id, self._checkpoint_root, force=force, - terminal_status=TaskState.LOST) - - class TaskRunnerHelper(object): """ TaskRunner helper methods that can be operated directly upon checkpoint http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/core/inspector.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/core/inspector.py b/src/main/python/apache/thermos/core/inspector.py deleted file mode 100644 index 4fe8aa3..0000000 --- a/src/main/python/apache/thermos/core/inspector.py +++ /dev/null @@ -1,115 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -import pwd -from collections import namedtuple -from contextlib import closing - -from twitter.common import log -from twitter.common.recordio import RecordIO, ThriftRecordReader - -from apache.thermos.common.ckpt import CheckpointDispatcher -from apache.thermos.common.path import TaskPath - -from .muxer import ProcessMuxer - -from gen.apache.thermos.ttypes import ProcessState, RunnerCkpt, RunnerState - -CheckpointInspection = namedtuple('CheckpointInspection', - ['runner_latest_update', - 'process_latest_update', - 'runner_processes', - 'coordinator_processes', - 'processes']) - - -class CheckpointInspector(object): - def __init__(self, checkpoint_root): - self._path = TaskPath(root=checkpoint_root) - - @staticmethod - def get_timestamp(process_record): - if process_record: - for timestamp in ('fork_time', 'start_time', 'stop_time'): - stamp = getattr(process_record, timestamp, None) - if stamp: - return stamp - return 0 - - def inspect(self, task_id): - """ - Reconstructs the checkpoint stream and returns a CheckpointInspection. 
- """ - dispatcher = CheckpointDispatcher() - state = RunnerState(processes={}) - muxer = ProcessMuxer(self._path.given(task_id=task_id)) - - runner_processes = [] - coordinator_processes = set() - processes = set() - - def consume_process_record(record): - if not record.process_status: - return - try: - user_uid = pwd.getpwnam(state.header.user).pw_uid - except KeyError: - log.error('Could not find user: %s' % state.header.user) - return - if record.process_status.state == ProcessState.FORKED: - coordinator_processes.add((record.process_status.coordinator_pid, user_uid, - record.process_status.fork_time)) - elif record.process_status.state == ProcessState.RUNNING: - processes.add((record.process_status.pid, user_uid, - record.process_status.start_time)) - - # replay runner checkpoint - runner_pid = None - runner_latest_update = 0 - try: - with open(self._path.given(task_id=task_id).getpath('runner_checkpoint')) as fp: - with closing(ThriftRecordReader(fp, RunnerCkpt)) as ckpt: - for record in ckpt: - dispatcher.dispatch(state, record) - runner_latest_update = max(runner_latest_update, - self.get_timestamp(record.process_status)) - # collect all bound runners - if record.task_status: - if record.task_status.runner_pid != runner_pid: - runner_processes.append((record.task_status.runner_pid, - record.task_status.runner_uid or 0, - record.task_status.timestamp_ms)) - runner_pid = record.task_status.runner_pid - elif record.process_status: - consume_process_record(record) - except (IOError, OSError, RecordIO.Error) as err: - log.debug('Error inspecting task runner checkpoint: %s' % err) - return - - # register existing processes in muxer - for process_name in state.processes: - muxer.register(process_name) - - # read process checkpoints - process_latest_update = runner_latest_update - for record in muxer.select(): - process_latest_update = max(process_latest_update, self.get_timestamp(record.process_status)) - consume_process_record(record) - - return 
CheckpointInspection( - runner_latest_update=runner_latest_update, - process_latest_update=process_latest_update, - runner_processes=runner_processes, - coordinator_processes=coordinator_processes, - processes=processes) http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/monitoring/BUILD ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/monitoring/BUILD b/src/main/python/apache/thermos/monitoring/BUILD index 633dd95..e885102 100644 --- a/src/main/python/apache/thermos/monitoring/BUILD +++ b/src/main/python/apache/thermos/monitoring/BUILD @@ -25,19 +25,6 @@ python_library( ) python_library( - name = 'garbage', - sources = ['garbage.py'], - dependencies = [ - ':detector', - '3rdparty/python:twitter.common.dirutil', - '3rdparty/python:twitter.common.lang', - '3rdparty/python:twitter.common.quantity', - 'src/main/python/apache/thermos/common:ckpt', - 'src/main/python/apache/thermos/common:path', - ] -) - -python_library( name = 'monitor', sources = ['monitor.py'], dependencies = [ @@ -95,7 +82,6 @@ python_library( dependencies = [ ':detector', ':disk', - ':garbage', ':monitor', ':process', ':resource', http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/main/python/apache/thermos/monitoring/garbage.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/monitoring/garbage.py b/src/main/python/apache/thermos/monitoring/garbage.py deleted file mode 100644 index aa5a272..0000000 --- a/src/main/python/apache/thermos/monitoring/garbage.py +++ /dev/null @@ -1,198 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import os -import sys -import time -from collections import namedtuple - -from twitter.common.dirutil import safe_bsize, safe_delete, safe_mtime, safe_rmtree -from twitter.common.quantity import Amount, Data, Time - -from apache.thermos.common.ckpt import CheckpointDispatcher -from apache.thermos.common.path import TaskPath - -from .detector import TaskDetector - - -class TaskGarbageCollector(object): - """A task wrapper to manage its sandbox and log state.""" - - def __init__(self, checkpoint_root, task_id): - """ - :param checkpoint_root: The checkpoint root to find the given task. - :param task_id: The task_id of the task whose state we wish to manage. - """ - - self._detector = TaskDetector(checkpoint_root) - self._task_id = task_id - self._pathspec = TaskPath(root=checkpoint_root, task_id=task_id) - self._state = CheckpointDispatcher.from_file(self._detector.get_checkpoint(task_id)) - - def get_age(self): - return safe_mtime(self._detector.get_checkpoint(self._task_id)) - - def get_metadata(self, with_size=True): - runner_ckpt = self._detector.get_checkpoint(self._task_id) - process_ckpts = [ckpt for ckpt in self._detector.get_process_checkpoints(self._task_id)] - # assumes task is in finished state. 
- json_spec = self._pathspec.given(state='finished').getpath('task_path') - for path in [json_spec, runner_ckpt] + process_ckpts: - if with_size: - yield path, safe_bsize(path) - else: - yield path - - def get_logs(self, with_size=True): - if self._state and self._state.header and self._state.header.log_dir: - for path in self._detector.get_process_logs(self._task_id, self._state.header.log_dir): - if with_size: - yield path, safe_bsize(path) - else: - yield path - - def get_data(self, with_size=True): - if self._state and self._state.header and self._state.header.sandbox: - for root, dirs, files in os.walk(self._state.header.sandbox): - for file in files: - filename = os.path.join(root, file) - if with_size: - yield filename, safe_bsize(filename) - else: - yield filename - - def erase_task(self): - self.erase_data() - self.erase_logs() - self.erase_metadata() - - def erase_metadata(self): - for fn in self.get_metadata(with_size=False): - safe_delete(fn) - safe_rmtree(self._pathspec.getpath('checkpoint_path')) - - def erase_logs(self): - for fn in self.get_logs(with_size=False): - safe_delete(fn) - if self._state and self._state.header: - path = self._pathspec.given(log_dir=self._state.header.log_dir).getpath('process_logbase') - safe_rmtree(path) - - def erase_data(self): - for fn in self.get_data(with_size=False): - safe_delete(fn) - if self._state and self._state.header and self._state.header.sandbox: - safe_rmtree(self._state.header.sandbox) - - -class GarbageCollectionPolicy(object): - def __init__(self, - path_detector, - max_age=Amount(10 ** 10, Time.DAYS), - max_space=Amount(10 ** 10, Data.TB), - max_tasks=10 ** 10, - include_metadata=True, - include_logs=True, - verbose=False, - logger=sys.stdout.write): - """ - Default garbage collection policy. 
- - Arguments that may be specified: - max_age: Amount(Time) (max age of a retained task) [default: infinity] - max_space: Amount(Data) (max space to keep) [default: infinity] - max_tasks: int (max number of tasks to keep) [default: infinity] - include_metadata: boolean (Whether or not to include metadata in the - space calculations.) [default: True] - include_logs: boolean (Whether or not to include logs in the - space calculations.) [default: True] - verbose: boolean (whether or not to log) [default: False] - logger: callable (function to call with log messages) [default: sys.stdout.write] - """ - self._path_detector = path_detector - self._max_age = max_age - self._max_space = max_space - self._max_tasks = max_tasks - self._include_metadata = include_metadata - self._include_logs = include_logs - self._verbose = verbose - self._logger = logger - - def log(self, msg): - if self._verbose: - self._logger(msg) - - def get_finished_tasks(self): - """Yields (checkpoint_root, task_id) for finished tasks.""" - - for checkpoint_root in self._path_detector.get_paths(): - for task_id in TaskDetector(checkpoint_root).get_task_ids(state='finished'): - yield (checkpoint_root, task_id) - - def run(self): - tasks = [] - now = time.time() - - # age: The time (in seconds) since the last task transition to/from ACTIVE/FINISHED - # metadata_size: The size of the thermos checkpoint records for this task - # log_size: The size of the stdout/stderr logs for this task's processes - # data_size: The size of the sandbox of this task. - TaskTuple = namedtuple('TaskTuple', - 'checkpoint_root task_id age metadata_size log_size data_size') - - for checkpoint_root, task_id in self.get_finished_tasks(): - collector = TaskGarbageCollector(checkpoint_root, task_id) - - age = Amount(int(now - collector.get_age()), Time.SECONDS) - self.log('Analyzing task %s (age: %s)... 
' % (task_id, age)) - metadata_size = Amount(sum(sz for _, sz in collector.get_metadata()), Data.BYTES) - self.log(' metadata %.1fKB ' % metadata_size.as_(Data.KB)) - log_size = Amount(sum(sz for _, sz in collector.get_logs()), Data.BYTES) - self.log(' logs %.1fKB ' % log_size.as_(Data.KB)) - data_size = Amount(sum(sz for _, sz in collector.get_data()), Data.BYTES) - self.log(' data %.1fMB ' % data_size.as_(Data.MB)) - tasks.append(TaskTuple(checkpoint_root, task_id, age, metadata_size, log_size, data_size)) - - gc_tasks = set() - gc_tasks.update(task for task in tasks if task.age > self._max_age) - - self.log('After age filter: %s tasks' % len(gc_tasks)) - - def total_gc_size(task): - return sum([task.data_size, - task.metadata_size if self._include_metadata else Amount(0, Data.BYTES), - task.log_size if self._include_logs else Amount(0, Data.BYTES)], - Amount(0, Data.BYTES)) - - total_used = Amount(0, Data.BYTES) - for task in sorted(tasks, key=lambda tsk: tsk.age, reverse=True): - if task not in gc_tasks: - total_used += total_gc_size(task) - if total_used > self._max_space: - gc_tasks.add(task) - self.log('After size filter: %s tasks' % len(gc_tasks)) - - for task in sorted(tasks, key=lambda tsk: tsk.age, reverse=True): - if task not in gc_tasks and len(tasks) - len(gc_tasks) > self._max_tasks: - gc_tasks.add(task) - self.log('After total task filter: %s tasks' % len(gc_tasks)) - - self.log('Deciding to garbage collect the following tasks:') - if gc_tasks: - for task in gc_tasks: - self.log(' %s' % repr(task)) - else: - self.log(' None.') - - return gc_tasks http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/test/python/apache/aurora/executor/BUILD ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/BUILD b/src/test/python/apache/aurora/executor/BUILD index f415ecc..e1c635c 100644 --- a/src/test/python/apache/aurora/executor/BUILD +++ 
b/src/test/python/apache/aurora/executor/BUILD @@ -35,29 +35,10 @@ python_test_suite( python_test_suite( name = 'executor-large', dependencies = [ - ':gc_executor', ':thermos_executor', ] ) - -python_tests(name = 'gc_executor', - sources = ['test_gc_executor.py'], - dependencies = [ - '3rdparty/python:mock', - '3rdparty/python:twitter.common.app', - '3rdparty/python:twitter.common.concurrent', - '3rdparty/python:twitter.common.quantity', - '3rdparty/python:twitter.common.testing', - 'src/main/python/apache/thermos/common:path', - 'src/main/python/apache/thermos/config', - 'src/main/python/apache/thermos/core:runner', - 'api/src/main/thrift/org/apache/thermos:py-thrift', - 'src/main/python/apache/aurora/executor:gc_executor', - 'api/src/main/thrift/org/apache/aurora/gen:py-thrift', - ], -) - python_tests(name = 'thermos_executor', sources = ['test_thermos_executor.py'], # timeout = Amount(5, Time.MINUTES), http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/test/python/apache/aurora/executor/bin/BUILD ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/bin/BUILD b/src/test/python/apache/aurora/executor/bin/BUILD index 2caab2a..713ce95 100644 --- a/src/test/python/apache/aurora/executor/bin/BUILD +++ b/src/test/python/apache/aurora/executor/bin/BUILD @@ -15,21 +15,11 @@ python_test_suite( name = 'all', dependencies = [ - ':gc_executor_entry_point', ':thermos_executor_entry_point', ] ) python_tests( - name = 'gc_executor_entry_point', - sources = ['test_gc_executor_entry_point.py'], - dependencies = [ - '3rdparty/python:mock', - 'src/main/python/apache/aurora/executor/bin:gc_executor_source', - ], -) - -python_tests( name = 'thermos_executor_entry_point', sources = ['test_thermos_executor_entry_point.py'], dependencies = [ http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/test/python/apache/aurora/executor/bin/test_gc_executor_entry_point.py 
---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/bin/test_gc_executor_entry_point.py b/src/test/python/apache/aurora/executor/bin/test_gc_executor_entry_point.py deleted file mode 100644 index d4c1d57..0000000 --- a/src/test/python/apache/aurora/executor/bin/test_gc_executor_entry_point.py +++ /dev/null @@ -1,40 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import unittest - -from mock import create_autospec, Mock, patch - -from apache.aurora.executor.bin.gc_executor_main import initialize, proxy_main -from apache.aurora.executor.gc_executor import ThermosGCExecutor -from apache.thermos.monitoring.detector import ChainedPathDetector - - -def test_gc_executor_valid_import_dependencies(): - assert proxy_main is not None - - -class GcExecutorMainTest(unittest.TestCase): - def test_chained_path_detector_initialized(self): - mock_gc_executor = create_autospec(spec=ThermosGCExecutor) - with patch('apache.aurora.executor.bin.gc_executor_main.ThermosGCExecutor', - return_value=mock_gc_executor) as mock: - with patch('apache.aurora.executor.bin.gc_executor_main.MesosExecutorDriver', - return_value=Mock()): - - initialize() - assert len(mock.mock_calls) == 1 - call = mock.mock_calls[0] - _, args, _ = call - assert len(args) == 1 and isinstance(args[0], ChainedPathDetector)
