This is an automated email from the ASF dual-hosted git repository. tloubrieu pushed a commit to branch ascending_latitudes in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 36b87d0be6d6184fac63ab3556445ebe3e6295c2 Author: Eamon Ford <[email protected]> AuthorDate: Thu Jun 18 15:53:34 2020 -0700 SDAP-234: Add more test coverage. (#1) Co-authored-by: Eamon Ford <[email protected]> --- .../collection_manager/entities/Collection.py | 40 ++-- .../collection_manager/entities/__init__.py | 3 +- .../entities/exceptions/Exceptions.py | 30 +++ .../entities/exceptions/__init__.py | 6 + collection_manager/collection_manager/main.py | 58 +++--- .../services/CollectionProcessor.py | 7 +- .../services/CollectionWatcher.py | 103 +++++++---- .../{history_manager => entities}/__init__.py | 0 .../tests/entities/test_Collection.py | 139 ++++++++++++++ .../test_datasetingestionhistoryfile.py | 64 ------- collection_manager/tests/resources/collections.yml | 17 ++ .../tests/resources/collections_alternate.yml | 17 ++ .../tests/resources/collections_bad.yml | 17 ++ .../tests/resources/data/collections.yml | 11 -- .../resources/data/dataset_config_file_ok.yml | 44 ----- .../{ => services}/history_manager/__init__.py | 0 .../history_manager/test_FileIngestionHistory.py | 56 ++++++ .../history_manager/test_SolrIngestionHistory.py} | 3 +- .../tests/services/test_CollectionProcessor.py | 122 ++++++++++++- .../tests/services/test_CollectionWatcher.py | 203 +++++++++++++++++++++ 20 files changed, 734 insertions(+), 206 deletions(-) diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py index d033c69..3976b6d 100644 --- a/collection_manager/collection_manager/entities/Collection.py +++ b/collection_manager/collection_manager/entities/Collection.py @@ -1,33 +1,39 @@ import os +from dataclasses import dataclass from datetime import datetime from fnmatch import fnmatch from glob import glob -from typing import NamedTuple, List +from typing import List, Optional +from collection_manager.entities.exceptions import MissingValueCollectionError -class Collection(NamedTuple): + 
+@dataclass(frozen=True) +class Collection: dataset_id: str variable: str path: str historical_priority: int - forward_processing_priority: int - date_from: datetime - date_to: datetime + forward_processing_priority: Optional[int] = None + date_from: Optional[datetime] = None + date_to: Optional[datetime] = None @staticmethod def from_dict(properties: dict): - date_to = datetime.fromisoformat(properties['to']) if 'to' in properties else None - date_from = datetime.fromisoformat(properties['from']) if 'from' in properties else None - - collection = Collection(dataset_id=properties['id'], - variable=properties['variable'], - path=properties['path'], - historical_priority=properties['priority'], - forward_processing_priority=properties.get('forward_processing_priority', - properties['priority']), - date_to=date_to, - date_from=date_from) - return collection + try: + date_to = datetime.fromisoformat(properties['to']) if 'to' in properties else None + date_from = datetime.fromisoformat(properties['from']) if 'from' in properties else None + + collection = Collection(dataset_id=properties['id'], + variable=properties['variable'], + path=properties['path'], + historical_priority=properties['priority'], + forward_processing_priority=properties.get('forward-processing-priority', None), + date_to=date_to, + date_from=date_from) + return collection + except KeyError as e: + raise MissingValueCollectionError(missing_value=e.args[0]) def directory(self): if os.path.isdir(self.path): diff --git a/collection_manager/collection_manager/entities/__init__.py b/collection_manager/collection_manager/entities/__init__.py index 9f30603..165341b 100644 --- a/collection_manager/collection_manager/entities/__init__.py +++ b/collection_manager/collection_manager/entities/__init__.py @@ -1,2 +1 @@ - -from .Collection import Collection \ No newline at end of file +from .Collection import Collection diff --git a/collection_manager/collection_manager/entities/exceptions/Exceptions.py 
b/collection_manager/collection_manager/entities/exceptions/Exceptions.py new file mode 100644 index 0000000..8e63d24 --- /dev/null +++ b/collection_manager/collection_manager/entities/exceptions/Exceptions.py @@ -0,0 +1,30 @@ +class RelativePathError(Exception): + pass + + +class YamlParsingError(Exception): + pass + + +class CollectionConfigFileNotFoundError(Exception): + pass + + +class CollectionError(Exception): + def __init__(self, collection=None, message=None): + super().__init__(message) + self.collection = collection + + +class MissingValueCollectionError(CollectionError): + def __init__(self, missing_value, collection=None, message=None): + super().__init__(collection, message) + self.missing_value = missing_value + + +class ConflictingPathCollectionError(CollectionError): + pass + + +class RelativePathCollectionError(CollectionError): + pass diff --git a/collection_manager/collection_manager/entities/exceptions/__init__.py b/collection_manager/collection_manager/entities/exceptions/__init__.py new file mode 100644 index 0000000..9a22c16 --- /dev/null +++ b/collection_manager/collection_manager/entities/exceptions/__init__.py @@ -0,0 +1,6 @@ +from .Exceptions import CollectionConfigFileNotFoundError +from .Exceptions import ConflictingPathCollectionError +from .Exceptions import MissingValueCollectionError +from .Exceptions import RelativePathCollectionError +from .Exceptions import RelativePathError +from .Exceptions import YamlParsingError diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py index fa4d7ce..a10446f 100644 --- a/collection_manager/collection_manager/main.py +++ b/collection_manager/collection_manager/main.py @@ -1,5 +1,6 @@ import argparse import logging +import os import time from collection_manager.services import CollectionProcessor, CollectionWatcher, MessagePublisher @@ -7,7 +8,13 @@ from collection_manager.services.history_manager import SolrIngestionHistoryBuil 
logging.basicConfig(level=logging.INFO) logging.getLogger("pika").setLevel(logging.WARNING) -logger = logging.getLogger(__name__) +logger = logging.getLogger("collection_manager") + + +def check_path(path) -> str: + if not os.path.isabs(path): + raise argparse.ArgumentError("Paths must be absolute.") + return path def get_args() -> argparse.Namespace: @@ -16,7 +23,7 @@ def get_args() -> argparse.Namespace: help="refresh interval in seconds to check for new or updated granules", default=300) parser.add_argument("--collections", - help="path to collections configuration file", + help="Absolute path to collections configuration file", required=True) parser.add_argument('--rabbitmq_host', default='localhost', @@ -36,32 +43,39 @@ def get_args() -> argparse.Namespace: help='Name of the RabbitMQ queue to consume from. (Default: "nexus")') history_group = parser.add_mutually_exclusive_group(required=True) history_group.add_argument("--history-path", - help="path to ingestion history local directory") + help="Absolute path to ingestion history local directory") history_group.add_argument("--history-url", - help="url to ingestion history solr database") + help="URL to ingestion history solr database") + return parser.parse_args() def main(): - options = get_args() - if options.history_path: - history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path) - else: - history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url) - publisher = MessagePublisher(host=options.rabbitmq_host, - username=options.rabbitmq_username, - password=options.rabbitmq_password, - queue=options.rabbitmq_queue) - publisher.connect() - collection_processor = CollectionProcessor(message_publisher=publisher, - history_manager_builder=history_manager_builder) - collection_watcher = CollectionWatcher(collections_path=options.collections, - collection_updated_callback=collection_processor.process_collection, - 
granule_updated_callback=collection_processor.process_granule) + try: + options = get_args() + + if options.history_path: + history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path) + else: + history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url) + publisher = MessagePublisher(host=options.rabbitmq_host, + username=options.rabbitmq_username, + password=options.rabbitmq_password, + queue=options.rabbitmq_queue) + publisher.connect() + collection_processor = CollectionProcessor(message_publisher=publisher, + history_manager_builder=history_manager_builder) + collection_watcher = CollectionWatcher(collections_path=options.collections, + collection_updated_callback=collection_processor.process_collection, + granule_updated_callback=collection_processor.process_granule) + + collection_watcher.start_watching() + while True: + time.sleep(1) - collection_watcher.start_watching() - while True: - time.sleep(1) + except Exception as e: + logger.error(e) + return if __name__ == "__main__": diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py index 75a86e2..a81390b 100644 --- a/collection_manager/collection_manager/services/CollectionProcessor.py +++ b/collection_manager/collection_manager/services/CollectionProcessor.py @@ -50,7 +50,10 @@ class CollectionProcessor: if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING: logger.info(f"New granule '{granule}' detected for forward-processing ingestion " f"in collection '{collection.dataset_id}'.") - use_priority = collection.forward_processing_priority + if collection.forward_processing_priority is not None: + use_priority = collection.forward_processing_priority + else: + use_priority = collection.historical_priority elif granule_status is GranuleStatus.DESIRED_HISTORICAL: logger.info(f"New granule '{granule}' detected for historical ingestion in collection " 
f"'{collection.dataset_id}'.") @@ -61,7 +64,7 @@ class CollectionProcessor: return dataset_config = self._fill_template(collection, config_template=self._config_template) - self._publisher.publish_message(dataset_config, use_priority) + self._publisher.publish_message(body=dataset_config, priority=use_priority) history_manager.push(granule) @staticmethod diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index b1fca64..6bbe7d9 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -1,7 +1,7 @@ import logging import os from collections import defaultdict -from typing import List, Dict, Callable, Set +from typing import Dict, Callable, Set import yaml from watchdog.events import FileSystemEventHandler @@ -9,6 +9,9 @@ from watchdog.observers import Observer from yaml.scanner import ScannerError from collection_manager.entities import Collection +from collection_manager.entities.exceptions import RelativePathError, YamlParsingError, \ + CollectionConfigFileNotFoundError, MissingValueCollectionError, ConflictingPathCollectionError, \ + RelativePathCollectionError logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -19,71 +22,96 @@ class CollectionWatcher: collections_path: str, collection_updated_callback: Callable[[Collection], any], granule_updated_callback: Callable[[str, Collection], any]): + if not os.path.isabs(collections_path): + raise RelativePathError("Collections config path must be an absolute path.") + self._collections_path = collections_path self._collection_updated_callback = collection_updated_callback self._granule_updated_callback = granule_updated_callback - + self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set) self._observer = Observer() + self._granule_watches = set() + def start_watching(self): """ Start 
observing filesystem events for added/modified granules or changes to the Collections configuration file. When an event occurs, call the appropriate callback that was passed in during instantiation. :return: None """ - self._observer.schedule(_CollectionEventHandler(file_path=self._collections_path, callback=self._refresh), - os.path.dirname(self._collections_path)) + self._observer.schedule( + _CollectionEventHandler(file_path=self._collections_path, callback=self._reload_and_reschedule), + os.path.dirname(self._collections_path)) self._observer.start() - self._refresh() + self._reload_and_reschedule() - def collections(self) -> List[Collection]: + def collections(self) -> Set[Collection]: """ - Return a list of all Collections being watched. - :return: A list of Collections + Return a set of all Collections being watched. + :return: A set of Collections """ - return [collection for collections in self._collections_by_dir.values() for collection in collections] + return {collection for collections in self._collections_by_dir.values() for collection in collections} + + def _validate_collection(self, collection: Collection): + directory = collection.directory() + if not os.path.isabs(directory): + raise RelativePathCollectionError(collection=collection) + if directory == os.path.dirname(self._collections_path): + raise ConflictingPathCollectionError(collection=collection) def _load_collections(self): try: with open(self._collections_path, 'r') as f: collections_yaml = yaml.load(f, Loader=yaml.FullLoader) self._collections_by_dir.clear() - for _, collection_dict in collections_yaml.items(): - collection = Collection.from_dict(collection_dict) - directory = collection.directory() - if directory == os.path.dirname(self._collections_path): - logger.error(f"Collection {collection.dataset_id} uses granule directory {collection.path} " - f"which is the same directory as the collection configuration file, " - f"{self._collections_path}. 
The granules need to be in their own directory. " - f"Ignoring collection {collection.dataset_id} for now.") - else: - self._collections_by_dir[directory].add(collection) - + for collection_dict in collections_yaml['collections']: + try: + collection = Collection.from_dict(collection_dict) + self._validate_collection(collection) + self._collections_by_dir[collection.directory()].add(collection) + except MissingValueCollectionError as e: + logger.error(f"A collection is missing '{e.missing_value}'. Ignoring this collection for now.") + except RelativePathCollectionError as e: + logger.error(f"Relative paths are not allowed for the 'path' property of a collection. " + f"Ignoring collection '{e.collection.dataset_id}' until its path is fixed.") + except ConflictingPathCollectionError as e: + logger.error(f"Collection '{e.collection.dataset_id}' has granule path '{e.collection.path}' " + f"which uses same directory as the collection configuration file, " + f"'{self._collections_path}'. The granules need to be in their own directory. " + f"Ignoring collection '{e.collection.dataset_id}' for now.") except FileNotFoundError: - logger.error(f"Collection configuration file not found at {self._collections_path}.") + raise CollectionConfigFileNotFoundError("The collection config file could not be found at " + f"{self._collections_path}") except yaml.scanner.ScannerError: - logger.error(f"Bad YAML syntax in collection configuration file. Will attempt to reload collections " - f"after the next configuration change.") - - def _refresh(self): - for collection in self._get_updated_collections(): - self._collection_updated_callback(collection) + raise YamlParsingError("Bad YAML syntax in collection configuration file. 
Will attempt to reload " + "collections after the next configuration change.") - self._observer.unschedule_all() - self._schedule_watches() - - def _get_updated_collections(self) -> List[Collection]: + def _get_updated_collections(self) -> Set[Collection]: old_collections = self.collections() self._load_collections() - return list(set(self.collections()) - set(old_collections)) + return self.collections() - old_collections + + def _reload_and_reschedule(self): + try: + for collection in self._get_updated_collections(): + self._collection_updated_callback(collection) + self._unschedule_watches() + self._schedule_watches() + except YamlParsingError as e: + logger.error(e) + + def _unschedule_watches(self): + for watch in self._granule_watches: + self._observer.unschedule(watch) + self._granule_watches.clear() def _schedule_watches(self): for directory, collections in self._collections_by_dir.items(): granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections) # Note: the Watchdog library does not schedule a new watch # if one is already scheduled for the same directory - self._observer.schedule(granule_event_handler, directory) + self._granule_watches.add(self._observer.schedule(granule_event_handler, directory)) class _CollectionEventHandler(FileSystemEventHandler): @@ -115,3 +143,12 @@ class _GranuleEventHandler(FileSystemEventHandler): for collection in self._collections_for_dir: if collection.owns_file(event.src_path): self._callback(event.src_path, collection) + + def on_modified(self, event): + super().on_modified(event) + if os.path.isdir(event.src_path): + return + + for collection in self._collections_for_dir: + if collection.owns_file(event.src_path): + self._callback(event.src_path, collection) diff --git a/collection_manager/tests/history_manager/__init__.py b/collection_manager/tests/entities/__init__.py similarity index 100% copy from collection_manager/tests/history_manager/__init__.py copy to 
collection_manager/tests/entities/__init__.py diff --git a/collection_manager/tests/entities/test_Collection.py b/collection_manager/tests/entities/test_Collection.py new file mode 100644 index 0000000..46506d4 --- /dev/null +++ b/collection_manager/tests/entities/test_Collection.py @@ -0,0 +1,139 @@ +import os +import unittest +from datetime import datetime, timezone + +from collection_manager.entities import Collection +from collection_manager.entities.exceptions import MissingValueCollectionError + + +class TestCollection(unittest.TestCase): + + def test_directory_with_directory(self): + directory = os.path.join(os.path.dirname(__file__), "../resources/data") + collection = Collection(dataset_id="test_dataset", + path=directory, + variable="test_variable", + historical_priority=1, + forward_processing_priority=2, + date_from=None, + date_to=None) + self.assertEqual(directory, collection.directory()) + + def test_directory_with_pattern(self): + pattern = os.path.join(os.path.dirname(__file__), "../resources/data/*.nc") + collection = Collection(dataset_id="test_dataset", + path=pattern, + variable="test_variable", + historical_priority=1, + forward_processing_priority=2, + date_from=None, + date_to=None) + self.assertEqual(os.path.dirname(pattern), collection.directory()) + + def test_owns_file_raises_exception_with_directory(self): + directory = os.path.join(os.path.dirname(__file__), "../resources/data") + collection = Collection(dataset_id="test_dataset", + path=directory, + variable="test_variable", + historical_priority=1, + forward_processing_priority=2, + date_from=None, + date_to=None) + self.assertRaises(IsADirectoryError, collection.owns_file, directory) + + def test_owns_file_matches(self): + directory = os.path.join(os.path.dirname(__file__), "../resources/data") + collection = Collection(dataset_id="test_dataset", + path=directory, + variable="test_variable", + historical_priority=1, + forward_processing_priority=2, + date_from=None, + date_to=None) 
+ file_path = os.path.join(directory, "test_file.nc") + self.assertTrue(collection.owns_file(file_path)) + + def test_owns_file_does_not_match(self): + directory = os.path.join(os.path.dirname(__file__), "../resources/data") + collection = Collection(dataset_id="test_dataset", + path=directory, + variable="test_variable", + historical_priority=1, + forward_processing_priority=2, + date_from=None, + date_to=None) + self.assertFalse(collection.owns_file("test_file.nc")) + + def test_owns_file_matches_pattern(self): + directory = os.path.join(os.path.dirname(__file__), "../resources/data") + pattern = os.path.join(directory, "test_*.nc") + collection = Collection(dataset_id="test_dataset", + path=pattern, + variable="test_variable", + historical_priority=1, + forward_processing_priority=2, + date_from=None, + date_to=None) + file_path = os.path.join(directory, "test_file.nc") + self.assertTrue(collection.owns_file(file_path)) + + def test_owns_file_does_not_match_pattern(self): + directory = os.path.join(os.path.dirname(__file__), "../resources/data") + pattern = os.path.join(directory, "test_*.nc") + collection = Collection(dataset_id="test_dataset", + path=pattern, + variable="test_variable", + historical_priority=1, + forward_processing_priority=2, + date_from=None, + date_to=None) + file_path = os.path.join(directory, "nonmatch.nc") + self.assertFalse(collection.owns_file(file_path)) + + def test_from_dict(self): + collection_dict = { + 'id': 'test_id', + 'variable': 'test_var', + 'path': '/some/path', + 'priority': 1, + 'forward-processing-priority': 2, + 'from': '2020-01-01T00:00:00+00:00', + 'to': '2020-02-01T00:00:00+00:00' + } + + expected_collection = Collection(dataset_id='test_id', + variable='test_var', + path='/some/path', + historical_priority=1, + forward_processing_priority=2, + date_from=datetime(2020, 1, 1, 0, 0, 0, tzinfo=timezone.utc), + date_to=datetime(2020, 2, 1, 0, 0, 0, tzinfo=timezone.utc)) + + self.assertEqual(expected_collection, 
Collection.from_dict(collection_dict)) + + def test_from_dict_missing_optional_values(self): + collection_dict = { + 'id': 'test_id', + 'variable': 'test_var', + 'path': '/some/path', + 'priority': 3 + } + + expected_collection = Collection(dataset_id='test_id', + variable='test_var', + path='/some/path', + historical_priority=3, + forward_processing_priority=None, + date_from=None, + date_to=None) + + self.assertEqual(expected_collection, Collection.from_dict(collection_dict)) + + def test_from_dict_missing_required_values(self): + collection_dict = { + 'id': 'test_id', + 'variable': 'test_var', + 'path': '/some/path', + } + + self.assertRaises(MissingValueCollectionError, Collection.from_dict, collection_dict) diff --git a/collection_manager/tests/history_manager/test_datasetingestionhistoryfile.py b/collection_manager/tests/history_manager/test_datasetingestionhistoryfile.py deleted file mode 100644 index 0edeafb..0000000 --- a/collection_manager/tests/history_manager/test_datasetingestionhistoryfile.py +++ /dev/null @@ -1,64 +0,0 @@ -import unittest -import os -import sys -import pathlib -from collection_manager.services.history_manager import FileIngestionHistory -from collection_manager.services.history_manager import md5sum_from_filepath - -HISTORY_ROOT_PATH = os.path.join(sys.prefix, - ".collection_manager", - "tmp/history") -DATASET_ID = "zobi_la_mouche" - - -class DatasetIngestionHistoryFileTestCase(unittest.TestCase): - ingestion_history = None - - # @unittest.skip("does not work without a solr server for history_manager") - def test_get_md5sum(self): - self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH, DATASET_ID, md5sum_from_filepath) - self.ingestion_history._push_record("blue", "12weeukrhbwerqu7wier") - result = self.ingestion_history._get_signature("blue") - self.assertEqual(result, "12weeukrhbwerqu7wier") - - # @unittest.skip("does not work without a solr server for history_manager") - def test_get_missing_md5sum(self): - 
self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH, DATASET_ID, md5sum_from_filepath) - self.ingestion_history._push_record("blue", "12weeukrhbwerqu7wier") - result = self.ingestion_history._get_signature("green") - self.assertEqual(result, None) - - def test_has_valid_cache(self): - self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH, DATASET_ID, md5sum_from_filepath) - # history_manager with this file - current_file_path = pathlib.Path(__file__) - self.ingestion_history.push(str(current_file_path)) - self.assertEqual(self.ingestion_history.already_ingested(str(current_file_path)), True) - - def test_has_valid_cache_with_latest_modifcation_signature(self): - self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH, DATASET_ID, os.path.getmtime) - # history_manager with this file - current_file_path = pathlib.Path(__file__) - self.ingestion_history.push(str(current_file_path)) - self.assertEqual(self.ingestion_history.already_ingested(str(current_file_path)), True) - - def test_has_not_valid_cache(self): - self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH, DATASET_ID, md5sum_from_filepath) - # history_manager with this file - current_file_path = pathlib.Path(__file__) - self.assertEqual(self.ingestion_history.already_ingested(str(current_file_path)), False) - - @unittest.skip("skip before history_manager dataset is not available") - def test_purge(self): - self.ingestion_history = FileIngestionHistory("/Users/loubrieu/PycharmProjects/collection_manager/venv/.collection_manager/tmp/history/", - "avhrr-oi-analysed-sst-toto", - lambda x : str(os.path.getmtime(x))) - self.ingestion_history.purge() - - def tearDown(self): - self.ingestion_history.reset_cache() - del self.ingestion_history - - -if __name__ == '__main__': - unittest.main() diff --git a/collection_manager/tests/resources/collections.yml b/collection_manager/tests/resources/collections.yml new file mode 100644 index 0000000..89524ec --- /dev/null +++ 
b/collection_manager/tests/resources/collections.yml @@ -0,0 +1,17 @@ +collections: + - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND + path: /opt/data/grace/*land*.nc + variable: lwe_thickness + priority: 1 + forward-processing-priority: 5 + + - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN + path: /opt/data/grace/*ocean*.nc + variable: lwe_thickness + priority: 2 + forward-processing-priority: 6 + + - id: AVHRR_OI-NCEI-L4-GLOB-v2.0 + path: /opt/data/avhrr/*.nc + variable: analysed_sst + priority: 1 diff --git a/collection_manager/tests/resources/collections_alternate.yml b/collection_manager/tests/resources/collections_alternate.yml new file mode 100644 index 0000000..3d7da95 --- /dev/null +++ b/collection_manager/tests/resources/collections_alternate.yml @@ -0,0 +1,17 @@ +collections: + - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND + path: /opt/data/grace/*land*.nc + variable: lwe_thickness + priority: 1 + forward-processing-priority: 5 + + - id: ID_CHANGED + path: /opt/data/grace/*ocean*.nc + variable: lwe_thickness + priority: 2 + forward-processing-priority: 6 + + - id: AVHRR_OI-NCEI-L4-GLOB-v2.0 + path: /opt/data/avhrr/*.nc + variable: analysed_sst + priority: 1 diff --git a/collection_manager/tests/resources/collections_bad.yml b/collection_manager/tests/resources/collections_bad.yml new file mode 100644 index 0000000..cac6a32 --- /dev/null +++ b/collection_manager/tests/resources/collections_bad.yml @@ -0,0 +1,17 @@ +collections: + - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND + path: /opt/data/grace/*land*.nc + variable: lwe_thickness + priority: 1 + forward-processing-priority: 5 +BAD SYNTAX! 
+ - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN + path: /opt/data/grace/*ocean*.nc + variable: lwe_thickness + priority: 2 + forward-processing-priority: 6 + + - id: AVHRR_OI-NCEI-L4-GLOB-v2.0 + path: /opt/data/avhrr/*.nc + variable: analysed_sst + priority: 1 diff --git a/collection_manager/tests/resources/data/collections.yml b/collection_manager/tests/resources/data/collections.yml deleted file mode 100644 index 8c30a37..0000000 --- a/collection_manager/tests/resources/data/collections.yml +++ /dev/null @@ -1,11 +0,0 @@ -avhrr-oi-analysed-sst: - id: avhrr-oi-analysed-sst - path: ../tests/resources/data/avhrr_oi/*.nc - variable: analysed_sst - priority: 9 - -avhrr-oi-analysed-sst2: - id: avhrr-oi-analysed-sst2 - path: ../tests/resources/data/avhrr_oi/*.nc - variable: analysed_sst - priority: 1 diff --git a/collection_manager/tests/resources/data/dataset_config_file_ok.yml b/collection_manager/tests/resources/data/dataset_config_file_ok.yml deleted file mode 100644 index 66bb883..0000000 --- a/collection_manager/tests/resources/data/dataset_config_file_ok.yml +++ /dev/null @@ -1,44 +0,0 @@ -# Tile Slicer Config -ningester: - tile_slicer: sliceFileByTilesDesired - sliceFileByTilesDesired: - tilesDesired: 1296 - timeDimension: time - dimensions: - - lat - - lon ---- -# Tile processors configuration -ningester: - tile_processors: - - pythonChainProcessor - - generateTileId - - addDatasetName - pythonChainProcessor: - enabled: - processor_list: - - - name: GridReadingProcessor - config: - latitude: lat - longitude: lon - time: time - variable_to_read: analysed_sst - - - name: EmptyTileFilter - - - name: TileSummarizingProcessor - config: - stored_var_name: analysed_sst - generateTileId: - enabled: - salt: analysed_sst - addDatasetName: - enabled: - datasetName: avhrr-oi-analysed-sst ---- -# Tile writer configuration -ningester: - tile_writer: - data_store: cassandraStore - metadata_store: solrStore diff --git a/collection_manager/tests/history_manager/__init__.py 
b/collection_manager/tests/services/history_manager/__init__.py similarity index 100% rename from collection_manager/tests/history_manager/__init__.py rename to collection_manager/tests/services/history_manager/__init__.py diff --git a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py new file mode 100644 index 0000000..d2ad45c --- /dev/null +++ b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py @@ -0,0 +1,56 @@ +import os +import pathlib +import tempfile +import unittest + +from collection_manager.services.history_manager import FileIngestionHistory, md5sum_from_filepath + +DATASET_ID = "zobi_la_mouche" + + +class TestFileIngestionHistory(unittest.TestCase): + + def test_get_md5sum(self): + with tempfile.TemporaryDirectory() as history_dir: + ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath) + ingestion_history._push_record("blue", "12weeukrhbwerqu7wier") + result = ingestion_history._get_signature("blue") + self.assertEqual(result, "12weeukrhbwerqu7wier") + + def test_get_missing_md5sum(self): + with tempfile.TemporaryDirectory() as history_dir: + ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath) + ingestion_history._push_record("blue", "12weeukrhbwerqu7wier") + result = ingestion_history._get_signature("green") + self.assertIsNone(result) + + def test_already_ingested(self): + with tempfile.TemporaryDirectory() as history_dir: + ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath) + # history_manager with this file + current_file_path = pathlib.Path(__file__) + ingestion_history.push(str(current_file_path)) + self.assertTrue(ingestion_history.already_ingested(str(current_file_path))) + + del ingestion_history + + def test_already_ingested_with_latest_modifcation_signature(self): + with tempfile.TemporaryDirectory() 
as history_dir: + ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, os.path.getmtime) + # history_manager with this file + current_file_path = pathlib.Path(__file__) + ingestion_history.push(str(current_file_path)) + self.assertTrue(ingestion_history.already_ingested(str(current_file_path))) + + del ingestion_history + + def test_already_ingested_is_false(self): + with tempfile.TemporaryDirectory() as history_dir: + ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath) + # history_manager with this file + current_file_path = pathlib.Path(__file__) + self.assertFalse(ingestion_history.already_ingested(str(current_file_path))) + + +if __name__ == '__main__': + unittest.main() diff --git a/collection_manager/tests/history_manager/test_datasetingestionhistorysolr.py b/collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py similarity index 94% rename from collection_manager/tests/history_manager/test_datasetingestionhistorysolr.py rename to collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py index 57b2bd1..deab42d 100644 --- a/collection_manager/tests/history_manager/test_datasetingestionhistorysolr.py +++ b/collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py @@ -5,7 +5,8 @@ SOLR_URL = "http://localhost:8984/solr" DATASET_ID = "zobi_la_mouche" -class DatasetIngestionHistorySolrTestCase(unittest.TestCase): +# TODO: mock solr and fix these tests +class TestSolrIngestionHistory(unittest.TestCase): @unittest.skip("does not work without a solr server for history_manager") def test_get(self): ingestion_history = SolrIngestionHistory(SOLR_URL, DATASET_ID) diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py index 73b67ba..7899e22 100644 --- a/collection_manager/tests/services/test_CollectionProcessor.py +++ 
b/collection_manager/tests/services/test_CollectionProcessor.py @@ -1,10 +1,11 @@ import tempfile -from unittest import mock import unittest +from unittest import mock from collection_manager.entities import Collection from collection_manager.services import CollectionProcessor -from collection_manager.services.history_manager import FileIngestionHistoryBuilder, FileIngestionHistory +from collection_manager.services.history_manager import FileIngestionHistoryBuilder +from collection_manager.services.history_manager import GranuleStatus class TestCollectionProcessor(unittest.TestCase): @@ -18,15 +19,17 @@ class TestCollectionProcessor(unittest.TestCase): def test_file_supported_with_foo(self): self.assertFalse(CollectionProcessor._file_supported("test_dir/test_granule.foo")) - def test_get_history_manager_returns_same_object(self): + @mock.patch('collection_manager.services.MessagePublisher', autospec=True) + def test_get_history_manager_returns_same_object(self, mock_publisher): with tempfile.TemporaryDirectory() as history_dir: - collection_processor = CollectionProcessor(None, FileIngestionHistoryBuilder(history_dir)) + collection_processor = CollectionProcessor(mock_publisher, FileIngestionHistoryBuilder(history_dir)) history_manager = collection_processor._get_history_manager('dataset_id') self.assertIs(collection_processor._get_history_manager('dataset_id'), history_manager) - def test_get_history_manager_returns_different_object(self): + @mock.patch('collection_manager.services.MessagePublisher', autospec=True) + def test_get_history_manager_returns_different_object(self, mock_publisher): with tempfile.TemporaryDirectory() as history_dir: - collection_processor = CollectionProcessor(None, FileIngestionHistoryBuilder(history_dir)) + collection_processor = CollectionProcessor(mock_publisher, FileIngestionHistoryBuilder(history_dir)) history_manager = collection_processor._get_history_manager('foo') 
self.assertIsNot(collection_processor._get_history_manager('bar'), history_manager) @@ -60,7 +63,106 @@ class TestCollectionProcessor(unittest.TestCase): filled = CollectionProcessor._fill_template(collection, template) self.assertEqual(filled, expected) - @mock.patch.object(FileIngestionHistory, 'push') - @mock.patch.object(FileIngestionHistory, 'get_granule_status') - def test_process_granule(self): - history_manager_builder = FileIngestionHistoryBuilder('/foo') + @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) + @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) + @mock.patch('collection_manager.services.MessagePublisher', autospec=True) + def test_process_granule_with_historical_granule(self, mock_publisher, mock_history_builder, mock_history): + mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_HISTORICAL + mock_history_builder.build.return_value = mock_history + + collection_processor = CollectionProcessor(mock_publisher, mock_history_builder) + collection = Collection(dataset_id="test_dataset", + path="test_path", + variable="test_variable", + historical_priority=1, + forward_processing_priority=2, + date_from=None, + date_to=None) + + collection_processor.process_granule("test.nc", collection) + + mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=1) + mock_history.push.assert_called() + + @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) + @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) + @mock.patch('collection_manager.services.MessagePublisher', autospec=True) + def test_process_granule_with_forward_processing_granule(self, mock_publisher, mock_history_builder, mock_history): + mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING + mock_history_builder.build.return_value = mock_history 
+ + collection_processor = CollectionProcessor(mock_publisher, mock_history_builder) + collection = Collection(dataset_id="test_dataset", + path="test_path", + variable="test_variable", + historical_priority=1, + forward_processing_priority=2, + date_from=None, + date_to=None) + + collection_processor.process_granule("test.h5", collection) + + mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=2) + mock_history.push.assert_called() + + @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) + @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) + @mock.patch('collection_manager.services.MessagePublisher', autospec=True) + def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher, + mock_history_builder, mock_history): + mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING + mock_history_builder.build.return_value = mock_history + + collection_processor = CollectionProcessor(mock_publisher, mock_history_builder) + collection = Collection(dataset_id="test_dataset", + path="test_path", + variable="test_variable", + historical_priority=1, + date_from=None, + date_to=None) + + collection_processor.process_granule("test.h5", collection) + + mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=1) + mock_history.push.assert_called() + + @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) + @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) + @mock.patch('collection_manager.services.MessagePublisher', autospec=True) + def test_process_granule_with_undesired_granule(self, mock_publisher, mock_history_builder, mock_history): + mock_history.get_granule_status.return_value = GranuleStatus.UNDESIRED + mock_history_builder.build.return_value = mock_history + + collection_processor = 
CollectionProcessor(mock_publisher, mock_history_builder) + collection = Collection(dataset_id="test_dataset", + path="test_path", + variable="test_variable", + historical_priority=1, + forward_processing_priority=2, + date_from=None, + date_to=None) + + collection_processor.process_granule("test.nc", collection) + + mock_publisher.publish_message.assert_not_called() + mock_history.push.assert_not_called() + + @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) + @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True) + @mock.patch('collection_manager.services.MessagePublisher', autospec=True) + def test_process_granule_with_unsupported_file_type(self, mock_publisher, mock_history_builder, mock_history): + mock_history_builder.build.return_value = mock_history + + collection_processor = CollectionProcessor(mock_publisher, mock_history_builder) + collection = Collection(dataset_id="test_dataset", + path="test_path", + variable="test_variable", + historical_priority=1, + forward_processing_priority=2, + date_from=None, + date_to=None) + + collection_processor.process_granule("test.foo", collection) + + mock_publisher.publish_message.assert_not_called() + mock_history.push.assert_not_called() diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py new file mode 100644 index 0000000..7ae25a1 --- /dev/null +++ b/collection_manager/tests/services/test_CollectionWatcher.py @@ -0,0 +1,203 @@ +import os +import tempfile +import time +import unittest +from datetime import datetime +from unittest.mock import Mock + +from collection_manager.entities import Collection +from collection_manager.entities.exceptions import YamlParsingError, CollectionConfigFileNotFoundError, \ + RelativePathCollectionError, ConflictingPathCollectionError +from collection_manager.services import CollectionWatcher + + +class 
TestCollectionWatcher(unittest.TestCase): + + def test_collections_returns_all_collections(self): + collection_watcher = CollectionWatcher('/foo', Mock(), Mock()) + collection_watcher._collections_by_dir = { + "/foo": { + Collection("id1", "var1", "path1", 1, 2, datetime.now(), datetime.now()), + Collection("id2", "var2", "path2", 3, 4, datetime.now(), datetime.now()), + }, + "/bar": { + Collection("id3", "var3", "path3", 5, 6, datetime.now(), datetime.now()), + Collection("id4", "var4", "path4", 7, 8, datetime.now(), datetime.now()), + } + } + flattened_collections = collection_watcher.collections() + self.assertEqual(len(flattened_collections), 4) + + def test_load_collections_loads_all_collections(self): + collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml') + collection_watcher = CollectionWatcher(collections_path, Mock(), Mock()) + collection_watcher._load_collections() + + self.assertEqual(len(collection_watcher._collections_by_dir), 2) + self.assertEqual(len(collection_watcher._collections_by_dir['/opt/data/grace']), 2) + self.assertEqual(len(collection_watcher._collections_by_dir['/opt/data/avhrr']), 1) + + def test_load_collections_with_bad_yaml_syntax(self): + collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections_bad.yml') + collection_watcher = CollectionWatcher(collections_path, Mock(), Mock()) + + self.assertRaises(YamlParsingError, collection_watcher._load_collections) + + def test_load_collections_with_file_not_found(self): + collections_path = os.path.join(os.path.dirname(__file__), '../resources/does_not_exist.yml') + collection_watcher = CollectionWatcher(collections_path, Mock(), Mock()) + + self.assertRaises(CollectionConfigFileNotFoundError, collection_watcher._load_collections) + + def test_get_updated_collections_returns_all_collections(self): + collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml') + collection_watcher = 
CollectionWatcher(collections_path, Mock(), Mock()) + + updated_collections = collection_watcher._get_updated_collections() + self.assertSetEqual(updated_collections, collection_watcher.collections()) + + def test_get_updated_collections_returns_no_collections(self): + collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml') + collection_watcher = CollectionWatcher(collections_path, Mock(), Mock()) + collection_watcher._load_collections() + updated_collections = collection_watcher._get_updated_collections() + + self.assertEqual(len(updated_collections), 0) + + def test_get_updated_collections_returns_some_collections(self): + collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml') + collection_watcher = CollectionWatcher(collections_path, Mock(), Mock()) + collection_watcher._load_collections() + + new_collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections_alternate.yml') + collection_watcher._collections_path = new_collections_path + updated_collections = collection_watcher._get_updated_collections() + + self.assertEqual(len(updated_collections), 1) + + def test_validate_collection(self): + collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml') + collection_watcher = CollectionWatcher(collections_path, Mock(), Mock()) + + collection = Collection(dataset_id="test_dataset", + path="/absolute/path", + variable="test_variable", + historical_priority=1, + forward_processing_priority=2, + date_from=None, + date_to=None) + collection_watcher._validate_collection(collection) + + def test_validate_collection_with_relative_path(self): + collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml') + collection_watcher = CollectionWatcher(collections_path, Mock(), Mock()) + + collection = Collection(dataset_id="test_dataset", + path="relative/path", + variable="test_variable", + historical_priority=1, + 
forward_processing_priority=2, + date_from=None, + date_to=None) + self.assertRaises(RelativePathCollectionError, collection_watcher._validate_collection, collection) + + def test_validate_collection_with_conflicting_path(self): + collections_path = os.path.join(os.path.dirname(__file__), '/resources/collections.yml') + collection_watcher = CollectionWatcher(collections_path, Mock(), Mock()) + + collection = Collection(dataset_id="test_dataset", + path="/resources/*.nc", + variable="test_variable", + historical_priority=1, + forward_processing_priority=2, + date_from=None, + date_to=None) + self.assertRaises(ConflictingPathCollectionError, collection_watcher._validate_collection, collection) + + def test_collection_callback_is_called(self): + collections_config = tempfile.NamedTemporaryFile("w+b", buffering=0, delete=False) + granule_dir = tempfile.TemporaryDirectory() + collections_str = f"""collections: +- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND + path: {granule_dir.name} + variable: lwe_thickness + priority: 1 + forward-processing-priority: 5""" + collections_config.write(collections_str.encode("utf-8")) + + collection_callback = Mock() + collection_watcher = CollectionWatcher(collections_config.name, collection_callback, Mock()) + collection_watcher.start_watching() + + collections_str = f""" +- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND + path: {granule_dir.name} + variable: lwe_thickness + priority: 10 + forward-processing-priority: 5 + """ + collections_config.write(collections_str.encode("utf-8")) + collections_config.close() + + self.assert_called_within_timeout(collection_callback, timeout_sec=1, call_count=2) + granule_dir.cleanup() + os.remove(collections_config.name) + + def test_granule_callback_is_called_on_new_file(self): + with tempfile.NamedTemporaryFile("w+b", buffering=0) as collections_config: + granule_dir = tempfile.TemporaryDirectory() + collections_str = f""" +collections: +- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND + path: 
{granule_dir.name} + variable: lwe_thickness + priority: 1 + forward-processing-priority: 5 + """ + collections_config.write(collections_str.encode("utf-8")) + + granule_callback = Mock() + collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback) + collection_watcher.start_watching() + + new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+") + + self.assert_called_within_timeout(granule_callback) + + new_granule.close() + granule_dir.cleanup() + + def test_granule_callback_is_called_on_modified_file(self): + with tempfile.NamedTemporaryFile("w+b", buffering=0) as collections_config: + granule_dir = tempfile.TemporaryDirectory() + collections_str = f""" +collections: +- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND + path: {granule_dir.name} + variable: lwe_thickness + priority: 1 + forward-processing-priority: 5 + """ + collections_config.write(collections_str.encode("utf-8")) + new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+") + + granule_callback = Mock() + collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback) + collection_watcher.start_watching() + + new_granule.write("hello world") + new_granule.close() + + self.assert_called_within_timeout(granule_callback) + + granule_dir.cleanup() + + @staticmethod + def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1): + start = datetime.now() + + while (datetime.now() - start).total_seconds() < timeout_sec: + time.sleep(0.01) + if mock_func.call_count >= call_count: + return + raise AssertionError(f"{mock_func} did not reach {call_count} calls within {timeout_sec} sec")
