This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch scheduler
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 9054094b1e362a4f159b213bc5d5aab74becec32
Author: Eamon Ford <[email protected]>
AuthorDate: Tue Jun 9 17:02:55 2020 -0700

    allow multiple collections to use the same directory
---
 .../services/CollectionProcessor.py                | 10 ++--
 .../services/CollectionWatcher.py                  | 59 ++++++++++------------
 .../history_manager/FileIngestionHistory.py        |  5 +-
 3 files changed, 33 insertions(+), 41 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 40da416..75a86e2 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -48,14 +48,16 @@ class CollectionProcessor:
         granule_status = history_manager.get_granule_status(granule, collection.date_from, collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
-            logger.info(f"New granule '{granule}' detected for forward-processing ingestion.")
+            logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
+                        f"in collection '{collection.dataset_id}'.")
             use_priority = collection.forward_processing_priority
         elif granule_status is GranuleStatus.DESIRED_HISTORICAL:
-            logger.info(f"New granule '{granule}' detected for historical ingestion.")
+            logger.info(f"New granule '{granule}' detected for historical ingestion in collection "
+                        f"'{collection.dataset_id}'.")
             use_priority = collection.historical_priority
         else:
-            logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired time range. "
-                        f"Skipping.")
+            logger.info(f"Granule '{granule}' detected but has already been ingested or is not in desired "
+                        f"time range for collection '{collection.dataset_id}'. Skipping.")
             return
 
         dataset_config = self._fill_template(collection, config_template=self._config_template)
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 912ddad..b1fca64 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,6 +1,7 @@
 import logging
 import os
-from typing import List, Dict, Callable
+from collections import defaultdict
+from typing import List, Dict, Callable, Set
 
 import yaml
 from watchdog.events import FileSystemEventHandler
@@ -14,15 +15,16 @@ logger.setLevel(logging.DEBUG)
 
 
 class CollectionWatcher:
-    def __init__(self, collections_path: str,
+    def __init__(self,
+                 collections_path: str,
                  collection_updated_callback: Callable[[Collection], any],
                  granule_updated_callback: Callable[[str, Collection], any]):
         self._collections_path = collections_path
-        self._collection_updated = collection_updated_callback
-        self._granule_updated = granule_updated_callback
+        self._collection_updated_callback = collection_updated_callback
+        self._granule_updated_callback = granule_updated_callback
+
+        self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
         self._observer = Observer()
-        self._watches = {}
-        self._collections: Dict[str, Collection] = {}
 
     def start_watching(self):
         """
@@ -32,21 +34,21 @@ class CollectionWatcher:
         """
         self._observer.schedule(_CollectionEventHandler(file_path=self._collections_path, callback=self._refresh),
                                 os.path.dirname(self._collections_path))
-        self._refresh()
         self._observer.start()
+        self._refresh()
 
     def collections(self) -> List[Collection]:
         """
         Return a list of all Collections being watched.
         :return: A list of Collections
         """
-        return list(self._collections.values())
+        return [collection for collections in self._collections_by_dir.values() for collection in collections]
 
     def _load_collections(self):
         try:
             with open(self._collections_path, 'r') as f:
                 collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
-            new_collections = {}
+            self._collections_by_dir.clear()
             for _, collection_dict in collections_yaml.items():
                 collection = Collection.from_dict(collection_dict)
                 directory = collection.directory()
@@ -55,25 +57,20 @@ class CollectionWatcher:
                                  f"which is the same directory as the collection configuration file, "
                                  f"{self._collections_path}. The granules need to be in their own directory. "
                                  f"Ignoring collection {collection.dataset_id} for now.")
-                if directory in new_collections:
-                    logger.error(f"Ingestion order {collection.dataset_id} uses granule directory {directory} "
-                                 f"which conflicts with ingestion order {new_collections[directory].dataset_id}."
-                                 f" Ignoring {collection.dataset_id}.")
                 else:
-                    new_collections[directory] = collection
+                    self._collections_by_dir[directory].add(collection)
 
-            self._collections = new_collections
         except FileNotFoundError:
-            logger.error(f"Collection configuration file not found at {self._collections}.")
+            logger.error(f"Collection configuration file not found at {self._collections_path}.")
         except yaml.scanner.ScannerError:
             logger.error(f"Bad YAML syntax in collection configuration file. Will attempt to reload collections "
                          f"after the next configuration change.")
 
     def _refresh(self):
         for collection in self._get_updated_collections():
-            self._collection_updated(collection)
+            self._collection_updated_callback(collection)
 
-        self._unschedule_watches()
+        self._observer.unschedule_all()
         self._schedule_watches()
 
     def _get_updated_collections(self) -> List[Collection]:
@@ -81,17 +78,12 @@ class CollectionWatcher:
         self._load_collections()
         return list(set(self.collections()) - set(old_collections))
 
-    def _unschedule_watches(self):
-        for directory, watch in self._watches.items():
-            self._observer.unschedule(watch)
-        self._watches.clear()
-
     def _schedule_watches(self):
-        for collection in self.collections():
-            granule_event_handler = _GranuleEventHandler(self._granule_updated, collection)
-            directory = collection.directory()
-            if directory not in self._watches:
-                self._watches[directory] = self._observer.schedule(granule_event_handler, directory)
+        for directory, collections in self._collections_by_dir.items():
+            granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections)
+            # Note: the Watchdog library does not schedule a new watch
+            # if one is already scheduled for the same directory
+            self._observer.schedule(granule_event_handler, directory)
 
 
 class _CollectionEventHandler(FileSystemEventHandler):
@@ -114,11 +106,12 @@ class _GranuleEventHandler(FileSystemEventHandler):
     EventHandler that watches for new or modified granule files.
     """
 
-    def __init__(self, granule_updated: Callable[[str, Collection], any], collection: Collection):
-        self._granule_updated = granule_updated
-        self._collection = collection
+    def __init__(self, callback: Callable[[str, Collection], any], collections_for_dir: Set[Collection]):
+        self._callback = callback
+        self._collections_for_dir = collections_for_dir
 
     def on_created(self, event):
         super().on_created(event)
-        if self._collection.owns_file(event.src_path):
-            self._granule_updated(event.src_path, self._collection)
+        for collection in self._collections_for_dir:
+            if collection.owns_file(event.src_path):
+                self._callback(event.src_path, collection)
diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
index 9fab784..0a92317 100644
--- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py
@@ -40,20 +40,17 @@ class FileIngestionHistory(IngestionHistory):
         self._latest_ingested_file_update_file_path = os.path.join(history_path, f'{dataset_id}.ts')
 
         if os.path.exists(self._latest_ingested_file_update_file_path):
-            logger.info(f"read latest ingested file update date from {self._latest_ingested_file_update_file_path}")
             with open(self._latest_ingested_file_update_file_path, 'r') as f_ts:
                 self._latest_ingested_file_update = float(f_ts.readline())
 
     def _load_history_dict(self):
-        logger.info(f"loading history file {self._history_file_path}")
         try:
             with open(self._history_file_path, 'r') as f_history:
                 for line in f_history:
                     filename, md5sum = line.strip().split(',')
-                    logger.info(f"add to history file {filename} with md5sum {md5sum}")
                     self._history_dict[filename] = md5sum
         except FileNotFoundError:
-            logger.info("no history file created yet")
+            logger.info("No history file created yet")
 
     def __del__(self):
         self._history_file.close()
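
The core change above drops the one-collection-per-directory restriction: collections are now grouped into a Dict[str, Set[Collection]] backed by defaultdict(set), and _GranuleEventHandler fans each filesystem event out to every collection that owns the file. The following is a minimal, self-contained sketch of that pattern, not code from this repository; the Collection stub, dataset ids, and paths are hypothetical:

from collections import defaultdict
from typing import Dict, NamedTuple, Set


class Collection(NamedTuple):
    # Hypothetical stand-in for collection_manager.entities.Collection.
    dataset_id: str
    path: str  # directory that holds this collection's granules

    def directory(self) -> str:
        return self.path

    def owns_file(self, file_path: str) -> bool:
        # Stub; the real class also applies per-collection filename rules.
        return file_path.startswith(self.path)


# Group collections by granule directory. Two collections sharing a
# directory land in the same set, so one watch per directory suffices.
collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
for c in (Collection("avhrr-day", "/data/granules"),
          Collection("avhrr-night", "/data/granules")):
    collections_by_dir[c.directory()].add(c)


def on_created(src_path: str, directory: str) -> None:
    # A single event may now belong to several collections; notify each owner.
    for c in collections_by_dir[directory]:
        if c.owns_file(src_path):
            print(f"granule {src_path} -> collection {c.dataset_id}")


on_created("/data/granules/20200609_sst.nc", "/data/granules")

Running this notifies both collections for the one created file, which is exactly the behavior the old directory-conflict check used to forbid.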

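On the watcher lifecycle side, _refresh() now leans on watchdog's Observer.unschedule_all() instead of bookkeeping watches by hand. A rough sketch of that reschedule cycle, assuming only the watchdog package; the handler and directory list are placeholders:

import time

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class LogHandler(FileSystemEventHandler):
    def on_created(self, event):
        print(f"created: {event.src_path}")


def schedule_watches(observer: Observer, directories) -> None:
    # One watch (and one handler) per granule directory.
    for directory in directories:
        observer.schedule(LogHandler(), directory)


observer = Observer()
observer.start()
schedule_watches(observer, ["/tmp"])

# On a configuration change, drop every watch and rebuild from scratch,
# mirroring _refresh() above.
observer.unschedule_all()
schedule_watches(observer, ["/tmp"])

time.sleep(1)
observer.stop()
observer.join()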