This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch collection-config-refresh in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 0c85575a24ec98b27a86fe42e5e3185202252f98 Author: Eamon Ford <[email protected]> AuthorDate: Mon Jul 6 12:20:43 2020 -0500 reload collections config on an interval --- collection_manager/collection_manager/main.py | 17 +++++--- .../services/CollectionWatcher.py | 50 +++++++++++----------- collection_manager/docker/entrypoint.sh | 1 + collection_manager/requirements.txt | 1 + 4 files changed, 38 insertions(+), 31 deletions(-) diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py index d8d2a5a..45d88fc 100644 --- a/collection_manager/collection_manager/main.py +++ b/collection_manager/collection_manager/main.py @@ -1,7 +1,7 @@ import argparse +import asyncio import logging import os -import time from collection_manager.services import CollectionProcessor, CollectionWatcher, MessagePublisher from collection_manager.services.history_manager import SolrIngestionHistoryBuilder, FileIngestionHistoryBuilder @@ -47,11 +47,15 @@ def get_args() -> argparse.Namespace: default="nexus", metavar="QUEUE", help='Name of the RabbitMQ queue to consume from. (Default: "nexus")') + parser.add_argument('--refresh', + default='30', + metavar="INTERVAL", + help='Number of seconds after which to reload the collections config file. 
(Default: 30)') return parser.parse_args() -def main(): +async def main(): try: options = get_args() @@ -68,13 +72,14 @@ def main(): history_manager_builder=history_manager_builder) collection_watcher = CollectionWatcher(collections_path=options.collections_path, collection_updated_callback=collection_processor.process_collection, - granule_updated_callback=collection_processor.process_granule) + granule_updated_callback=collection_processor.process_granule, + collections_refresh_interval=int(options.refresh)) - collection_watcher.start_watching() + await collection_watcher.start_watching() while True: try: - time.sleep(1) + await asyncio.sleep(1) except KeyboardInterrupt: return @@ -84,4 +89,4 @@ def main(): if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index a3c3bf7..7a73d9b 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -2,7 +2,8 @@ import logging import os from collections import defaultdict from typing import Dict, Callable, Set - +import schedule +import asyncio import yaml from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer @@ -21,30 +22,30 @@ class CollectionWatcher: def __init__(self, collections_path: str, collection_updated_callback: Callable[[Collection], any], - granule_updated_callback: Callable[[str, Collection], any]): + granule_updated_callback: Callable[[str, Collection], any], + collections_refresh_interval: int = 30): if not os.path.isabs(collections_path): raise RelativePathError("Collections config path must be an absolute path.") self._collections_path = collections_path self._collection_updated_callback = collection_updated_callback self._granule_updated_callback = granule_updated_callback + self._collections_refresh_interval = 
collections_refresh_interval self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set) self._observer = Observer() self._granule_watches = set() - def start_watching(self): + async def start_watching(self): """ Start observing filesystem events for added/modified granules or changes to the Collections configuration file. When an event occurs, call the appropriate callback that was passed in during instantiation. :return: None """ - self._observer.schedule( - _CollectionEventHandler(file_path=self._collections_path, callback=self._reload_and_reschedule), - os.path.dirname(self._collections_path)) + + asyncio.create_task(run_periodically(self._collections_refresh_interval, self._reload_and_reschedule)) self._observer.start() - self._reload_and_reschedule() def collections(self) -> Set[Collection]: """ @@ -98,10 +99,12 @@ class CollectionWatcher: def _reload_and_reschedule(self): try: - for collection in self._get_updated_collections(): + updated_collections = self._get_updated_collections() + for collection in updated_collections: self._collection_updated_callback(collection) - self._unschedule_watches() - self._schedule_watches() + if len(updated_collections) > 0: + self._unschedule_watches() + self._schedule_watches() except CollectionConfigParsingError as e: logger.error(e) @@ -122,21 +125,6 @@ class CollectionWatcher: logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.") -class _CollectionEventHandler(FileSystemEventHandler): - """ - EventHandler that watches for changes to the Collections config file. - """ - - def __init__(self, file_path: str, callback: Callable[[], any]): - self._callback = callback - self._file_path = file_path - - def on_modified(self, event): - super().on_modified(event) - if event.src_path == self._file_path: - self._callback() - - class _GranuleEventHandler(FileSystemEventHandler): """ EventHandler that watches for new or modified granule files. 
@@ -160,3 +148,15 @@ class _GranuleEventHandler(FileSystemEventHandler): for collection in self._collections_for_dir: if collection.owns_file(event.src_path): self._callback(event.src_path, collection) + + +async def run_periodically(wait_time: int, func: Callable, *args): + """ + Wraps a function in a coroutine that will run the given function indefinitely + :param wait_time: seconds to wait between iterations of func + :param func: the function that will be run + :param args: any args that need to be provided to func + """ + while True: + func(*args) + await asyncio.sleep(wait_time) diff --git a/collection_manager/docker/entrypoint.sh b/collection_manager/docker/entrypoint.sh index eb88f75..988dd2c 100644 --- a/collection_manager/docker/entrypoint.sh +++ b/collection_manager/docker/entrypoint.sh @@ -8,3 +8,4 @@ python /collection_manager/collection_manager/main.py \ $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \ $([[ ! -z "$HISTORY_URL" ]] && echo --history-url=$HISTORY_URL) \ $([[ ! -z "$HISTORY_PATH" ]] && echo --history-path=$HISTORY_PATH) + $([[ ! -z "$REFRESH" ]] && echo --refresh=$REFRESH) diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt index f16bde3..27501c9 100644 --- a/collection_manager/requirements.txt +++ b/collection_manager/requirements.txt @@ -4,3 +4,4 @@ pysolr==3.8.1 pika==1.1.0 watchdog==0.10.2 requests==2.23.0 +schedule==0.6.0 \ No newline at end of file
