This is an automated email from the ASF dual-hosted git repository. tloubrieu pushed a commit to branch ascending_latitudes in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 242de669a25e90fe47e80e9e8152b8f81ecdd62b Author: Eamon Ford <[email protected]> AuthorDate: Thu Jul 9 10:26:22 2020 -0500 SDAP-259: The Collection Manager now reloads the Collections Config file on an interval instead of watching for filesystem events (#5) Co-authored-by: Eamon Ford <[email protected]> --- collection_manager/collection_manager/main.py | 15 ++++-- .../services/CollectionWatcher.py | 56 ++++++++++++---------- collection_manager/docker/entrypoint.sh | 1 + .../tests/services/test_CollectionWatcher.py | 44 ++++++++++++----- 4 files changed, 73 insertions(+), 43 deletions(-) diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py index d8d2a5a..7e72de5 100644 --- a/collection_manager/collection_manager/main.py +++ b/collection_manager/collection_manager/main.py @@ -1,7 +1,7 @@ import argparse +import asyncio import logging import os -import time from collection_manager.services import CollectionProcessor, CollectionWatcher, MessagePublisher from collection_manager.services.history_manager import SolrIngestionHistoryBuilder, FileIngestionHistoryBuilder @@ -47,11 +47,15 @@ def get_args() -> argparse.Namespace: default="nexus", metavar="QUEUE", help='Name of the RabbitMQ queue to consume from. (Default: "nexus")') + parser.add_argument('--refresh', + default='30', + metavar="INTERVAL", + help='Number of seconds after which to reload the collections config file. (Default: 30)') return parser.parse_args() -def main(): +async def main(): try: options = get_args() @@ -68,13 +72,14 @@ def main(): history_manager_builder=history_manager_builder) collection_watcher = CollectionWatcher(collections_path=options.collections_path, collection_updated_callback=collection_processor.process_collection, - granule_updated_callback=collection_processor.process_granule) + granule_updated_callback=collection_processor.process_granule, + collections_refresh_interval=int(options.refresh)) collection_watcher.start_watching() while True: try: - time.sleep(1) + await asyncio.sleep(1) except KeyboardInterrupt: return @@ -84,4 +89,4 @@ def main(): if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index a3c3bf7..2387016 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -1,7 +1,9 @@ +import asyncio import logging import os from collections import defaultdict -from typing import Dict, Callable, Set +from functools import partial +from typing import Dict, Callable, Set, Optional import yaml from watchdog.events import FileSystemEventHandler @@ -21,30 +23,31 @@ class CollectionWatcher: def __init__(self, collections_path: str, collection_updated_callback: Callable[[Collection], any], - granule_updated_callback: Callable[[str, Collection], any]): + granule_updated_callback: Callable[[str, Collection], any], + collections_refresh_interval: float = 30): if not os.path.isabs(collections_path): raise RelativePathError("Collections config path must be an absolute path.") self._collections_path = collections_path self._collection_updated_callback = collection_updated_callback self._granule_updated_callback = granule_updated_callback + self._collections_refresh_interval = collections_refresh_interval self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set) self._observer = Observer() self._granule_watches = set() - def start_watching(self): + def start_watching(self, loop: Optional[asyncio.AbstractEventLoop] = None): """ - Start observing filesystem events for added/modified granules or changes to the Collections configuration file. - When an event occurs, call the appropriate callback that was passed in during instantiation. + Periodically load the Collections Configuration file to check for changes, + and observe filesystem events for added/modified granules. When an event occurs, + call the appropriate callback that was passed in during instantiation. :return: None """ - self._observer.schedule( - _CollectionEventHandler(file_path=self._collections_path, callback=self._reload_and_reschedule), - os.path.dirname(self._collections_path)) + + self._run_periodically(loop, self._collections_refresh_interval, self._reload_and_reschedule) self._observer.start() - self._reload_and_reschedule() def collections(self) -> Set[Collection]: """ @@ -98,10 +101,12 @@ class CollectionWatcher: def _reload_and_reschedule(self): try: - for collection in self._get_updated_collections(): + updated_collections = self._get_updated_collections() + for collection in updated_collections: self._collection_updated_callback(collection) - self._unschedule_watches() - self._schedule_watches() + if len(updated_collections) > 0: + self._unschedule_watches() + self._schedule_watches() except CollectionConfigParsingError as e: logger.error(e) @@ -121,20 +126,19 @@ class CollectionWatcher: bad_collection_names = ' and '.join([col.dataset_id for col in collections]) logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.") - -class _CollectionEventHandler(FileSystemEventHandler): - """ - EventHandler that watches for changes to the Collections config file. - """ - - def __init__(self, file_path: str, callback: Callable[[], any]): - self._callback = callback - self._file_path = file_path - - def on_modified(self, event): - super().on_modified(event) - if event.src_path == self._file_path: - self._callback() + @classmethod + def _run_periodically(cls, loop: Optional[asyncio.AbstractEventLoop], wait_time: float, func: Callable, *args): + """ + Call a function periodically. This uses asyncio, and is non-blocking. + :param loop: An optional event loop to use. If None, the current running event loop will be used. + :param wait_time: seconds to wait between iterations of func + :param func: the function that will be run + :param args: any args that need to be provided to func + """ + if loop is None: + loop = asyncio.get_running_loop() + func(*args) + loop.call_later(wait_time, partial(cls._run_periodically, loop, wait_time, func)) class _GranuleEventHandler(FileSystemEventHandler): diff --git a/collection_manager/docker/entrypoint.sh b/collection_manager/docker/entrypoint.sh index eb88f75..988dd2c 100644 --- a/collection_manager/docker/entrypoint.sh +++ b/collection_manager/docker/entrypoint.sh @@ -8,3 +8,4 @@ python /collection_manager/collection_manager/main.py \ $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \ $([[ ! -z "$HISTORY_URL" ]] && echo --history-url=$HISTORY_URL) \ $([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH) + $([[ ! -z "$REFRESH" ]] && echo --refresh=$REFRESH) diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py index 8c6ab5f..b954812 100644 --- a/collection_manager/tests/services/test_CollectionWatcher.py +++ b/collection_manager/tests/services/test_CollectionWatcher.py @@ -1,6 +1,6 @@ +import asyncio import os import tempfile -import time import unittest from datetime import datetime from unittest.mock import Mock @@ -132,8 +132,13 @@ class TestCollectionWatcher(unittest.TestCase): collections_config.write(collections_str.encode("utf-8")) collection_callback = Mock() - collection_watcher = CollectionWatcher(collections_config.name, collection_callback, Mock()) - collection_watcher.start_watching() + collection_watcher = CollectionWatcher(collections_path=collections_config.name, + collection_updated_callback=collection_callback, + granule_updated_callback=Mock(), + collections_refresh_interval=0.1) + + loop = asyncio.new_event_loop() + collection_watcher.start_watching(loop) collections_str = f""" - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND @@ -143,9 +148,11 @@ class TestCollectionWatcher(unittest.TestCase): forward-processing-priority: 5 """ collections_config.write(collections_str.encode("utf-8")) - collections_config.close() - self.assert_called_within_timeout(collection_callback, timeout_sec=1, call_count=2) + loop.run_until_complete(self.assert_called_within_timeout(collection_callback, call_count=2)) + + loop.close() + collections_config.close() granule_dir.cleanup() os.remove(collections_config.name) @@ -164,12 +171,15 @@ collections: granule_callback = Mock() collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback) - collection_watcher.start_watching() + + loop = asyncio.new_event_loop() + collection_watcher.start_watching(loop) new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+") - self.assert_called_within_timeout(granule_callback) + loop.run_until_complete(self.assert_called_within_timeout(granule_callback)) + loop.close() new_granule.close() granule_dir.cleanup() @@ -189,21 +199,31 @@ collections: granule_callback = Mock() collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback) - collection_watcher.start_watching() + + loop = asyncio.new_event_loop() + collection_watcher.start_watching(loop) new_granule.write("hello world") new_granule.close() - self.assert_called_within_timeout(granule_callback) - + loop.run_until_complete(self.assert_called_within_timeout(granule_callback)) + loop.close() granule_dir.cleanup() + def test_run_periodically(self): + callback = Mock() + loop = asyncio.new_event_loop() + CollectionWatcher._run_periodically(loop, 0.1, callback) + loop.run_until_complete(self.assert_called_within_timeout(callback, timeout_sec=0.3, call_count=2)) + loop.close() + @staticmethod - def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1): + async def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1): start = datetime.now() while (datetime.now() - start).total_seconds() < timeout_sec: - time.sleep(0.01) + await asyncio.sleep(0.01) if mock_func.call_count >= call_count: return raise AssertionError(f"{mock_func} did not reach {call_count} calls called within {timeout_sec} sec") +
