Repository: aurora Updated Branches: refs/heads/master d977aa475 -> 56bb1e693
http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/test/python/apache/aurora/executor/test_gc_executor.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/executor/test_gc_executor.py b/src/test/python/apache/aurora/executor/test_gc_executor.py deleted file mode 100644 index 17d3590..0000000 --- a/src/test/python/apache/aurora/executor/test_gc_executor.py +++ /dev/null @@ -1,656 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import contextlib -import functools -import os -import shutil -import threading -import time -import unittest -from collections import namedtuple -from itertools import product - -import mock -import pytest -from mesos.interface import mesos_pb2 -from thrift.TSerialization import serialize as thrift_serialize -from twitter.common import log -from twitter.common.concurrent import deadline, Timeout -from twitter.common.contextutil import temporary_dir -from twitter.common.dirutil import safe_rmtree -from twitter.common.quantity import Amount, Time -from twitter.common.testing.clock import ThreadedClock - -from apache.aurora.executor.gc_executor import RootedTask, ThermosGCExecutor -from apache.thermos.common.path import TaskPath -from apache.thermos.config.schema import SimpleTask -from apache.thermos.core.runner import TaskRunner -from apache.thermos.monitoring.detector import FixedPathDetector - -from gen.apache.aurora.api.constants import LIVE_STATES, TERMINAL_STATES -from gen.apache.aurora.api.ttypes import ScheduleStatus -from gen.apache.aurora.comm.ttypes import AdjustRetainedTasks -from gen.apache.thermos.ttypes import ProcessState, TaskState - -FAKE_ROOT = 'fake_root' - - -def make_task(task_id): - return RootedTask(FAKE_ROOT, task_id) - - -ACTIVE_TASKS = (make_task('sleep60-lost'),) - - -FINISHED_TASKS = { - make_task('failure'): ProcessState.SUCCESS, - make_task('failure_limit'): ProcessState.FAILED, - make_task('hello_world'): ProcessState.SUCCESS, - make_task('ordering'): ProcessState.SUCCESS, - make_task('ports'): ProcessState.SUCCESS, - make_task('sleep60'): ProcessState.KILLED -} - -# TODO(wickman) These should be constant sets in the Thermos thrift -THERMOS_LIVES = (TaskState.ACTIVE, TaskState.CLEANING, TaskState.FINALIZING) -THERMOS_TERMINALS = (TaskState.SUCCESS, TaskState.FAILED, TaskState.KILLED, TaskState.LOST) -ASSIGNED_STATES = (ScheduleStatus.ASSIGNED, ) -TASK_ID = 'gc_executor_task_id' -EVENT_WAIT_TIMEOUT_SECS = 10 - - -if 'THERMOS_DEBUG' in os.environ: - from twitter.common.log.options import LogOptions - LogOptions.set_disk_log_level('NONE') - LogOptions.set_stderr_log_level('DEBUG') - log.init('test_gc_executor') - - -def thread_yield(): - time.sleep(0.1) - - -def setup_tree(td, lose=False): - safe_rmtree(td) - - # TODO(wickman) These should be referred as resources= in the python_target instead. - shutil.copytree('src/test/resources/org/apache/thermos/root', td) - - if lose: - lost_age = time.time() - ( - 2 * ThinTestThermosGCExecutor.MAX_CHECKPOINT_TIME_DRIFT.as_(Time.SECONDS)) - utime = (lost_age, lost_age) - else: - utime = None - - # touch everything - for root, dirs, files in os.walk(td): - for fn in files: - os.utime(os.path.join(root, fn), utime) - - -StatusUpdate = namedtuple('StatusUpdate', 'state task_id') - - -class ProxyDriver(object): - def __init__(self): - self.stopped = threading.Event() - self.updates = [] - - def stop(self): - log.debug('ProxyDriver.stop') - self.stopped.set() - - def sendStatusUpdate(self, update): # noqa - log.debug('ProxyDriver.sendStatusUpdate %r' % update) - self.updates.append(StatusUpdate(update.state, update.task_id.value)) - - -def serialize_art(art, task_id=TASK_ID): - td = mesos_pb2.TaskInfo() - td.slave_id.value = 'ignore_me' - td.task_id.value = task_id - td.data = thrift_serialize(art) - return td - - -class FakeExecutorDetector(object): - class ExecutorScanf(object): - def __init__(self, task_id): - self.executor_id = 'thermos-' + task_id - self.run = 'some_run_number' - - def __init__(self, *task_ids): - self.__scanfs = [self.ExecutorScanf(task_id) for task_id in task_ids] - - def __iter__(self): - return iter(self.__scanfs) - - -class ThinTestThermosGCExecutor(ThermosGCExecutor): - POLL_WAIT = Amount(5, Time.MILLISECONDS) - MINIMUM_KILL_AGE = Amount(5, Time.SECONDS) - - def __init__(self, checkpoint_root, active_executors=[]): - self._active_executors = active_executors - self._kills = set() - self._losses = set() - self._gcs = set() - ThermosGCExecutor.__init__( - self, - FixedPathDetector(checkpoint_root), - clock=ThreadedClock(time.time()), - executor_detector=lambda: list) - - @property - def gcs(self): - return self._gcs - - def _gc(self, task): - self._gcs.add(task) - - def _terminate_task(self, task, kill=True): - if kill: - self._kills.add(task) - else: - self._losses.add(task) - return True - - @property - def linked_executors(self): - return self._active_executors - - -class ThickTestThermosGCExecutor(ThinTestThermosGCExecutor): - def __init__(self, active_tasks, finished_tasks, active_executors=[], corrupt_tasks=[]): - self._active_tasks = active_tasks - self._finished_tasks = finished_tasks - self._corrupt_tasks = corrupt_tasks - self._maybe_terminate = set() - ThinTestThermosGCExecutor.__init__(self, FAKE_ROOT, active_executors) - - @property - def results(self): - return self._kills, self._losses, self._gcs, self._maybe_terminate - - @property - def len_results(self): - return len(self._kills), len(self._losses), len(self._gcs), len(self._maybe_terminate) - - def partition_tasks(self): - return set(self._active_tasks.keys()), set(self._finished_tasks.keys()) - - def maybe_terminate_unknown_task(self, task): - self._maybe_terminate.add(task) - - def get_states(self, task): - if task not in self._corrupt_tasks: - if task in self._active_tasks: - return [(self._clock.time(), self._active_tasks[task])] - elif task in self._finished_tasks: - return [(self._clock.time(), self._finished_tasks[task])] - return [] - - def should_gc_task(self, task): - if task in self._corrupt_tasks: - return set([task]) - return set() - - -def make_pair(*args, **kw): - return ThickTestThermosGCExecutor(*args, **kw), ProxyDriver() - - -def llen(*iterables): - return tuple(len(iterable) for iterable in iterables) - - -def test_state_reconciliation_no_ops(): - # active vs. active - for st0, st1 in product(THERMOS_LIVES, LIVE_STATES): - tgc, driver = make_pair({make_task('foo'): st0}, {}) - lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1}) - assert tgc.len_results == (0, 0, 0, 0) - assert llen(lgc, rgc, updates) == (0, 0, 0) - - # terminal vs. terminal - for st0, st1 in product(THERMOS_TERMINALS, TERMINAL_STATES): - tgc, driver = make_pair({}, {make_task('foo'): st0}) - lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1}) - assert tgc.len_results == (0, 0, 0, 0) - assert llen(lgc, rgc, updates) == (0, 0, 0) - - # active vs. assigned - for st0, st1 in product(THERMOS_LIVES, ASSIGNED_STATES): - tgc, driver = make_pair({make_task('foo'): st0}, {}) - lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1}) - assert tgc.len_results == (0, 0, 0, 0) - assert llen(lgc, rgc, updates) == (0, 0, 0) - - # nexist vs. assigned - for st1 in ASSIGNED_STATES: - tgc, driver = make_pair({}, {}) - lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1}) - assert tgc.len_results == (0, 0, 0, 0) - assert llen(lgc, rgc, updates) == (0, 0, 0) - - -def test_state_reconciliation_active_terminal(): - for st0, st1 in product(THERMOS_LIVES, TERMINAL_STATES): - tgc, driver = make_pair({make_task('foo'): st0}, {}) - lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1}) - assert tgc.len_results == (0, 0, 0, 1) - assert llen(lgc, rgc, updates) == (0, 0, 0) - - -def test_state_reconciliation_active_nexist(): - for st0 in THERMOS_LIVES: - tgc, driver = make_pair({make_task('foo'): st0}, {}) - lgc, rgc, updates = tgc.reconcile_states(driver, {}) - assert tgc.len_results == (0, 0, 0, 1) - assert llen(lgc, rgc, updates) == (0, 0, 0) - - -def test_state_reconciliation_terminal_active(): - for st0, st1 in product(THERMOS_TERMINALS, LIVE_STATES): - tgc, driver = make_pair({}, {make_task('foo'): st0}) - lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1}) - assert tgc.len_results == (0, 0, 0, 0) - assert llen(lgc, rgc, updates) == (0, 0, 1) - - -def test_state_reconciliation_corrupt_tasks(): - for st0, st1 in product(THERMOS_TERMINALS, LIVE_STATES): - tgc, driver = make_pair({}, {make_task('foo'): st0}, corrupt_tasks=[make_task('foo')]) - lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1}) - assert tgc.len_results == (0, 0, 0, 0) - assert llen(lgc, rgc, updates) == (1, 0, 0) - - -def test_state_reconciliation_terminal_nexist(): - for st0, st1 in product(THERMOS_TERMINALS, LIVE_STATES): - tgc, driver = make_pair({}, {make_task('foo'): st0}) - lgc, rgc, updates = tgc.reconcile_states(driver, {}) - assert tgc.len_results == (0, 0, 0, 0) - assert llen(lgc, rgc, updates) == (1, 0, 0) - assert lgc == set([make_task('foo')]) - - -def test_state_reconciliation_nexist_active(): - for st1 in LIVE_STATES: - tgc, driver = make_pair({}, {}) - lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1}) - assert tgc.len_results == (0, 0, 0, 0) - assert llen(lgc, rgc, updates) == (0, 0, 1) - - -def test_state_reconciliation_nexist_terminal(): - for st1 in TERMINAL_STATES: - tgc, driver = make_pair({}, {}) - lgc, rgc, updates = tgc.reconcile_states(driver, {'foo': st1}) - assert tgc.len_results == (0, 0, 0, 0) - assert llen(lgc, rgc, updates) == (0, 1, 0) - assert rgc == set(['foo']) - - -def test_real_get_states(): - with temporary_dir() as td: - setup_tree(td) - executor = ThinTestThermosGCExecutor(td) - for task in FINISHED_TASKS: - real_task = RootedTask(td, task.task_id) - states = executor.get_states(real_task) - assert isinstance(states, list) and len(states) > 0 - assert executor.get_sandbox(real_task) is not None - - -def wait_until_not(thing, timeout=EVENT_WAIT_TIMEOUT_SECS): - """wait until something is booleany False""" - def wait(): - while thing(): - time.sleep(0.1) - try: - deadline(wait, timeout=timeout, daemon=True) - return True - except Timeout: - return False - - -def run_gc_with(active_executors, retained_tasks, lose=False): - proxy_driver = ProxyDriver() - with temporary_dir() as td: - setup_tree(td, lose=lose) - executor = ThinTestThermosGCExecutor(td, active_executors=active_executors) - try: - executor.registered(proxy_driver, None, None, None) - executor.start() - art = AdjustRetainedTasks(retainedTasks=retained_tasks) - executor.launchTask(proxy_driver, serialize_art(art, TASK_ID)) - assert wait_until_not(lambda: executor._gc_task_queue) - assert wait_until_not(lambda: executor._task_id) - assert len(executor._gc_task_queue) == 0 - assert not executor._task_id - finally: - executor.shutdown(proxy_driver) - assert len(proxy_driver.updates) >= 1 - if not lose: # if the task is lost it will be cleaned out of band (by clean_orphans), - # so we don't care when the GC task actually finishes - assert proxy_driver.updates[-1][0] == mesos_pb2.TASK_FINISHED - assert proxy_driver.updates[-1][1] == TASK_ID - return executor, proxy_driver - - -def test_gc_with_loss(): - executor, proxy_driver = run_gc_with( - active_executors=set(task.task_id for task in ACTIVE_TASKS), - retained_tasks={}, - lose=True) - assert len(executor._kills) == len(ACTIVE_TASKS) - assert len(executor.gcs) == len(FINISHED_TASKS) - assert len(proxy_driver.updates) >= 1 - assert StatusUpdate(mesos_pb2.TASK_LOST, ACTIVE_TASKS[0].task_id) in proxy_driver.updates - - -def test_gc_with_starting_task(): - executor, proxy_driver = run_gc_with( - active_executors=set(task.task_id for task in ACTIVE_TASKS), - retained_tasks={ACTIVE_TASKS[0].task_id: ScheduleStatus.STARTING}) - assert len(executor._kills) == 0 - assert len(executor.gcs) == len(FINISHED_TASKS) - - -def test_gc_without_task_missing(): - executor, proxy_driver = run_gc_with( - active_executors=set(task.task_id for task in ACTIVE_TASKS), - retained_tasks={}, - lose=False) - assert len(executor._kills) == len(ACTIVE_TASKS) - assert len(executor.gcs) == len(FINISHED_TASKS) - - -def test_gc_without_loss(): - executor, proxy_driver = run_gc_with( - active_executors=set(task.task_id for task in ACTIVE_TASKS), - retained_tasks={ACTIVE_TASKS[0].task_id: ScheduleStatus.RUNNING}) - assert len(executor._kills) == 0 - assert len(executor.gcs) == len(FINISHED_TASKS) - - -def test_gc_withheld(): - executor, proxy_driver = run_gc_with( - active_executors=set([ACTIVE_TASKS[0].task_id, 'failure']), - retained_tasks={ACTIVE_TASKS[0].task_id: ScheduleStatus.RUNNING, - 'failure': ScheduleStatus.FAILED}) - assert len(executor._kills) == 0 - assert len(executor.gcs) == len(FINISHED_TASKS) - 1 - - -def test_gc_withheld_and_executor_missing(): - executor, proxy_driver = run_gc_with( - active_executors=set(task.task_id for task in ACTIVE_TASKS), - retained_tasks={ACTIVE_TASKS[0].task_id: ScheduleStatus.RUNNING, - 'failure': ScheduleStatus.FAILED}) - assert len(executor._kills) == 0 - assert len(executor.gcs) == len(FINISHED_TASKS) - - -def build_blocking_gc_executor(td, proxy_driver): - class LongGCThinTestThermosGCExecutor(ThinTestThermosGCExecutor): - def _run_gc(self, task, retain_tasks, retain_start): - # just block until we shutdown - log.debug('%r : _run_gc called (%r, %r, %r)' % ( - self, task, retain_tasks, retain_start)) - self._start_time = retain_start - self._task_id = task.task_id.value - self._stop_event.wait() - self._start_time = None - self._task_id = None - executor = LongGCThinTestThermosGCExecutor(td) - executor.registered(proxy_driver, None, None, None) - executor.start() - return executor - - -def test_gc_killtask_noop(): - proxy_driver = ProxyDriver() - with temporary_dir() as td: - executor = ThinTestThermosGCExecutor(td) - try: - executor.registered(proxy_driver, None, None, None) - executor.start() - executor.killTask(proxy_driver, TASK_ID) - assert not proxy_driver.stopped.is_set() - assert len(proxy_driver.updates) == 0 - finally: - executor.shutdown(proxy_driver) - - -def test_gc_killtask_current(): - proxy_driver = ProxyDriver() - with temporary_dir() as td: - executor = build_blocking_gc_executor(td, proxy_driver) - - try: - executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks())) - assert wait_until_not(lambda: len(executor._gc_task_queue)) - assert len(executor._gc_task_queue) == 0 - assert executor._task_id == TASK_ID - executor.killTask(proxy_driver, TASK_ID) - assert executor._task_id == TASK_ID - assert len(executor._gc_task_queue) == 0 - assert not proxy_driver.stopped.is_set() - assert len(proxy_driver.updates) == 0 - finally: - executor.shutdown(proxy_driver) - - -def test_gc_killtask_queued(): - TASK2_ID = "task2" - proxy_driver = ProxyDriver() - with temporary_dir() as td: - executor = build_blocking_gc_executor(td, proxy_driver) - try: - executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks())) - thread_yield() - executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks(), task_id=TASK2_ID)) - thread_yield() - assert len(executor._gc_task_queue) == 1 - executor.killTask(proxy_driver, TASK2_ID) - thread_yield() - assert len(executor._gc_task_queue) == 0 - assert not proxy_driver.stopped.is_set() - assert len(proxy_driver.updates) == 0 - finally: - executor.shutdown(proxy_driver) - - -def test_gc_multiple_launchtasks(): - TASK2, TASK3 = "task2", "task3" - proxy_driver = ProxyDriver() - with temporary_dir() as td: - executor = build_blocking_gc_executor(td, proxy_driver) - - try: - executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks())) - thread_yield() - executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks(), task_id=TASK2)) - thread_yield() - assert len(executor._gc_task_queue) == 1 - executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks(), task_id=TASK3)) - thread_yield() - assert len(executor._gc_task_queue) == 1 - assert not proxy_driver.stopped.is_set() - assert len(proxy_driver.updates) >= 1 - assert StatusUpdate(mesos_pb2.TASK_FINISHED, TASK2) in proxy_driver.updates - finally: - executor.shutdown(proxy_driver) - - -def test_gc_shutdown(): - proxy_driver = ProxyDriver() - with temporary_dir() as td: - executor = ThinTestThermosGCExecutor(td) - executor.registered(proxy_driver, None, None, None) - executor.start() - executor.shutdown(proxy_driver) - executor._stop_event.wait(timeout=EVENT_WAIT_TIMEOUT_SECS) - assert executor._stop_event.is_set() - proxy_driver.stopped.wait(timeout=EVENT_WAIT_TIMEOUT_SECS) - assert proxy_driver.stopped.is_set() - assert len(proxy_driver.updates) == 0 - - -def test_ignores_launch_task_when_shutting_down(): - """Newly launched tasks should be rejected if shutdown was already called.""" - TASK_ID = "task" - proxy_driver = ProxyDriver() - with temporary_dir() as td: - executor = build_blocking_gc_executor(td, proxy_driver) - executor.shutdown(proxy_driver) - executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks(), task_id=TASK_ID)) - assert (mesos_pb2.TASK_FAILED, TASK_ID) == proxy_driver.updates[-1] - - -def make_gc_executor_with_timeouts(maximum_executor_lifetime=Amount(1, Time.DAYS)): - class TimeoutGCExecutor(ThinTestThermosGCExecutor): - MAXIMUM_EXECUTOR_LIFETIME = maximum_executor_lifetime - return TimeoutGCExecutor - - [email protected] -def run_gc_with_timeout(**kw): - proxy_driver = ProxyDriver() - with temporary_dir() as td: - executor_class = make_gc_executor_with_timeouts(**kw) - executor = executor_class(td) - executor.registered(proxy_driver, None, None, None) - executor.start() - yield (proxy_driver, executor) - executor.shutdown(proxy_driver) - - [email protected]('True', reason='Flaky test (AURORA-1162)') -def test_gc_lifetime(): - with run_gc_with_timeout(maximum_executor_lifetime=Amount(500, Time.MILLISECONDS)) as ( - proxy_driver, executor): - executor._clock.tick(1) - proxy_driver.stopped.wait(timeout=EVENT_WAIT_TIMEOUT_SECS) - assert proxy_driver.stopped.is_set() - assert not executor._stop_event.is_set() - - -DIRECTORY_SANDBOX = 'apache.aurora.executor.gc_executor.DirectorySandbox' - - -class TestRealGC(unittest.TestCase): - """ - Test functions against the actual garbage_collect() functionality of the GC executor - """ - - def setUp(self): - self.HELLO_WORLD = SimpleTask(name="foo", command="echo hello world") - - def setup_task(self, task, root, finished=False, corrupt=False): - """Set up the checkpoint stream for the given task in the given checkpoint root, optionally - finished and/or with a corrupt stream""" - class FastTaskRunner(TaskRunner): - COORDINATOR_INTERVAL_SLEEP = Amount(1, Time.MILLISECONDS) - tr = FastTaskRunner( - task=task, - checkpoint_root=root, - sandbox=os.path.join(root, 'sandbox', task.name().get()), - clock=ThreadedClock(time.time())) - with tr.control(): - # initialize checkpoint stream - pass - if finished: - tr.kill() - if corrupt: - ckpt_file = TaskPath(root=root, tr=tr.task_id).getpath('runner_checkpoint') - with open(ckpt_file, 'w') as f: - f.write("definitely not a valid checkpoint stream") - return tr.task_id - - def run_gc(self, root, task_id, retain=False): - """Run the garbage collection process against the given task_id in the given checkpoint root""" - class FakeTaskKiller(object): - def __init__(self, task_id, checkpoint_root): - pass - - def kill(self): - pass - - def lose(self): - pass - - class FakeTaskGarbageCollector(object): - def __init__(self, root, task_id): - pass - - def erase_logs(self): - pass - - def erase_metadata(self): - pass - - class FastThermosGCExecutor(ThermosGCExecutor): - POLL_WAIT = Amount(1, Time.MILLISECONDS) - - detector = functools.partial(FakeExecutorDetector, task_id) if retain else FakeExecutorDetector - executor = FastThermosGCExecutor( - path_detector=FixedPathDetector(root), - task_killer=FakeTaskKiller, - executor_detector=detector, - task_garbage_collector=FakeTaskGarbageCollector, - clock=ThreadedClock(time.time())) - return executor.garbage_collect() - - def test_active_task_no_runners(self): - # TODO(jon): implement - pass - - def test_active_task_running(self): - # TODO(jon): implement - pass - - def test_finished_task_corrupt(self): - # TODO(jon): implement - pass - - def test_gc_task_no_sandbox(self): - with mock.patch(DIRECTORY_SANDBOX) as directory_mock: - directory_sandbox = directory_mock.return_value - with temporary_dir() as root: - task_id = self.setup_task(self.HELLO_WORLD, root, finished=True) - gcs = self.run_gc(root, task_id) - directory_sandbox.exists.assert_called_with() - assert len(gcs) == 1 - - def test_gc_task_directory_sandbox(self): - with mock.patch(DIRECTORY_SANDBOX) as directory_mock: - directory_sandbox = directory_mock.return_value - directory_sandbox.exists.return_value = True - with temporary_dir() as root: - task_id = self.setup_task(self.HELLO_WORLD, root, finished=True) - gcs = self.run_gc(root, task_id) - directory_sandbox.exists.assert_called_with() - directory_sandbox.destroy.assert_called_with() - assert len(gcs) == 1 - - def test_gc_ignore_retained_task(self): - with temporary_dir() as root: - task_id = self.setup_task(self.HELLO_WORLD, root, finished=True) - gcs = self.run_gc(root, task_id, retain=True) - assert len(gcs) == 0 http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/test/python/apache/thermos/cli/commands/test_import.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/thermos/cli/commands/test_import.py b/src/test/python/apache/thermos/cli/commands/test_import.py index 74d9a32..6f99f47 100644 --- a/src/test/python/apache/thermos/cli/commands/test_import.py +++ b/src/test/python/apache/thermos/cli/commands/test_import.py @@ -14,7 +14,6 @@ def test_imports(): - from apache.thermos.cli.commands import gc as gc_command from apache.thermos.cli.commands import help as help_command from apache.thermos.cli.commands import inspect as inspect_command from apache.thermos.cli.commands import kill as kill_command @@ -25,7 +24,6 @@ def test_imports(): from apache.thermos.cli.commands import tail as tail_command for command in ( - gc_command, help_command, inspect_command, kill_command, http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/test/python/apache/thermos/monitoring/BUILD ---------------------------------------------------------------------- diff --git a/src/test/python/apache/thermos/monitoring/BUILD b/src/test/python/apache/thermos/monitoring/BUILD index 89030d0..f4ad7fc 100644 --- a/src/test/python/apache/thermos/monitoring/BUILD +++ b/src/test/python/apache/thermos/monitoring/BUILD @@ -16,7 +16,6 @@ python_test_suite(name = 'all', dependencies = [ ':test_disk', ':test_detector', - ':test_garbage', ':test_resource', ] ) @@ -39,19 +38,6 @@ python_tests(name = 'test_detector', ] ) -python_tests(name = 'test_garbage', - sources = ['test_garbage.py'], - dependencies = [ - '3rdparty/python:mock', - '3rdparty/python:twitter.common.contextutil', - '3rdparty/python:twitter.common.dirutil', - 'api/src/main/thrift/org/apache/thermos:py-thrift', - 'src/main/python/apache/thermos/common:path', - 'src/main/python/apache/thermos/core:helper', - 'src/main/python/apache/thermos/monitoring:garbage', - ] -) - python_tests(name = 'test_resource', sources = ['test_resource.py'], dependencies = [ http://git-wip-us.apache.org/repos/asf/aurora/blob/56bb1e69/src/test/python/apache/thermos/monitoring/test_garbage.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/thermos/monitoring/test_garbage.py b/src/test/python/apache/thermos/monitoring/test_garbage.py deleted file mode 100644 index 4309c46..0000000 --- a/src/test/python/apache/thermos/monitoring/test_garbage.py +++ /dev/null @@ -1,90 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import os - -from mock import call, patch -from twitter.common.contextutil import temporary_dir -from twitter.common.dirutil import safe_mkdir, touch - -from apache.thermos.common.path import TaskPath -from apache.thermos.core.helper import TaskRunnerHelper -from apache.thermos.monitoring.garbage import TaskGarbageCollector - -from gen.apache.thermos.ttypes import RunnerCkpt, RunnerHeader - - -def test_empty_garbage_collector(): - with temporary_dir() as checkpoint_root: - path = TaskPath(root=checkpoint_root, task_id='does_not_exist') - gc = TaskGarbageCollector(checkpoint_root, 'does_not_exist') - - assert gc.get_age() == 0 - - # assume runner, finished task - assert set(gc.get_metadata(with_size=False)) == set([ - path.getpath('runner_checkpoint'), - path.given(state='finished').getpath('task_path'), - ]) - - assert list(gc.get_logs()) == [] - assert list(gc.get_data()) == [] - - -@patch('apache.thermos.monitoring.garbage.safe_delete') -@patch('apache.thermos.monitoring.garbage.safe_rmtree') -def test_garbage_collector(safe_rmtree, safe_delete): - with temporary_dir() as sandbox, temporary_dir() as checkpoint_root, temporary_dir() as log_dir: - - path = TaskPath(root=checkpoint_root, task_id='test', log_dir=log_dir) - - touch(os.path.join(sandbox, 'test_file1')) - touch(os.path.join(sandbox, 'test_file2')) - safe_mkdir(os.path.dirname(path.given(state='finished').getpath('task_path'))) - safe_mkdir(os.path.dirname(path.getpath('runner_checkpoint'))) - touch(path.given(state='finished').getpath('task_path')) - - header = RunnerHeader(task_id='test', sandbox=sandbox, log_dir=log_dir) - ckpt = TaskRunnerHelper.open_checkpoint(path.getpath('runner_checkpoint')) - ckpt.write(RunnerCkpt(runner_header=header)) - ckpt.close() - - gc = TaskGarbageCollector(checkpoint_root, task_id='test') - assert gc._state.header.log_dir == log_dir - assert gc._state.header.sandbox == sandbox - - # erase metadata - gc.erase_metadata() - safe_delete.assert_has_calls([ - call(path.given(state='finished').getpath('task_path')), - call(path.getpath('runner_checkpoint'))], any_order=True) - safe_rmtree.assert_has_calls([call(path.getpath('checkpoint_path'))]) - - safe_delete.reset_mock() - safe_rmtree.reset_mock() - - # erase logs - gc.erase_logs() - safe_rmtree.assert_has_calls([call(log_dir)]) - - safe_delete.reset_mock() - safe_rmtree.reset_mock() - - # erase sandbox - gc.erase_data() - - safe_delete.assert_has_calls([ - call(os.path.join(sandbox, 'test_file1')), - call(os.path.join(sandbox, 'test_file2'))], any_order=True) - safe_rmtree.assert_has_calls([call(sandbox)])
