Repository: aurora
Updated Branches:
  refs/heads/master ea2c9ad24 -> 8def0a90d
Making observer polling interval configurable. Bugs closed: AURORA-1351 Reviewed at https://reviews.apache.org/r/35527/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/8def0a90 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/8def0a90 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/8def0a90 Branch: refs/heads/master Commit: 8def0a90d9f402eca0a25e51f1bc1022fbce7bfc Parents: ea2c9ad Author: Maxim Khutornenko <[email protected]> Authored: Tue Jun 16 16:09:10 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Tue Jun 16 16:09:10 2015 -0700 ---------------------------------------------------------------------- src/main/python/apache/aurora/tools/__init__.py | 15 +++++++ .../apache/aurora/tools/thermos_observer.py | 18 +++++++- .../apache/thermos/observer/task_observer.py | 9 ++-- src/test/python/apache/aurora/BUILD | 1 + src/test/python/apache/aurora/tools/BUILD | 27 ++++++++++++ .../tools/test_thermos_observer_entry_point.py | 40 +++++++++++++++++ .../thermos/observer/test_task_observer.py | 45 ++++++++++++++++++++ 7 files changed, 150 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/main/python/apache/aurora/tools/__init__.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/tools/__init__.py b/src/main/python/apache/aurora/tools/__init__.py new file mode 100644 index 0000000..e2f963e --- /dev/null +++ b/src/main/python/apache/aurora/tools/__init__.py @@ -0,0 +1,15 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +__import__('pkg_resources').declare_namespace(__name__) http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/main/python/apache/aurora/tools/thermos_observer.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/tools/thermos_observer.py b/src/main/python/apache/aurora/tools/thermos_observer.py index 4b534d3..d88c03f 100644 --- a/src/main/python/apache/aurora/tools/thermos_observer.py +++ b/src/main/python/apache/aurora/tools/thermos_observer.py @@ -19,6 +19,7 @@ import time from twitter.common import app from twitter.common.exceptions import ExceptionalThread from twitter.common.log.options import LogOptions +from twitter.common.quantity import Amount, Time from apache.aurora.executor.common.path_detector import MesosPathDetector from apache.thermos.common.constants import DEFAULT_CHECKPOINT_ROOT @@ -50,18 +51,31 @@ app.add_option( help='The port on which the observer should listen.') +app.add_option( + '--polling_interval_secs', + dest='polling_interval_secs', + type='int', + default=TaskObserver.POLLING_INTERVAL.as_(Time.SECONDS), + help='The number of seconds between observer refresh attempts.') + + # Allow an interruptible sleep so that ^C works. 
def sleep_forever(): while True: time.sleep(1) -def main(_, options): +def initialize(options): path_detector = ChainedPathDetector( FixedPathDetector(options.root), MesosPathDetector(options.mesos_root), ) - observer = TaskObserver(path_detector) + polling_interval = Amount(options.polling_interval_secs, Time.SECONDS) + return TaskObserver(path_detector, interval=polling_interval) + + +def main(_, options): + observer = initialize(options) observer.start() root_server = configure_server(observer) http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/main/python/apache/thermos/observer/task_observer.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/thermos/observer/task_observer.py b/src/main/python/apache/thermos/observer/task_observer.py index b9a37de..1485de8 100644 --- a/src/main/python/apache/thermos/observer/task_observer.py +++ b/src/main/python/apache/thermos/observer/task_observer.py @@ -21,7 +21,6 @@ polls a designated Thermos checkpoint root and collates information about all ta """ import os import threading -import time from operator import attrgetter from twitter.common import log @@ -54,7 +53,10 @@ class TaskObserver(ExceptionalThread, Lockable): POLLING_INTERVAL = Amount(5, Time.SECONDS) - def __init__(self, path_detector, resource_monitor_class=TaskResourceMonitor): + def __init__(self, + path_detector, + resource_monitor_class=TaskResourceMonitor, + interval=POLLING_INTERVAL): self._detector = ObserverTaskDetector( path_detector, self.__on_active, @@ -63,6 +65,7 @@ class TaskObserver(ExceptionalThread, Lockable): if not issubclass(resource_monitor_class, ResourceMonitorBase): raise ValueError("resource monitor class must implement ResourceMonitorBase!") self._resource_monitor_class = resource_monitor_class + self._interval = interval self._active_tasks = {} # task_id => ActiveObservedTask self._finished_tasks = {} # task_id => FinishedObservedTask self._stop_event = 
threading.Event() @@ -127,7 +130,7 @@ class TaskObserver(ExceptionalThread, Lockable): finished state. """ while not self._stop_event.is_set(): - time.sleep(self.POLLING_INTERVAL.as_(Time.SECONDS)) + self._stop_event.wait(self._interval.as_(Time.SECONDS)) with self.lock: self._detector.refresh() http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/test/python/apache/aurora/BUILD ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/BUILD b/src/test/python/apache/aurora/BUILD index 0701440..c2251ce 100644 --- a/src/test/python/apache/aurora/BUILD +++ b/src/test/python/apache/aurora/BUILD @@ -20,6 +20,7 @@ python_test_suite( 'src/test/python/apache/aurora/common:all', 'src/test/python/apache/aurora/config:all', 'src/test/python/apache/aurora/executor:all', + 'src/test/python/apache/aurora/tools:all', ] ) http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/test/python/apache/aurora/tools/BUILD ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/tools/BUILD b/src/test/python/apache/aurora/tools/BUILD new file mode 100644 index 0000000..e676aff --- /dev/null +++ b/src/test/python/apache/aurora/tools/BUILD @@ -0,0 +1,27 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +python_test_suite(name = 'all', + dependencies = [ + ':thermos_observer_entry_point', + ] +) + +python_tests(name = 'thermos_observer_entry_point', + sources = ['test_thermos_observer_entry_point.py'], + dependencies = [ + '3rdparty/python:mock', + 'src/main/python/apache/aurora/tools:thermos_observer', + ] +) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py b/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py new file mode 100644 index 0000000..e485b81 --- /dev/null +++ b/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py @@ -0,0 +1,40 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import os +import unittest + +from mock import create_autospec, Mock, patch +from twitter.common.quantity import Amount, Time + +from apache.aurora.tools.thermos_observer import initialize +from apache.thermos.observer.task_observer import TaskObserver + + +class ThermosObserverMainTest(unittest.TestCase): + def test_initialize(self): + expected_interval = Amount(15, Time.SECONDS) + mock_options = Mock(spec_set=['root', 'mesos_root', 'polling_interval_secs']) + mock_options.root = '' + mock_options.mesos_root = os.path.abspath('.') + mock_options.polling_interval_secs = int(expected_interval.as_(Time.SECONDS)) + mock_task_observer = create_autospec(spec=TaskObserver) + with patch( + 'apache.aurora.tools.thermos_observer.TaskObserver', + return_value=mock_task_observer) as mock_observer: + + initialize(mock_options) + + assert len(mock_observer.mock_calls) == 1 + args = mock_observer.mock_calls[0][2] + assert expected_interval == args['interval'] http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/test/python/apache/thermos/observer/test_task_observer.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/thermos/observer/test_task_observer.py b/src/test/python/apache/thermos/observer/test_task_observer.py new file mode 100644 index 0000000..ace15c5 --- /dev/null +++ b/src/test/python/apache/thermos/observer/test_task_observer.py @@ -0,0 +1,45 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import unittest + +from mock import create_autospec, patch +from twitter.common.quantity import Amount, Time + +from apache.thermos.observer.detector import ObserverTaskDetector +from apache.thermos.observer.task_observer import TaskObserver + + +class TaskObserverTest(unittest.TestCase): + def test_run_loop(self): + """Test observer run loop.""" + mock_task_detector = create_autospec(spec=ObserverTaskDetector) + with patch( + "apache.thermos.observer.task_observer.ObserverTaskDetector", + return_value=mock_task_detector) as mock_detector: + with patch('threading._Event.wait') as mock_wait: + + run_count = 3 + interval = 15 + observer = TaskObserver(mock_detector, interval=Amount(interval, Time.SECONDS)) + observer.start() + while len(mock_wait.mock_calls) < run_count: + pass + + observer.stop() + + assert len(mock_task_detector.mock_calls) >= run_count + assert len(mock_wait.mock_calls) >= run_count + args = mock_wait.mock_calls[1][1] + assert interval == args[0]
