This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/dev by this push:
new 8b6f5e5 SDAP-234: Add more test coverage. (#1)
8b6f5e5 is described below
commit 8b6f5e5b7f96403aa895e3e59a15fe4dd687e770
Author: Eamon Ford <[email protected]>
AuthorDate: Thu Jun 18 15:53:34 2020 -0700
SDAP-234: Add more test coverage. (#1)
Co-authored-by: Eamon Ford <[email protected]>
---
.../collection_manager/entities/Collection.py | 40 ++--
.../collection_manager/entities/__init__.py | 3 +-
.../entities/exceptions/Exceptions.py | 30 +++
.../entities/exceptions/__init__.py | 6 +
collection_manager/collection_manager/main.py | 58 +++---
.../services/CollectionProcessor.py | 7 +-
.../services/CollectionWatcher.py | 103 +++++++----
.../{history_manager => entities}/__init__.py | 0
.../tests/entities/test_Collection.py | 139 ++++++++++++++
.../test_datasetingestionhistoryfile.py | 64 -------
collection_manager/tests/resources/collections.yml | 17 ++
.../tests/resources/collections_alternate.yml | 17 ++
.../tests/resources/collections_bad.yml | 17 ++
.../tests/resources/data/collections.yml | 11 --
.../resources/data/dataset_config_file_ok.yml | 44 -----
.../{ => services}/history_manager/__init__.py | 0
.../history_manager/test_FileIngestionHistory.py | 56 ++++++
.../history_manager/test_SolrIngestionHistory.py} | 3 +-
.../tests/services/test_CollectionProcessor.py | 122 ++++++++++++-
.../tests/services/test_CollectionWatcher.py | 203 +++++++++++++++++++++
20 files changed, 734 insertions(+), 206 deletions(-)
diff --git a/collection_manager/collection_manager/entities/Collection.py
b/collection_manager/collection_manager/entities/Collection.py
index d033c69..3976b6d 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -1,33 +1,39 @@
import os
+from dataclasses import dataclass
from datetime import datetime
from fnmatch import fnmatch
from glob import glob
-from typing import NamedTuple, List
+from typing import List, Optional
+from collection_manager.entities.exceptions import MissingValueCollectionError
-class Collection(NamedTuple):
+
+@dataclass(frozen=True)
+class Collection:
dataset_id: str
variable: str
path: str
historical_priority: int
- forward_processing_priority: int
- date_from: datetime
- date_to: datetime
+ forward_processing_priority: Optional[int] = None
+ date_from: Optional[datetime] = None
+ date_to: Optional[datetime] = None
@staticmethod
def from_dict(properties: dict):
- date_to = datetime.fromisoformat(properties['to']) if 'to' in
properties else None
- date_from = datetime.fromisoformat(properties['from']) if 'from' in
properties else None
-
- collection = Collection(dataset_id=properties['id'],
- variable=properties['variable'],
- path=properties['path'],
- historical_priority=properties['priority'],
-
forward_processing_priority=properties.get('forward_processing_priority',
-
properties['priority']),
- date_to=date_to,
- date_from=date_from)
- return collection
+ try:
+ date_to = datetime.fromisoformat(properties['to']) if 'to' in
properties else None
+ date_from = datetime.fromisoformat(properties['from']) if 'from'
in properties else None
+
+ collection = Collection(dataset_id=properties['id'],
+ variable=properties['variable'],
+ path=properties['path'],
+ historical_priority=properties['priority'],
+
forward_processing_priority=properties.get('forward-processing-priority', None),
+ date_to=date_to,
+ date_from=date_from)
+ return collection
+ except KeyError as e:
+ raise MissingValueCollectionError(missing_value=e.args[0])
def directory(self):
if os.path.isdir(self.path):
diff --git a/collection_manager/collection_manager/entities/__init__.py
b/collection_manager/collection_manager/entities/__init__.py
index 9f30603..165341b 100644
--- a/collection_manager/collection_manager/entities/__init__.py
+++ b/collection_manager/collection_manager/entities/__init__.py
@@ -1,2 +1 @@
-
-from .Collection import Collection
\ No newline at end of file
+from .Collection import Collection
diff --git
a/collection_manager/collection_manager/entities/exceptions/Exceptions.py
b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
new file mode 100644
index 0000000..8e63d24
--- /dev/null
+++ b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
@@ -0,0 +1,30 @@
+class RelativePathError(Exception):
+    """Raised when a path that must be absolute is given as a relative path."""
+
+
+class YamlParsingError(Exception):
+    """Raised when the collections configuration file has bad YAML syntax."""
+
+
+class CollectionConfigFileNotFoundError(Exception):
+    """Raised when the collections configuration file cannot be found."""
+
+
+class CollectionError(Exception):
+    """
+    Base class for errors that concern a single collection.
+
+    :param collection: the collection the error refers to, if available
+    :param message: optional human-readable description of the problem
+    """
+
+    def __init__(self, collection=None, message=None):
+        super().__init__(message)
+        self.collection = collection
+
+
+class MissingValueCollectionError(CollectionError):
+    """
+    Raised when a collection definition lacks a required value.
+
+    :param missing_value: name of the value that is missing
+    :param collection: the collection the error refers to, if available
+    :param message: optional description; when omitted, a default message
+                    naming ``missing_value`` is used
+    """
+
+    def __init__(self, missing_value, collection=None, message=None):
+        if message is None:
+            # Without a default, str(error) would be the literal text "None".
+            message = f"Collection is missing required value '{missing_value}'."
+        super().__init__(collection, message)
+        self.missing_value = missing_value
+
+
+class ConflictingPathCollectionError(CollectionError):
+    """Raised when a collection's path conflicts with another watched path."""
+
+
+class RelativePathCollectionError(CollectionError):
+    """Raised when a collection's path is relative instead of absolute."""
diff --git
a/collection_manager/collection_manager/entities/exceptions/__init__.py
b/collection_manager/collection_manager/entities/exceptions/__init__.py
new file mode 100644
index 0000000..9a22c16
--- /dev/null
+++ b/collection_manager/collection_manager/entities/exceptions/__init__.py
@@ -0,0 +1,6 @@
+from .Exceptions import CollectionConfigFileNotFoundError
+from .Exceptions import ConflictingPathCollectionError
+from .Exceptions import MissingValueCollectionError
+from .Exceptions import RelativePathCollectionError
+from .Exceptions import RelativePathError
+from .Exceptions import YamlParsingError
diff --git a/collection_manager/collection_manager/main.py
b/collection_manager/collection_manager/main.py
index fa4d7ce..a10446f 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -1,5 +1,6 @@
import argparse
import logging
+import os
import time
from collection_manager.services import CollectionProcessor,
CollectionWatcher, MessagePublisher
@@ -7,7 +8,13 @@ from collection_manager.services.history_manager import
SolrIngestionHistoryBuil
logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").setLevel(logging.WARNING)
-logger = logging.getLogger(__name__)
+logger = logging.getLogger("collection_manager")
+
+
+def check_path(path) -> str:
+    """
+    Validate that a path is absolute.
+
+    :param path: the path string to validate
+    :return: ``path`` unchanged when it is absolute
+    :raises argparse.ArgumentTypeError: if ``path`` is not absolute
+    """
+    if not os.path.isabs(path):
+        # argparse.ArgumentError requires both an argument and a message, so
+        # building it from a message alone fails with a TypeError before the
+        # intended error is ever raised. ArgumentTypeError takes only a
+        # message and is the exception argparse expects from a type callable.
+        raise argparse.ArgumentTypeError("Paths must be absolute.")
+    return path
def get_args() -> argparse.Namespace:
@@ -16,7 +23,7 @@ def get_args() -> argparse.Namespace:
help="refresh interval in seconds to check for new or
updated granules",
default=300)
parser.add_argument("--collections",
- help="path to collections configuration file",
+ help="Absolute path to collections configuration file",
required=True)
parser.add_argument('--rabbitmq_host',
default='localhost',
@@ -36,32 +43,39 @@ def get_args() -> argparse.Namespace:
help='Name of the RabbitMQ queue to consume from.
(Default: "nexus")')
history_group = parser.add_mutually_exclusive_group(required=True)
history_group.add_argument("--history-path",
- help="path to ingestion history local
directory")
+ help="Absolute path to ingestion history local
directory")
history_group.add_argument("--history-url",
- help="url to ingestion history solr database")
+ help="URL to ingestion history solr database")
+
return parser.parse_args()
def main():
- options = get_args()
- if options.history_path:
- history_manager_builder =
FileIngestionHistoryBuilder(history_path=options.history_path)
- else:
- history_manager_builder =
SolrIngestionHistoryBuilder(solr_url=options.history_url)
- publisher = MessagePublisher(host=options.rabbitmq_host,
- username=options.rabbitmq_username,
- password=options.rabbitmq_password,
- queue=options.rabbitmq_queue)
- publisher.connect()
- collection_processor = CollectionProcessor(message_publisher=publisher,
-
history_manager_builder=history_manager_builder)
- collection_watcher =
CollectionWatcher(collections_path=options.collections,
-
collection_updated_callback=collection_processor.process_collection,
-
granule_updated_callback=collection_processor.process_granule)
+ try:
+ options = get_args()
+
+ if options.history_path:
+ history_manager_builder =
FileIngestionHistoryBuilder(history_path=options.history_path)
+ else:
+ history_manager_builder =
SolrIngestionHistoryBuilder(solr_url=options.history_url)
+ publisher = MessagePublisher(host=options.rabbitmq_host,
+ username=options.rabbitmq_username,
+ password=options.rabbitmq_password,
+ queue=options.rabbitmq_queue)
+ publisher.connect()
+ collection_processor = CollectionProcessor(message_publisher=publisher,
+
history_manager_builder=history_manager_builder)
+ collection_watcher =
CollectionWatcher(collections_path=options.collections,
+
collection_updated_callback=collection_processor.process_collection,
+
granule_updated_callback=collection_processor.process_granule)
+
+ collection_watcher.start_watching()
+ while True:
+ time.sleep(1)
- collection_watcher.start_watching()
- while True:
- time.sleep(1)
+ except Exception as e:
+ logger.error(e)
+ return
if __name__ == "__main__":
diff --git
a/collection_manager/collection_manager/services/CollectionProcessor.py
b/collection_manager/collection_manager/services/CollectionProcessor.py
index 75a86e2..a81390b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -50,7 +50,10 @@ class CollectionProcessor:
if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
logger.info(f"New granule '{granule}' detected for
forward-processing ingestion "
f"in collection '{collection.dataset_id}'.")
- use_priority = collection.forward_processing_priority
+ if collection.forward_processing_priority is not None:
+ use_priority = collection.forward_processing_priority
+ else:
+ use_priority = collection.historical_priority
elif granule_status is GranuleStatus.DESIRED_HISTORICAL:
logger.info(f"New granule '{granule}' detected for historical
ingestion in collection "
f"'{collection.dataset_id}'.")
@@ -61,7 +64,7 @@ class CollectionProcessor:
return
dataset_config = self._fill_template(collection,
config_template=self._config_template)
- self._publisher.publish_message(dataset_config, use_priority)
+ self._publisher.publish_message(body=dataset_config,
priority=use_priority)
history_manager.push(granule)
@staticmethod
diff --git
a/collection_manager/collection_manager/services/CollectionWatcher.py
b/collection_manager/collection_manager/services/CollectionWatcher.py
index b1fca64..6bbe7d9 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,7 +1,7 @@
import logging
import os
from collections import defaultdict
-from typing import List, Dict, Callable, Set
+from typing import Dict, Callable, Set
import yaml
from watchdog.events import FileSystemEventHandler
@@ -9,6 +9,9 @@ from watchdog.observers import Observer
from yaml.scanner import ScannerError
from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import RelativePathError,
YamlParsingError, \
+ CollectionConfigFileNotFoundError, MissingValueCollectionError,
ConflictingPathCollectionError, \
+ RelativePathCollectionError
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@@ -19,71 +22,96 @@ class CollectionWatcher:
collections_path: str,
collection_updated_callback: Callable[[Collection], any],
granule_updated_callback: Callable[[str, Collection], any]):
+ if not os.path.isabs(collections_path):
+ raise RelativePathError("Collections config path must be an
absolute path.")
+
self._collections_path = collections_path
self._collection_updated_callback = collection_updated_callback
self._granule_updated_callback = granule_updated_callback
-
+
self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
self._observer = Observer()
+ self._granule_watches = set()
+
def start_watching(self):
"""
Start observing filesystem events for added/modified granules or
changes to the Collections configuration file.
When an event occurs, call the appropriate callback that was passed in
during instantiation.
:return: None
"""
-
self._observer.schedule(_CollectionEventHandler(file_path=self._collections_path,
callback=self._refresh),
- os.path.dirname(self._collections_path))
+ self._observer.schedule(
+ _CollectionEventHandler(file_path=self._collections_path,
callback=self._reload_and_reschedule),
+ os.path.dirname(self._collections_path))
self._observer.start()
- self._refresh()
+ self._reload_and_reschedule()
- def collections(self) -> List[Collection]:
+ def collections(self) -> Set[Collection]:
"""
- Return a list of all Collections being watched.
- :return: A list of Collections
+ Return a set of all Collections being watched.
+ :return: A set of Collections
"""
- return [collection for collections in
self._collections_by_dir.values() for collection in collections]
+ return {collection for collections in
self._collections_by_dir.values() for collection in collections}
+
+    def _validate_collection(self, collection: Collection):
+        """
+        Check that a collection's granule directory can be watched.
+
+        :param collection: the collection to check
+        :return: None
+        :raises RelativePathCollectionError: if the collection's directory is
+            not an absolute path
+        :raises ConflictingPathCollectionError: if the collection's directory
+            is the directory that contains the collections configuration file
+        """
+        granule_dir = collection.directory()
+
+        if os.path.isabs(granule_dir):
+            config_dir = os.path.dirname(self._collections_path)
+            if granule_dir == config_dir:
+                raise ConflictingPathCollectionError(collection=collection)
+            return
+
+        raise RelativePathCollectionError(collection=collection)
def _load_collections(self):
try:
with open(self._collections_path, 'r') as f:
collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
self._collections_by_dir.clear()
- for _, collection_dict in collections_yaml.items():
- collection = Collection.from_dict(collection_dict)
- directory = collection.directory()
- if directory == os.path.dirname(self._collections_path):
- logger.error(f"Collection {collection.dataset_id} uses
granule directory {collection.path} "
- f"which is the same directory as the
collection configuration file, "
- f"{self._collections_path}. The granules need
to be in their own directory. "
- f"Ignoring collection {collection.dataset_id}
for now.")
- else:
- self._collections_by_dir[directory].add(collection)
-
+ for collection_dict in collections_yaml['collections']:
+ try:
+ collection = Collection.from_dict(collection_dict)
+ self._validate_collection(collection)
+
self._collections_by_dir[collection.directory()].add(collection)
+ except MissingValueCollectionError as e:
+ logger.error(f"A collection is missing
'{e.missing_value}'. Ignoring this collection for now.")
+ except RelativePathCollectionError as e:
+ logger.error(f"Relative paths are not allowed for the
'path' property of a collection. "
+ f"Ignoring collection
'{e.collection.dataset_id}' until its path is fixed.")
+ except ConflictingPathCollectionError as e:
+ logger.error(f"Collection '{e.collection.dataset_id}' has
granule path '{e.collection.path}' "
+ f"which uses same directory as the collection
configuration file, "
+ f"'{self._collections_path}'. The granules
need to be in their own directory. "
+ f"Ignoring collection
'{e.collection.dataset_id}' for now.")
except FileNotFoundError:
- logger.error(f"Collection configuration file not found at
{self._collections_path}.")
+ raise CollectionConfigFileNotFoundError("The collection config
file could not be found at "
+
f"{self._collections_path}")
except yaml.scanner.ScannerError:
- logger.error(f"Bad YAML syntax in collection configuration file.
Will attempt to reload collections "
- f"after the next configuration change.")
-
- def _refresh(self):
- for collection in self._get_updated_collections():
- self._collection_updated_callback(collection)
+ raise YamlParsingError("Bad YAML syntax in collection
configuration file. Will attempt to reload "
+ "collections after the next configuration
change.")
- self._observer.unschedule_all()
- self._schedule_watches()
-
- def _get_updated_collections(self) -> List[Collection]:
+ def _get_updated_collections(self) -> Set[Collection]:
old_collections = self.collections()
self._load_collections()
- return list(set(self.collections()) - set(old_collections))
+ return self.collections() - old_collections
+
+ def _reload_and_reschedule(self):
+ """
+ Pass every collection reported by _get_updated_collections() to the
+ collection-updated callback, then drop the current granule watches and
+ schedule them again.
+ A YamlParsingError raised along the way is logged, not propagated.
+ :return: None
+ """
+ try:
+ for collection in self._get_updated_collections():
+ self._collection_updated_callback(collection)
+ self._unschedule_watches()
+ self._schedule_watches()
+ except YamlParsingError as e:
+ # Only YAML parsing problems are handled here; any other exception
+ # propagates to the caller.
+ logger.error(e)
+    def _unschedule_watches(self):
+        """
+        Remove every granule watch recorded in ``_granule_watches`` from the
+        observer, then forget them.
+
+        :return: None
+        """
+        observer = self._observer
+        for granule_watch in self._granule_watches:
+            observer.unschedule(granule_watch)
+        # Emptied only after the loop: a set must not change size while it is
+        # being iterated.
+        self._granule_watches.clear()
def _schedule_watches(self):
for directory, collections in self._collections_by_dir.items():
granule_event_handler =
_GranuleEventHandler(self._granule_updated_callback, collections)
# Note: the Watchdog library does not schedule a new watch
# if one is already scheduled for the same directory
- self._observer.schedule(granule_event_handler, directory)
+
self._granule_watches.add(self._observer.schedule(granule_event_handler,
directory))
class _CollectionEventHandler(FileSystemEventHandler):
@@ -115,3 +143,12 @@ class _GranuleEventHandler(FileSystemEventHandler):
for collection in self._collections_for_dir:
if collection.owns_file(event.src_path):
self._callback(event.src_path, collection)
+
+    def on_modified(self, event):
+        """
+        Handle a modification event from the watched granule directory.
+
+        Events whose path is a directory are ignored. For a file, the callback
+        is invoked once for every collection of this directory that owns it.
+
+        :param event: the filesystem event; its ``src_path`` is the modified path
+        :return: None
+        """
+        super().on_modified(event)
+        modified_path = event.src_path
+        if not os.path.isdir(modified_path):
+            for candidate in self._collections_for_dir:
+                if candidate.owns_file(modified_path):
+                    self._callback(modified_path, candidate)
diff --git a/collection_manager/tests/history_manager/__init__.py
b/collection_manager/tests/entities/__init__.py
similarity index 100%
copy from collection_manager/tests/history_manager/__init__.py
copy to collection_manager/tests/entities/__init__.py
diff --git a/collection_manager/tests/entities/test_Collection.py
b/collection_manager/tests/entities/test_Collection.py
new file mode 100644
index 0000000..46506d4
--- /dev/null
+++ b/collection_manager/tests/entities/test_Collection.py
@@ -0,0 +1,139 @@
+import os
+import unittest
+from datetime import datetime, timezone
+
+from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import MissingValueCollectionError
+
+
+class TestCollection(unittest.TestCase):
+    """Tests for Collection path handling and construction from dictionaries."""
+
+    # Directory used as the granule location by the path-related tests.
+    DATA_DIR = os.path.join(os.path.dirname(__file__), "../resources/data")
+
+    @staticmethod
+    def _make_collection(path):
+        """
+        Build a Collection with fixed placeholder values and the given path.
+
+        :param path: directory or glob pattern to use as the collection path
+        :return: a Collection whose fields other than ``path`` are constant
+        """
+        return Collection(dataset_id="test_dataset",
+                          path=path,
+                          variable="test_variable",
+                          historical_priority=1,
+                          forward_processing_priority=2,
+                          date_from=None,
+                          date_to=None)
+
+    def test_directory_with_directory(self):
+        collection = self._make_collection(self.DATA_DIR)
+        self.assertEqual(self.DATA_DIR, collection.directory())
+
+    def test_directory_with_pattern(self):
+        pattern = os.path.join(os.path.dirname(__file__), "../resources/data/*.nc")
+        collection = self._make_collection(pattern)
+        self.assertEqual(os.path.dirname(pattern), collection.directory())
+
+    def test_owns_file_raises_exception_with_directory(self):
+        collection = self._make_collection(self.DATA_DIR)
+        self.assertRaises(IsADirectoryError, collection.owns_file, self.DATA_DIR)
+
+    def test_owns_file_matches(self):
+        collection = self._make_collection(self.DATA_DIR)
+        file_path = os.path.join(self.DATA_DIR, "test_file.nc")
+        self.assertTrue(collection.owns_file(file_path))
+
+    def test_owns_file_does_not_match(self):
+        collection = self._make_collection(self.DATA_DIR)
+        self.assertFalse(collection.owns_file("test_file.nc"))
+
+    def test_owns_file_matches_pattern(self):
+        pattern = os.path.join(self.DATA_DIR, "test_*.nc")
+        collection = self._make_collection(pattern)
+        file_path = os.path.join(self.DATA_DIR, "test_file.nc")
+        self.assertTrue(collection.owns_file(file_path))
+
+    def test_owns_file_does_not_match_pattern(self):
+        pattern = os.path.join(self.DATA_DIR, "test_*.nc")
+        collection = self._make_collection(pattern)
+        file_path = os.path.join(self.DATA_DIR, "nonmatch.nc")
+        self.assertFalse(collection.owns_file(file_path))
+
+    def test_from_dict(self):
+        collection_dict = {
+            'id': 'test_id',
+            'variable': 'test_var',
+            'path': '/some/path',
+            'priority': 1,
+            'forward-processing-priority': 2,
+            'from': '2020-01-01T00:00:00+00:00',
+            'to': '2020-02-01T00:00:00+00:00'
+        }
+
+        expected_collection = Collection(dataset_id='test_id',
+                                         variable='test_var',
+                                         path='/some/path',
+                                         historical_priority=1,
+                                         forward_processing_priority=2,
+                                         date_from=datetime(2020, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
+                                         date_to=datetime(2020, 2, 1, 0, 0, 0, tzinfo=timezone.utc))
+
+        self.assertEqual(expected_collection, Collection.from_dict(collection_dict))
+
+    def test_from_dict_missing_optional_values(self):
+        collection_dict = {
+            'id': 'test_id',
+            'variable': 'test_var',
+            'path': '/some/path',
+            'priority': 3
+        }
+
+        expected_collection = Collection(dataset_id='test_id',
+                                         variable='test_var',
+                                         path='/some/path',
+                                         historical_priority=3,
+                                         forward_processing_priority=None,
+                                         date_from=None,
+                                         date_to=None)
+
+        self.assertEqual(expected_collection, Collection.from_dict(collection_dict))
+
+    def test_from_dict_missing_required_values(self):
+        # 'priority' is deliberately left out of the dictionary.
+        collection_dict = {
+            'id': 'test_id',
+            'variable': 'test_var',
+            'path': '/some/path',
+        }
+
+        self.assertRaises(MissingValueCollectionError, Collection.from_dict, collection_dict)
diff --git
a/collection_manager/tests/history_manager/test_datasetingestionhistoryfile.py
b/collection_manager/tests/history_manager/test_datasetingestionhistoryfile.py
deleted file mode 100644
index 0edeafb..0000000
---
a/collection_manager/tests/history_manager/test_datasetingestionhistoryfile.py
+++ /dev/null
@@ -1,64 +0,0 @@
-import unittest
-import os
-import sys
-import pathlib
-from collection_manager.services.history_manager import FileIngestionHistory
-from collection_manager.services.history_manager import md5sum_from_filepath
-
-HISTORY_ROOT_PATH = os.path.join(sys.prefix,
- ".collection_manager",
- "tmp/history")
-DATASET_ID = "zobi_la_mouche"
-
-
-class DatasetIngestionHistoryFileTestCase(unittest.TestCase):
- ingestion_history = None
-
- # @unittest.skip("does not work without a solr server for history_manager")
- def test_get_md5sum(self):
- self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH,
DATASET_ID, md5sum_from_filepath)
- self.ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
- result = self.ingestion_history._get_signature("blue")
- self.assertEqual(result, "12weeukrhbwerqu7wier")
-
- # @unittest.skip("does not work without a solr server for history_manager")
- def test_get_missing_md5sum(self):
- self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH,
DATASET_ID, md5sum_from_filepath)
- self.ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
- result = self.ingestion_history._get_signature("green")
- self.assertEqual(result, None)
-
- def test_has_valid_cache(self):
- self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH,
DATASET_ID, md5sum_from_filepath)
- # history_manager with this file
- current_file_path = pathlib.Path(__file__)
- self.ingestion_history.push(str(current_file_path))
-
self.assertEqual(self.ingestion_history.already_ingested(str(current_file_path)),
True)
-
- def test_has_valid_cache_with_latest_modifcation_signature(self):
- self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH,
DATASET_ID, os.path.getmtime)
- # history_manager with this file
- current_file_path = pathlib.Path(__file__)
- self.ingestion_history.push(str(current_file_path))
-
self.assertEqual(self.ingestion_history.already_ingested(str(current_file_path)),
True)
-
- def test_has_not_valid_cache(self):
- self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH,
DATASET_ID, md5sum_from_filepath)
- # history_manager with this file
- current_file_path = pathlib.Path(__file__)
-
self.assertEqual(self.ingestion_history.already_ingested(str(current_file_path)),
False)
-
- @unittest.skip("skip before history_manager dataset is not available")
- def test_purge(self):
- self.ingestion_history =
FileIngestionHistory("/Users/loubrieu/PycharmProjects/collection_manager/venv/.collection_manager/tmp/history/",
-
"avhrr-oi-analysed-sst-toto",
- lambda x :
str(os.path.getmtime(x)))
- self.ingestion_history.purge()
-
- def tearDown(self):
- self.ingestion_history.reset_cache()
- del self.ingestion_history
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/collection_manager/tests/resources/collections.yml
b/collection_manager/tests/resources/collections.yml
new file mode 100644
index 0000000..89524ec
--- /dev/null
+++ b/collection_manager/tests/resources/collections.yml
@@ -0,0 +1,17 @@
+collections:
+ - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+ path: /opt/data/grace/*land*.nc
+ variable: lwe_thickness
+ priority: 1
+ forward-processing-priority: 5
+
+ - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN
+ path: /opt/data/grace/*ocean*.nc
+ variable: lwe_thickness
+ priority: 2
+ forward-processing-priority: 6
+
+ - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
+ path: /opt/data/avhrr/*.nc
+ variable: analysed_sst
+ priority: 1
diff --git a/collection_manager/tests/resources/collections_alternate.yml
b/collection_manager/tests/resources/collections_alternate.yml
new file mode 100644
index 0000000..3d7da95
--- /dev/null
+++ b/collection_manager/tests/resources/collections_alternate.yml
@@ -0,0 +1,17 @@
+collections:
+ - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+ path: /opt/data/grace/*land*.nc
+ variable: lwe_thickness
+ priority: 1
+ forward-processing-priority: 5
+
+ - id: ID_CHANGED
+ path: /opt/data/grace/*ocean*.nc
+ variable: lwe_thickness
+ priority: 2
+ forward-processing-priority: 6
+
+ - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
+ path: /opt/data/avhrr/*.nc
+ variable: analysed_sst
+ priority: 1
diff --git a/collection_manager/tests/resources/collections_bad.yml
b/collection_manager/tests/resources/collections_bad.yml
new file mode 100644
index 0000000..cac6a32
--- /dev/null
+++ b/collection_manager/tests/resources/collections_bad.yml
@@ -0,0 +1,17 @@
+collections:
+ - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+ path: /opt/data/grace/*land*.nc
+ variable: lwe_thickness
+ priority: 1
+ forward-processing-priority: 5
+BAD SYNTAX!
+ - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN
+ path: /opt/data/grace/*ocean*.nc
+ variable: lwe_thickness
+ priority: 2
+ forward-processing-priority: 6
+
+ - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
+ path: /opt/data/avhrr/*.nc
+ variable: analysed_sst
+ priority: 1
diff --git a/collection_manager/tests/resources/data/collections.yml
b/collection_manager/tests/resources/data/collections.yml
deleted file mode 100644
index 8c30a37..0000000
--- a/collection_manager/tests/resources/data/collections.yml
+++ /dev/null
@@ -1,11 +0,0 @@
-avhrr-oi-analysed-sst:
- id: avhrr-oi-analysed-sst
- path: ../tests/resources/data/avhrr_oi/*.nc
- variable: analysed_sst
- priority: 9
-
-avhrr-oi-analysed-sst2:
- id: avhrr-oi-analysed-sst2
- path: ../tests/resources/data/avhrr_oi/*.nc
- variable: analysed_sst
- priority: 1
diff --git a/collection_manager/tests/resources/data/dataset_config_file_ok.yml
b/collection_manager/tests/resources/data/dataset_config_file_ok.yml
deleted file mode 100644
index 66bb883..0000000
--- a/collection_manager/tests/resources/data/dataset_config_file_ok.yml
+++ /dev/null
@@ -1,44 +0,0 @@
-# Tile Slicer Config
-ningester:
- tile_slicer: sliceFileByTilesDesired
- sliceFileByTilesDesired:
- tilesDesired: 1296
- timeDimension: time
- dimensions:
- - lat
- - lon
----
-# Tile processors configuration
-ningester:
- tile_processors:
- - pythonChainProcessor
- - generateTileId
- - addDatasetName
- pythonChainProcessor:
- enabled:
- processor_list:
- -
- name: GridReadingProcessor
- config:
- latitude: lat
- longitude: lon
- time: time
- variable_to_read: analysed_sst
- -
- name: EmptyTileFilter
- -
- name: TileSummarizingProcessor
- config:
- stored_var_name: analysed_sst
- generateTileId:
- enabled:
- salt: analysed_sst
- addDatasetName:
- enabled:
- datasetName: avhrr-oi-analysed-sst
----
-# Tile writer configuration
-ningester:
- tile_writer:
- data_store: cassandraStore
- metadata_store: solrStore
diff --git a/collection_manager/tests/history_manager/__init__.py
b/collection_manager/tests/services/history_manager/__init__.py
similarity index 100%
rename from collection_manager/tests/history_manager/__init__.py
rename to collection_manager/tests/services/history_manager/__init__.py
diff --git
a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
new file mode 100644
index 0000000..d2ad45c
--- /dev/null
+++
b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
@@ -0,0 +1,56 @@
+import os
+import pathlib
+import tempfile
+import unittest
+
+from collection_manager.services.history_manager import FileIngestionHistory,
md5sum_from_filepath
+
+DATASET_ID = "zobi_la_mouche"
+
+
+class TestFileIngestionHistory(unittest.TestCase):
+ """Tests for FileIngestionHistory; each test uses its own temporary history directory."""
+
+ def test_get_md5sum(self):
+ # A signature stored with _push_record is returned by _get_signature.
+ with tempfile.TemporaryDirectory() as history_dir:
+ ingestion_history = FileIngestionHistory(history_dir, DATASET_ID,
md5sum_from_filepath)
+ ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
+ result = ingestion_history._get_signature("blue")
+ self.assertEqual(result, "12weeukrhbwerqu7wier")
+
+ def test_get_missing_md5sum(self):
+ # Looking up a name that was never pushed yields None.
+ with tempfile.TemporaryDirectory() as history_dir:
+ ingestion_history = FileIngestionHistory(history_dir, DATASET_ID,
md5sum_from_filepath)
+ ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
+ result = ingestion_history._get_signature("green")
+ self.assertIsNone(result)
+
+ def test_already_ingested(self):
+ # A file that has been pushed is reported as already ingested.
+ with tempfile.TemporaryDirectory() as history_dir:
+ ingestion_history = FileIngestionHistory(history_dir, DATASET_ID,
md5sum_from_filepath)
+ # Use this test module's own file as the granule.
+ current_file_path = pathlib.Path(__file__)
+ ingestion_history.push(str(current_file_path))
+
self.assertTrue(ingestion_history.already_ingested(str(current_file_path)))
+
+ # NOTE(review): presumably drops the history object explicitly so that
+ # any cleanup it performs happens here - confirm against
+ # FileIngestionHistory.
+ del ingestion_history
+
+ def test_already_ingested_with_latest_modifcation_signature(self):
+ # Same check as test_already_ingested, with os.path.getmtime as the
+ # signature function instead of md5sum_from_filepath.
+ with tempfile.TemporaryDirectory() as history_dir:
+ ingestion_history = FileIngestionHistory(history_dir, DATASET_ID,
os.path.getmtime)
+ # Use this test module's own file as the granule.
+ current_file_path = pathlib.Path(__file__)
+ ingestion_history.push(str(current_file_path))
+
self.assertTrue(ingestion_history.already_ingested(str(current_file_path)))
+
+ # NOTE(review): presumably drops the history object explicitly so that
+ # any cleanup it performs happens here - confirm against
+ # FileIngestionHistory.
+ del ingestion_history
+
+ def test_already_ingested_is_false(self):
+ # A file that was never pushed is not reported as ingested.
+ with tempfile.TemporaryDirectory() as history_dir:
+ ingestion_history = FileIngestionHistory(history_dir, DATASET_ID,
md5sum_from_filepath)
+ # Use this test module's own file as the granule.
+ current_file_path = pathlib.Path(__file__)
+
self.assertFalse(ingestion_history.already_ingested(str(current_file_path)))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git
a/collection_manager/tests/history_manager/test_datasetingestionhistorysolr.py
b/collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py
similarity index 94%
rename from
collection_manager/tests/history_manager/test_datasetingestionhistorysolr.py
rename to
collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py
index 57b2bd1..deab42d 100644
---
a/collection_manager/tests/history_manager/test_datasetingestionhistorysolr.py
+++
b/collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py
@@ -5,7 +5,8 @@ SOLR_URL = "http://localhost:8984/solr"
DATASET_ID = "zobi_la_mouche"
-class DatasetIngestionHistorySolrTestCase(unittest.TestCase):
+# TODO: mock solr and fix these tests
+class TestSolrIngestionHistory(unittest.TestCase):
@unittest.skip("does not work without a solr server for history_manager")
def test_get(self):
ingestion_history = SolrIngestionHistory(SOLR_URL, DATASET_ID)
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py
b/collection_manager/tests/services/test_CollectionProcessor.py
index 73b67ba..7899e22 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -1,10 +1,11 @@
import tempfile
-from unittest import mock
import unittest
+from unittest import mock
from collection_manager.entities import Collection
from collection_manager.services import CollectionProcessor
-from collection_manager.services.history_manager import
FileIngestionHistoryBuilder, FileIngestionHistory
+from collection_manager.services.history_manager import
FileIngestionHistoryBuilder
+from collection_manager.services.history_manager import GranuleStatus
class TestCollectionProcessor(unittest.TestCase):
@@ -18,15 +19,17 @@ class TestCollectionProcessor(unittest.TestCase):
def test_file_supported_with_foo(self):
self.assertFalse(CollectionProcessor._file_supported("test_dir/test_granule.foo"))
-    def test_get_history_manager_returns_same_object(self):
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_get_history_manager_returns_same_object(self, mock_publisher):
+        """Two ``_get_history_manager`` calls with the same dataset id return the identical object."""
         with tempfile.TemporaryDirectory() as history_dir:
-            collection_processor = CollectionProcessor(None, FileIngestionHistoryBuilder(history_dir))
+            collection_processor = CollectionProcessor(mock_publisher, FileIngestionHistoryBuilder(history_dir))
             history_manager = collection_processor._get_history_manager('dataset_id')
             self.assertIs(collection_processor._get_history_manager('dataset_id'), history_manager)
-    def test_get_history_manager_returns_different_object(self):
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_get_history_manager_returns_different_object(self, mock_publisher):
+        """``_get_history_manager`` calls with different dataset ids return distinct objects."""
         with tempfile.TemporaryDirectory() as history_dir:
-            collection_processor = CollectionProcessor(None, FileIngestionHistoryBuilder(history_dir))
+            collection_processor = CollectionProcessor(mock_publisher, FileIngestionHistoryBuilder(history_dir))
             history_manager = collection_processor._get_history_manager('foo')
             self.assertIsNot(collection_processor._get_history_manager('bar'), history_manager)
@@ -60,7 +63,106 @@ class TestCollectionProcessor(unittest.TestCase):
filled = CollectionProcessor._fill_template(collection, template)
self.assertEqual(filled, expected)
- @mock.patch.object(FileIngestionHistory, 'push')
- @mock.patch.object(FileIngestionHistory, 'get_granule_status')
- def test_process_granule(self):
- history_manager_builder = FileIngestionHistoryBuilder('/foo')
+    # mock.patch decorators are applied bottom-up, so the lowest decorator supplies the first mock argument.
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_process_granule_with_historical_granule(self, mock_publisher, mock_history_builder, mock_history):
+        """A granule whose status is DESIRED_HISTORICAL is published with ``priority=1`` and pushed to the history.
+
+        ``priority=1`` is the ``historical_priority`` value given to the collection below.
+        """
+        # The mocked builder hands back the mocked history, whose status lookup is stubbed.
+        mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_HISTORICAL
+        mock_history_builder.build.return_value = mock_history
+
+        collection_processor = CollectionProcessor(mock_publisher, mock_history_builder)
+        collection = Collection(dataset_id="test_dataset",
+                                path="test_path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+
+        collection_processor.process_granule("test.nc", collection)
+
+        mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=1)
+        mock_history.push.assert_called()
+
+    # mock.patch decorators are applied bottom-up, so the lowest decorator supplies the first mock argument.
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_process_granule_with_forward_processing_granule(self, mock_publisher, mock_history_builder, mock_history):
+        """A granule whose status is DESIRED_FORWARD_PROCESSING is published with ``priority=2`` and pushed.
+
+        ``priority=2`` is the ``forward_processing_priority`` value given to the collection below.
+        """
+        # The mocked builder hands back the mocked history, whose status lookup is stubbed.
+        mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
+        mock_history_builder.build.return_value = mock_history
+
+        collection_processor = CollectionProcessor(mock_publisher, mock_history_builder)
+        collection = Collection(dataset_id="test_dataset",
+                                path="test_path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+
+        collection_processor.process_granule("test.h5", collection)
+
+        mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=2)
+        mock_history.push.assert_called()
+
+    # mock.patch decorators are applied bottom-up, so the lowest decorator supplies the first mock argument.
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher,
+                                                                             mock_history_builder, mock_history):
+        """A forward-processing granule is published with ``priority=1`` when no forward priority is given.
+
+        The collection below omits ``forward_processing_priority``; the expected ``priority=1`` equals its
+        ``historical_priority``.
+        """
+        # The mocked builder hands back the mocked history, whose status lookup is stubbed.
+        mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
+        mock_history_builder.build.return_value = mock_history
+
+        collection_processor = CollectionProcessor(mock_publisher, mock_history_builder)
+        collection = Collection(dataset_id="test_dataset",
+                                path="test_path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                date_from=None,
+                                date_to=None)
+
+        collection_processor.process_granule("test.h5", collection)
+
+        mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=1)
+        mock_history.push.assert_called()
+
+    # mock.patch decorators are applied bottom-up, so the lowest decorator supplies the first mock argument.
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_process_granule_with_undesired_granule(self, mock_publisher, mock_history_builder, mock_history):
+        """A granule whose status is UNDESIRED is neither published nor pushed to the history."""
+        # The mocked builder hands back the mocked history, whose status lookup is stubbed.
+        mock_history.get_granule_status.return_value = GranuleStatus.UNDESIRED
+        mock_history_builder.build.return_value = mock_history
+
+        collection_processor = CollectionProcessor(mock_publisher, mock_history_builder)
+        collection = Collection(dataset_id="test_dataset",
+                                path="test_path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+
+        collection_processor.process_granule("test.nc", collection)
+
+        mock_publisher.publish_message.assert_not_called()
+        mock_history.push.assert_not_called()
+
+    # mock.patch decorators are applied bottom-up, so the lowest decorator supplies the first mock argument.
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_process_granule_with_unsupported_file_type(self, mock_publisher, mock_history_builder, mock_history):
+        """A granule named ``test.foo`` is neither published nor pushed to the history.
+
+        No granule status is stubbed on the mocked history in this test.
+        """
+        mock_history_builder.build.return_value = mock_history
+
+        collection_processor = CollectionProcessor(mock_publisher, mock_history_builder)
+        collection = Collection(dataset_id="test_dataset",
+                                path="test_path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+
+        collection_processor.process_granule("test.foo", collection)
+
+        mock_publisher.publish_message.assert_not_called()
+        mock_history.push.assert_not_called()
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py
b/collection_manager/tests/services/test_CollectionWatcher.py
new file mode 100644
index 0000000..7ae25a1
--- /dev/null
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -0,0 +1,203 @@
+import os
+import tempfile
+import time
+import unittest
+from datetime import datetime
+from unittest.mock import Mock
+
+from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import YamlParsingError,
CollectionConfigFileNotFoundError, \
+ RelativePathCollectionError, ConflictingPathCollectionError
+from collection_manager.services import CollectionWatcher
+
+
+class TestCollectionWatcher(unittest.TestCase):
+
+    def test_collections_returns_all_collections(self):
+        """``collections()`` flattens the per-directory sets in ``_collections_by_dir`` into four collections."""
+        collection_watcher = CollectionWatcher('/foo', Mock(), Mock())
+        # Populate the private per-directory mapping directly instead of loading a config file.
+        collection_watcher._collections_by_dir = {
+            "/foo": {
+                Collection("id1", "var1", "path1", 1, 2, datetime.now(), datetime.now()),
+                Collection("id2", "var2", "path2", 3, 4, datetime.now(), datetime.now()),
+            },
+            "/bar": {
+                Collection("id3", "var3", "path3", 5, 6, datetime.now(), datetime.now()),
+                Collection("id4", "var4", "path4", 7, 8, datetime.now(), datetime.now()),
+            }
+        }
+        flattened_collections = collection_watcher.collections()
+        self.assertEqual(len(flattened_collections), 4)
+
+    def test_load_collections_loads_all_collections(self):
+        """Loading ``resources/collections.yml`` yields two directories holding two and one collections."""
+        collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
+        collection_watcher._load_collections()
+
+        self.assertEqual(len(collection_watcher._collections_by_dir), 2)
+        self.assertEqual(len(collection_watcher._collections_by_dir['/opt/data/grace']), 2)
+        self.assertEqual(len(collection_watcher._collections_by_dir['/opt/data/avhrr']), 1)
+
+    def test_load_collections_with_bad_yaml_syntax(self):
+        """Loading ``resources/collections_bad.yml`` raises ``YamlParsingError``."""
+        collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections_bad.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
+
+        self.assertRaises(YamlParsingError, collection_watcher._load_collections)
+
+    def test_load_collections_with_file_not_found(self):
+        """Loading from a non-existent config path raises ``CollectionConfigFileNotFoundError``."""
+        collections_path = os.path.join(os.path.dirname(__file__), '../resources/does_not_exist.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
+
+        self.assertRaises(CollectionConfigFileNotFoundError, collection_watcher._load_collections)
+
+    def test_get_updated_collections_returns_all_collections(self):
+        """With no prior load, ``_get_updated_collections`` returns the same set as ``collections()``."""
+        collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
+
+        updated_collections = collection_watcher._get_updated_collections()
+        self.assertSetEqual(updated_collections, collection_watcher.collections())
+
+    def test_get_updated_collections_returns_no_collections(self):
+        """After the config has already been loaded once, re-reading the same file reports no updates."""
+        collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
+        collection_watcher._load_collections()
+        updated_collections = collection_watcher._get_updated_collections()
+
+        self.assertEqual(len(updated_collections), 0)
+
+    def test_get_updated_collections_returns_some_collections(self):
+        """Switching to ``collections_alternate.yml`` after an initial load reports exactly one update."""
+        collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
+        collection_watcher._load_collections()
+
+        # Point the watcher at a different config file by overwriting its private path attribute.
+        new_collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections_alternate.yml')
+        collection_watcher._collections_path = new_collections_path
+        updated_collections = collection_watcher._get_updated_collections()
+
+        self.assertEqual(len(updated_collections), 1)
+
+    def test_validate_collection(self):
+        """A collection with an absolute path validates; the test passes if no exception is raised."""
+        collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
+
+        collection = Collection(dataset_id="test_dataset",
+                                path="/absolute/path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        collection_watcher._validate_collection(collection)
+
+    def test_validate_collection_with_relative_path(self):
+        """Validating a collection whose path is relative raises ``RelativePathCollectionError``."""
+        collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
+
+        collection = Collection(dataset_id="test_dataset",
+                                path="relative/path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        self.assertRaises(RelativePathCollectionError, collection_watcher._validate_collection, collection)
+
+    def test_validate_collection_with_conflicting_path(self):
+        """Validating a collection located next to the config file raises ``ConflictingPathCollectionError``."""
+        # The second component is absolute, so os.path.join discards the dirname and the config path
+        # becomes '/resources/collections.yml' -- the same directory as the collection path below.
+        # NOTE(review): this looks intentional, since the test depends on it -- confirm.
+        collections_path = os.path.join(os.path.dirname(__file__), '/resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
+
+        collection = Collection(dataset_id="test_dataset",
+                                path="/resources/*.nc",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        self.assertRaises(ConflictingPathCollectionError, collection_watcher._validate_collection, collection)
+
+    def test_collection_callback_is_called(self):
+        """The collection callback (second constructor argument) is called twice within one second.
+
+        The watcher is started on a temporary config file, which is then appended to while the watcher
+        is running; the test expects ``call_count`` to reach 2.
+        """
+        collections_config = tempfile.NamedTemporaryFile("w+b", buffering=0, delete=False)
+        granule_dir = tempfile.TemporaryDirectory()
+        # Cleanups run in reverse order even when the assertion below fails, so the config file is
+        # closed before it is removed and neither the file nor the directory is leaked.
+        self.addCleanup(os.remove, collections_config.name)
+        self.addCleanup(collections_config.close)
+        self.addCleanup(granule_dir.cleanup)
+
+        collections_str = f"""collections:
+- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+  path: {granule_dir.name}
+  variable: lwe_thickness
+  priority: 1
+  forward-processing-priority: 5"""
+        collections_config.write(collections_str.encode("utf-8"))
+
+        collection_callback = Mock()
+        collection_watcher = CollectionWatcher(collections_config.name, collection_callback, Mock())
+        collection_watcher.start_watching()
+
+        # Append a second entry to the config file while the watcher is running.
+        collections_str = f"""
+- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+  path: {granule_dir.name}
+  variable: lwe_thickness
+  priority: 10
+  forward-processing-priority: 5
+        """
+        collections_config.write(collections_str.encode("utf-8"))
+        collections_config.close()
+
+        self.assert_called_within_timeout(collection_callback, timeout_sec=1, call_count=2)
+
+    def test_granule_callback_is_called_on_new_file(self):
+        """The granule callback (third constructor argument) is called after a file is created in the watched path."""
+        with tempfile.NamedTemporaryFile("w+b", buffering=0) as collections_config:
+            granule_dir = tempfile.TemporaryDirectory()
+            # Remove the granule directory even when the assertion below fails.
+            self.addCleanup(granule_dir.cleanup)
+            collections_str = f"""
+collections:
+- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+  path: {granule_dir.name}
+  variable: lwe_thickness
+  priority: 1
+  forward-processing-priority: 5
+            """
+            collections_config.write(collections_str.encode("utf-8"))
+
+            granule_callback = Mock()
+            collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback)
+            collection_watcher.start_watching()
+
+            # Create the granule only after watching has started; the context manager closes the
+            # handle whether or not the callback arrives in time.
+            with open(os.path.join(granule_dir.name, 'test.nc'), "w+"):
+                self.assert_called_within_timeout(granule_callback)
+
+    def test_granule_callback_is_called_on_modified_file(self):
+        """The granule callback (third constructor argument) is called after an existing file is written to."""
+        with tempfile.NamedTemporaryFile("w+b", buffering=0) as collections_config:
+            granule_dir = tempfile.TemporaryDirectory()
+            # Remove the granule directory even when the assertion below fails.
+            self.addCleanup(granule_dir.cleanup)
+            collections_str = f"""
+collections:
+- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+  path: {granule_dir.name}
+  variable: lwe_thickness
+  priority: 1
+  forward-processing-priority: 5
+            """
+            collections_config.write(collections_str.encode("utf-8"))
+            # The granule file exists before the watcher starts; only its modification is observed.
+            new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+")
+            # Registered after the directory cleanup, so it runs first and the handle is closed
+            # before the directory is removed (close() is safe to call twice).
+            self.addCleanup(new_granule.close)
+
+            granule_callback = Mock()
+            collection_watcher = CollectionWatcher(collections_config.name, Mock(), granule_callback)
+            collection_watcher.start_watching()
+
+            new_granule.write("hello world")
+            new_granule.close()
+
+            self.assert_called_within_timeout(granule_callback)
+
+    @staticmethod
+    def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1):
+        """Poll ``mock_func.call_count`` until it reaches ``call_count`` or ``timeout_sec`` elapses.
+
+        :param mock_func: object exposing a ``call_count`` attribute (e.g. a ``Mock``)
+        :param timeout_sec: maximum time to wait, in seconds
+        :param call_count: minimum number of calls required
+        :raises AssertionError: if the required number of calls is not reached in time
+        """
+        # A monotonic clock keeps the timeout correct if the wall clock is adjusted mid-test.
+        deadline = time.monotonic() + timeout_sec
+        while True:
+            # Check before sleeping so an already-satisfied mock returns immediately.
+            if mock_func.call_count >= call_count:
+                return
+            if time.monotonic() >= deadline:
+                break
+            time.sleep(0.01)
+        raise AssertionError(f"{mock_func} did not reach {call_count} calls within {timeout_sec} sec")