This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch ascending_latitudes
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 36b87d0be6d6184fac63ab3556445ebe3e6295c2
Author: Eamon Ford <[email protected]>
AuthorDate: Thu Jun 18 15:53:34 2020 -0700

    SDAP-234: Add more test coverage. (#1)
    
    Co-authored-by: Eamon Ford <[email protected]>
---
 .../collection_manager/entities/Collection.py      |  40 ++--
 .../collection_manager/entities/__init__.py        |   3 +-
 .../entities/exceptions/Exceptions.py              |  30 +++
 .../entities/exceptions/__init__.py                |   6 +
 collection_manager/collection_manager/main.py      |  58 +++---
 .../services/CollectionProcessor.py                |   7 +-
 .../services/CollectionWatcher.py                  | 103 +++++++----
 .../{history_manager => entities}/__init__.py      |   0
 .../tests/entities/test_Collection.py              | 139 ++++++++++++++
 .../test_datasetingestionhistoryfile.py            |  64 -------
 collection_manager/tests/resources/collections.yml |  17 ++
 .../tests/resources/collections_alternate.yml      |  17 ++
 .../tests/resources/collections_bad.yml            |  17 ++
 .../tests/resources/data/collections.yml           |  11 --
 .../resources/data/dataset_config_file_ok.yml      |  44 -----
 .../{ => services}/history_manager/__init__.py     |   0
 .../history_manager/test_FileIngestionHistory.py   |  56 ++++++
 .../history_manager/test_SolrIngestionHistory.py}  |   3 +-
 .../tests/services/test_CollectionProcessor.py     | 122 ++++++++++++-
 .../tests/services/test_CollectionWatcher.py       | 203 +++++++++++++++++++++
 20 files changed, 734 insertions(+), 206 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py 
b/collection_manager/collection_manager/entities/Collection.py
index d033c69..3976b6d 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -1,33 +1,39 @@
 import os
+from dataclasses import dataclass
 from datetime import datetime
 from fnmatch import fnmatch
 from glob import glob
-from typing import NamedTuple, List
+from typing import List, Optional
 
+from collection_manager.entities.exceptions import MissingValueCollectionError
 
-class Collection(NamedTuple):
+
+@dataclass(frozen=True)
+class Collection:
     dataset_id: str
     variable: str
     path: str
     historical_priority: int
-    forward_processing_priority: int
-    date_from: datetime
-    date_to: datetime
+    forward_processing_priority: Optional[int] = None
+    date_from: Optional[datetime] = None
+    date_to: Optional[datetime] = None
 
     @staticmethod
     def from_dict(properties: dict):
-        date_to = datetime.fromisoformat(properties['to']) if 'to' in 
properties else None
-        date_from = datetime.fromisoformat(properties['from']) if 'from' in 
properties else None
-
-        collection = Collection(dataset_id=properties['id'],
-                                variable=properties['variable'],
-                                path=properties['path'],
-                                historical_priority=properties['priority'],
-                                
forward_processing_priority=properties.get('forward_processing_priority',
-                                                                           
properties['priority']),
-                                date_to=date_to,
-                                date_from=date_from)
-        return collection
+        try:
+            date_to = datetime.fromisoformat(properties['to']) if 'to' in 
properties else None
+            date_from = datetime.fromisoformat(properties['from']) if 'from' 
in properties else None
+
+            collection = Collection(dataset_id=properties['id'],
+                                    variable=properties['variable'],
+                                    path=properties['path'],
+                                    historical_priority=properties['priority'],
+                                    
forward_processing_priority=properties.get('forward-processing-priority', None),
+                                    date_to=date_to,
+                                    date_from=date_from)
+            return collection
+        except KeyError as e:
+            raise MissingValueCollectionError(missing_value=e.args[0])
 
     def directory(self):
         if os.path.isdir(self.path):
diff --git a/collection_manager/collection_manager/entities/__init__.py 
b/collection_manager/collection_manager/entities/__init__.py
index 9f30603..165341b 100644
--- a/collection_manager/collection_manager/entities/__init__.py
+++ b/collection_manager/collection_manager/entities/__init__.py
@@ -1,2 +1 @@
-
-from .Collection import Collection
\ No newline at end of file
+from .Collection import Collection
diff --git 
a/collection_manager/collection_manager/entities/exceptions/Exceptions.py 
b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
new file mode 100644
index 0000000..8e63d24
--- /dev/null
+++ b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
@@ -0,0 +1,30 @@
+class RelativePathError(Exception):
+    pass
+
+
+class YamlParsingError(Exception):
+    pass
+
+
+class CollectionConfigFileNotFoundError(Exception):
+    pass
+
+
+class CollectionError(Exception):
+    def __init__(self, collection=None, message=None):
+        super().__init__(message)
+        self.collection = collection
+
+
+class MissingValueCollectionError(CollectionError):
+    def __init__(self, missing_value, collection=None, message=None):
+        super().__init__(collection, message)
+        self.missing_value = missing_value
+
+
+class ConflictingPathCollectionError(CollectionError):
+    pass
+
+
+class RelativePathCollectionError(CollectionError):
+    pass
diff --git 
a/collection_manager/collection_manager/entities/exceptions/__init__.py 
b/collection_manager/collection_manager/entities/exceptions/__init__.py
new file mode 100644
index 0000000..9a22c16
--- /dev/null
+++ b/collection_manager/collection_manager/entities/exceptions/__init__.py
@@ -0,0 +1,6 @@
+from .Exceptions import CollectionConfigFileNotFoundError
+from .Exceptions import ConflictingPathCollectionError
+from .Exceptions import MissingValueCollectionError
+from .Exceptions import RelativePathCollectionError
+from .Exceptions import RelativePathError
+from .Exceptions import YamlParsingError
diff --git a/collection_manager/collection_manager/main.py 
b/collection_manager/collection_manager/main.py
index fa4d7ce..a10446f 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -1,5 +1,6 @@
 import argparse
 import logging
+import os
 import time
 
 from collection_manager.services import CollectionProcessor, 
CollectionWatcher, MessagePublisher
@@ -7,7 +8,13 @@ from collection_manager.services.history_manager import 
SolrIngestionHistoryBuil
 
 logging.basicConfig(level=logging.INFO)
 logging.getLogger("pika").setLevel(logging.WARNING)
-logger = logging.getLogger(__name__)
+logger = logging.getLogger("collection_manager")
+
+
+def check_path(path) -> str:
+    if not os.path.isabs(path):
+        raise argparse.ArgumentTypeError("Paths must be absolute.")
+    return path
 
 
 def get_args() -> argparse.Namespace:
@@ -16,7 +23,7 @@ def get_args() -> argparse.Namespace:
                         help="refresh interval in seconds to check for new or 
updated granules",
                         default=300)
     parser.add_argument("--collections",
-                        help="path to collections configuration file",
+                        help="Absolute path to collections configuration file",
                         required=True)
     parser.add_argument('--rabbitmq_host',
                         default='localhost',
@@ -36,32 +43,39 @@ def get_args() -> argparse.Namespace:
                         help='Name of the RabbitMQ queue to consume from. 
(Default: "nexus")')
     history_group = parser.add_mutually_exclusive_group(required=True)
     history_group.add_argument("--history-path",
-                               help="path to ingestion history local 
directory")
+                               help="Absolute path to ingestion history local 
directory")
     history_group.add_argument("--history-url",
-                               help="url to ingestion history solr database")
+                               help="URL to ingestion history solr database")
+
     return parser.parse_args()
 
 
 def main():
-    options = get_args()
-    if options.history_path:
-        history_manager_builder = 
FileIngestionHistoryBuilder(history_path=options.history_path)
-    else:
-        history_manager_builder = 
SolrIngestionHistoryBuilder(solr_url=options.history_url)
-    publisher = MessagePublisher(host=options.rabbitmq_host,
-                                 username=options.rabbitmq_username,
-                                 password=options.rabbitmq_password,
-                                 queue=options.rabbitmq_queue)
-    publisher.connect()
-    collection_processor = CollectionProcessor(message_publisher=publisher,
-                                               
history_manager_builder=history_manager_builder)
-    collection_watcher = 
CollectionWatcher(collections_path=options.collections,
-                                           
collection_updated_callback=collection_processor.process_collection,
-                                           
granule_updated_callback=collection_processor.process_granule)
+    try:
+        options = get_args()
+
+        if options.history_path:
+            history_manager_builder = 
FileIngestionHistoryBuilder(history_path=options.history_path)
+        else:
+            history_manager_builder = 
SolrIngestionHistoryBuilder(solr_url=options.history_url)
+        publisher = MessagePublisher(host=options.rabbitmq_host,
+                                     username=options.rabbitmq_username,
+                                     password=options.rabbitmq_password,
+                                     queue=options.rabbitmq_queue)
+        publisher.connect()
+        collection_processor = CollectionProcessor(message_publisher=publisher,
+                                                   
history_manager_builder=history_manager_builder)
+        collection_watcher = 
CollectionWatcher(collections_path=options.collections,
+                                               
collection_updated_callback=collection_processor.process_collection,
+                                               
granule_updated_callback=collection_processor.process_granule)
+
+        collection_watcher.start_watching()
+        while True:
+            time.sleep(1)
 
-    collection_watcher.start_watching()
-    while True:
-        time.sleep(1)
+    except Exception as e:
+        logger.error(e)
+        return
 
 
 if __name__ == "__main__":
diff --git 
a/collection_manager/collection_manager/services/CollectionProcessor.py 
b/collection_manager/collection_manager/services/CollectionProcessor.py
index 75a86e2..a81390b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -50,7 +50,10 @@ class CollectionProcessor:
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for 
forward-processing ingestion "
                         f"in collection '{collection.dataset_id}'.")
-            use_priority = collection.forward_processing_priority
+            if collection.forward_processing_priority is not None:
+                use_priority = collection.forward_processing_priority
+            else:
+                use_priority = collection.historical_priority
         elif granule_status is GranuleStatus.DESIRED_HISTORICAL:
             logger.info(f"New granule '{granule}' detected for historical 
ingestion in collection "
                         f"'{collection.dataset_id}'.")
@@ -61,7 +64,7 @@ class CollectionProcessor:
             return
 
         dataset_config = self._fill_template(collection, 
config_template=self._config_template)
-        self._publisher.publish_message(dataset_config, use_priority)
+        self._publisher.publish_message(body=dataset_config, 
priority=use_priority)
         history_manager.push(granule)
 
     @staticmethod
diff --git 
a/collection_manager/collection_manager/services/CollectionWatcher.py 
b/collection_manager/collection_manager/services/CollectionWatcher.py
index b1fca64..6bbe7d9 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,7 +1,7 @@
 import logging
 import os
 from collections import defaultdict
-from typing import List, Dict, Callable, Set
+from typing import Dict, Callable, Set
 
 import yaml
 from watchdog.events import FileSystemEventHandler
@@ -9,6 +9,9 @@ from watchdog.observers import Observer
 from yaml.scanner import ScannerError
 
 from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import RelativePathError, 
YamlParsingError, \
+    CollectionConfigFileNotFoundError, MissingValueCollectionError, 
ConflictingPathCollectionError, \
+    RelativePathCollectionError
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
@@ -19,71 +22,96 @@ class CollectionWatcher:
                  collections_path: str,
                  collection_updated_callback: Callable[[Collection], any],
                  granule_updated_callback: Callable[[str, Collection], any]):
+        if not os.path.isabs(collections_path):
+            raise RelativePathError("Collections config path must be an 
absolute path.")
+
         self._collections_path = collections_path
         self._collection_updated_callback = collection_updated_callback
         self._granule_updated_callback = granule_updated_callback
-        
+
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
         self._observer = Observer()
 
+        self._granule_watches = set()
+
     def start_watching(self):
         """
         Start observing filesystem events for added/modified granules or 
changes to the Collections configuration file.
         When an event occurs, call the appropriate callback that was passed in 
during instantiation.
         :return: None
         """
-        
self._observer.schedule(_CollectionEventHandler(file_path=self._collections_path,
 callback=self._refresh),
-                                os.path.dirname(self._collections_path))
+        self._observer.schedule(
+            _CollectionEventHandler(file_path=self._collections_path, 
callback=self._reload_and_reschedule),
+            os.path.dirname(self._collections_path))
         self._observer.start()
-        self._refresh()
+        self._reload_and_reschedule()
 
-    def collections(self) -> List[Collection]:
+    def collections(self) -> Set[Collection]:
         """
-        Return a list of all Collections being watched.
-        :return: A list of Collections
+        Return a set of all Collections being watched.
+        :return: A set of Collections
         """
-        return [collection for collections in 
self._collections_by_dir.values() for collection in collections]
+        return {collection for collections in 
self._collections_by_dir.values() for collection in collections}
+
+    def _validate_collection(self, collection: Collection):
+        directory = collection.directory()
+        if not os.path.isabs(directory):
+            raise RelativePathCollectionError(collection=collection)
+        if directory == os.path.dirname(self._collections_path):
+            raise ConflictingPathCollectionError(collection=collection)
 
     def _load_collections(self):
         try:
             with open(self._collections_path, 'r') as f:
                 collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
             self._collections_by_dir.clear()
-            for _, collection_dict in collections_yaml.items():
-                collection = Collection.from_dict(collection_dict)
-                directory = collection.directory()
-                if directory == os.path.dirname(self._collections_path):
-                    logger.error(f"Collection {collection.dataset_id} uses 
granule directory {collection.path} "
-                                 f"which is the same directory as the 
collection configuration file, "
-                                 f"{self._collections_path}. The granules need 
to be in their own directory. "
-                                 f"Ignoring collection {collection.dataset_id} 
for now.")
-                else:
-                    self._collections_by_dir[directory].add(collection)
-
+            for collection_dict in collections_yaml['collections']:
+                try:
+                    collection = Collection.from_dict(collection_dict)
+                    self._validate_collection(collection)
+                    
self._collections_by_dir[collection.directory()].add(collection)
+                except MissingValueCollectionError as e:
+                    logger.error(f"A collection is missing 
'{e.missing_value}'. Ignoring this collection for now.")
+                except RelativePathCollectionError as e:
+                    logger.error(f"Relative paths are not allowed for the 
'path' property of a collection. "
+                                 f"Ignoring collection 
'{e.collection.dataset_id}' until its path is fixed.")
+                except ConflictingPathCollectionError as e:
+                    logger.error(f"Collection '{e.collection.dataset_id}' has 
granule path '{e.collection.path}' "
+                                 f"which uses the same directory as the collection 
configuration file, "
+                                 f"'{self._collections_path}'. The granules 
need to be in their own directory. "
+                                 f"Ignoring collection 
'{e.collection.dataset_id}' for now.")
         except FileNotFoundError:
-            logger.error(f"Collection configuration file not found at 
{self._collections_path}.")
+            raise CollectionConfigFileNotFoundError("The collection config 
file could not be found at "
+                                                    
f"{self._collections_path}")
         except yaml.scanner.ScannerError:
-            logger.error(f"Bad YAML syntax in collection configuration file. 
Will attempt to reload collections "
-                         f"after the next configuration change.")
-
-    def _refresh(self):
-        for collection in self._get_updated_collections():
-            self._collection_updated_callback(collection)
+            raise YamlParsingError("Bad YAML syntax in collection 
configuration file. Will attempt to reload "
+                                   "collections after the next configuration 
change.")
 
-        self._observer.unschedule_all()
-        self._schedule_watches()
-
-    def _get_updated_collections(self) -> List[Collection]:
+    def _get_updated_collections(self) -> Set[Collection]:
         old_collections = self.collections()
         self._load_collections()
-        return list(set(self.collections()) - set(old_collections))
+        return self.collections() - old_collections
+
+    def _reload_and_reschedule(self):
+        try:
+            for collection in self._get_updated_collections():
+                self._collection_updated_callback(collection)
+            self._unschedule_watches()
+            self._schedule_watches()
+        except YamlParsingError as e:
+            logger.error(e)
+
+    def _unschedule_watches(self):
+        for watch in self._granule_watches:
+            self._observer.unschedule(watch)
+        self._granule_watches.clear()
 
     def _schedule_watches(self):
         for directory, collections in self._collections_by_dir.items():
             granule_event_handler = 
_GranuleEventHandler(self._granule_updated_callback, collections)
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
-            self._observer.schedule(granule_event_handler, directory)
+            
self._granule_watches.add(self._observer.schedule(granule_event_handler, 
directory))
 
 
 class _CollectionEventHandler(FileSystemEventHandler):
@@ -115,3 +143,12 @@ class _GranuleEventHandler(FileSystemEventHandler):
         for collection in self._collections_for_dir:
             if collection.owns_file(event.src_path):
                 self._callback(event.src_path, collection)
+
+    def on_modified(self, event):
+        super().on_modified(event)
+        if os.path.isdir(event.src_path):
+            return
+
+        for collection in self._collections_for_dir:
+            if collection.owns_file(event.src_path):
+                self._callback(event.src_path, collection)
diff --git a/collection_manager/tests/history_manager/__init__.py 
b/collection_manager/tests/entities/__init__.py
similarity index 100%
copy from collection_manager/tests/history_manager/__init__.py
copy to collection_manager/tests/entities/__init__.py
diff --git a/collection_manager/tests/entities/test_Collection.py 
b/collection_manager/tests/entities/test_Collection.py
new file mode 100644
index 0000000..46506d4
--- /dev/null
+++ b/collection_manager/tests/entities/test_Collection.py
@@ -0,0 +1,139 @@
+import os
+import unittest
+from datetime import datetime, timezone
+
+from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import MissingValueCollectionError
+
+
+class TestCollection(unittest.TestCase):
+
+    def test_directory_with_directory(self):
+        directory = os.path.join(os.path.dirname(__file__), 
"../resources/data")
+        collection = Collection(dataset_id="test_dataset",
+                                path=directory,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        self.assertEqual(directory, collection.directory())
+
+    def test_directory_with_pattern(self):
+        pattern = os.path.join(os.path.dirname(__file__), 
"../resources/data/*.nc")
+        collection = Collection(dataset_id="test_dataset",
+                                path=pattern,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        self.assertEqual(os.path.dirname(pattern), collection.directory())
+
+    def test_owns_file_raises_exception_with_directory(self):
+        directory = os.path.join(os.path.dirname(__file__), 
"../resources/data")
+        collection = Collection(dataset_id="test_dataset",
+                                path=directory,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        self.assertRaises(IsADirectoryError, collection.owns_file, directory)
+
+    def test_owns_file_matches(self):
+        directory = os.path.join(os.path.dirname(__file__), 
"../resources/data")
+        collection = Collection(dataset_id="test_dataset",
+                                path=directory,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        file_path = os.path.join(directory, "test_file.nc")
+        self.assertTrue(collection.owns_file(file_path))
+
+    def test_owns_file_does_not_match(self):
+        directory = os.path.join(os.path.dirname(__file__), 
"../resources/data")
+        collection = Collection(dataset_id="test_dataset",
+                                path=directory,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        self.assertFalse(collection.owns_file("test_file.nc"))
+
+    def test_owns_file_matches_pattern(self):
+        directory = os.path.join(os.path.dirname(__file__), 
"../resources/data")
+        pattern = os.path.join(directory, "test_*.nc")
+        collection = Collection(dataset_id="test_dataset",
+                                path=pattern,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        file_path = os.path.join(directory, "test_file.nc")
+        self.assertTrue(collection.owns_file(file_path))
+
+    def test_owns_file_does_not_match_pattern(self):
+        directory = os.path.join(os.path.dirname(__file__), 
"../resources/data")
+        pattern = os.path.join(directory, "test_*.nc")
+        collection = Collection(dataset_id="test_dataset",
+                                path=pattern,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        file_path = os.path.join(directory, "nonmatch.nc")
+        self.assertFalse(collection.owns_file(file_path))
+
+    def test_from_dict(self):
+        collection_dict = {
+            'id': 'test_id',
+            'variable': 'test_var',
+            'path': '/some/path',
+            'priority': 1,
+            'forward-processing-priority': 2,
+            'from': '2020-01-01T00:00:00+00:00',
+            'to': '2020-02-01T00:00:00+00:00'
+        }
+
+        expected_collection = Collection(dataset_id='test_id',
+                                         variable='test_var',
+                                         path='/some/path',
+                                         historical_priority=1,
+                                         forward_processing_priority=2,
+                                         date_from=datetime(2020, 1, 1, 0, 0, 
0, tzinfo=timezone.utc),
+                                         date_to=datetime(2020, 2, 1, 0, 0, 0, 
tzinfo=timezone.utc))
+
+        self.assertEqual(expected_collection, 
Collection.from_dict(collection_dict))
+
+    def test_from_dict_missing_optional_values(self):
+        collection_dict = {
+            'id': 'test_id',
+            'variable': 'test_var',
+            'path': '/some/path',
+            'priority': 3
+        }
+
+        expected_collection = Collection(dataset_id='test_id',
+                                         variable='test_var',
+                                         path='/some/path',
+                                         historical_priority=3,
+                                         forward_processing_priority=None,
+                                         date_from=None,
+                                         date_to=None)
+
+        self.assertEqual(expected_collection, 
Collection.from_dict(collection_dict))
+
+    def test_from_dict_missing_required_values(self):
+        collection_dict = {
+            'id': 'test_id',
+            'variable': 'test_var',
+            'path': '/some/path',
+        }
+
+        self.assertRaises(MissingValueCollectionError, Collection.from_dict, 
collection_dict)
diff --git 
a/collection_manager/tests/history_manager/test_datasetingestionhistoryfile.py 
b/collection_manager/tests/history_manager/test_datasetingestionhistoryfile.py
deleted file mode 100644
index 0edeafb..0000000
--- 
a/collection_manager/tests/history_manager/test_datasetingestionhistoryfile.py
+++ /dev/null
@@ -1,64 +0,0 @@
-import unittest
-import os
-import sys
-import pathlib
-from collection_manager.services.history_manager import FileIngestionHistory
-from collection_manager.services.history_manager import md5sum_from_filepath
-
-HISTORY_ROOT_PATH = os.path.join(sys.prefix,
-                                 ".collection_manager",
-                                 "tmp/history")
-DATASET_ID = "zobi_la_mouche"
-
-
-class DatasetIngestionHistoryFileTestCase(unittest.TestCase):
-    ingestion_history = None
-
-    # @unittest.skip("does not work without a solr server for history_manager")
-    def test_get_md5sum(self):
-        self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH, 
DATASET_ID, md5sum_from_filepath)
-        self.ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
-        result = self.ingestion_history._get_signature("blue")
-        self.assertEqual(result, "12weeukrhbwerqu7wier")
-
-    # @unittest.skip("does not work without a solr server for history_manager")
-    def test_get_missing_md5sum(self):
-        self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH, 
DATASET_ID, md5sum_from_filepath)
-        self.ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
-        result = self.ingestion_history._get_signature("green")
-        self.assertEqual(result, None)
-
-    def test_has_valid_cache(self):
-        self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH, 
DATASET_ID, md5sum_from_filepath)
-        # history_manager with this file
-        current_file_path = pathlib.Path(__file__)
-        self.ingestion_history.push(str(current_file_path))
-        
self.assertEqual(self.ingestion_history.already_ingested(str(current_file_path)),
 True)
-
-    def test_has_valid_cache_with_latest_modifcation_signature(self):
-        self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH, 
DATASET_ID, os.path.getmtime)
-        # history_manager with this file
-        current_file_path = pathlib.Path(__file__)
-        self.ingestion_history.push(str(current_file_path))
-        
self.assertEqual(self.ingestion_history.already_ingested(str(current_file_path)),
 True)
-
-    def test_has_not_valid_cache(self):
-        self.ingestion_history = FileIngestionHistory(HISTORY_ROOT_PATH, 
DATASET_ID, md5sum_from_filepath)
-        # history_manager with this file
-        current_file_path = pathlib.Path(__file__)
-        
self.assertEqual(self.ingestion_history.already_ingested(str(current_file_path)),
 False)
-
-    @unittest.skip("skip before history_manager dataset is not available")
-    def test_purge(self):
-        self.ingestion_history = 
FileIngestionHistory("/Users/loubrieu/PycharmProjects/collection_manager/venv/.collection_manager/tmp/history/",
-                                                             
"avhrr-oi-analysed-sst-toto",
-                                                      lambda x : 
str(os.path.getmtime(x)))
-        self.ingestion_history.purge()
-
-    def tearDown(self):
-        self.ingestion_history.reset_cache()
-        del self.ingestion_history
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/collection_manager/tests/resources/collections.yml 
b/collection_manager/tests/resources/collections.yml
new file mode 100644
index 0000000..89524ec
--- /dev/null
+++ b/collection_manager/tests/resources/collections.yml
@@ -0,0 +1,17 @@
+collections:
+  - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+    path: /opt/data/grace/*land*.nc
+    variable: lwe_thickness
+    priority: 1
+    forward-processing-priority: 5
+
+  - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN
+    path: /opt/data/grace/*ocean*.nc
+    variable: lwe_thickness
+    priority: 2
+    forward-processing-priority: 6
+
+  - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
+    path: /opt/data/avhrr/*.nc
+    variable: analysed_sst
+    priority: 1
diff --git a/collection_manager/tests/resources/collections_alternate.yml 
b/collection_manager/tests/resources/collections_alternate.yml
new file mode 100644
index 0000000..3d7da95
--- /dev/null
+++ b/collection_manager/tests/resources/collections_alternate.yml
@@ -0,0 +1,17 @@
+collections:
+  - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+    path: /opt/data/grace/*land*.nc
+    variable: lwe_thickness
+    priority: 1
+    forward-processing-priority: 5
+
+  - id: ID_CHANGED
+    path: /opt/data/grace/*ocean*.nc
+    variable: lwe_thickness
+    priority: 2
+    forward-processing-priority: 6
+
+  - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
+    path: /opt/data/avhrr/*.nc
+    variable: analysed_sst
+    priority: 1
diff --git a/collection_manager/tests/resources/collections_bad.yml 
b/collection_manager/tests/resources/collections_bad.yml
new file mode 100644
index 0000000..cac6a32
--- /dev/null
+++ b/collection_manager/tests/resources/collections_bad.yml
@@ -0,0 +1,17 @@
+collections:
+  - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+    path: /opt/data/grace/*land*.nc
+    variable: lwe_thickness
+    priority: 1
+    forward-processing-priority: 5
+BAD SYNTAX!
+  - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN
+    path: /opt/data/grace/*ocean*.nc
+    variable: lwe_thickness
+    priority: 2
+    forward-processing-priority: 6
+
+  - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
+    path: /opt/data/avhrr/*.nc
+    variable: analysed_sst
+    priority: 1
diff --git a/collection_manager/tests/resources/data/collections.yml 
b/collection_manager/tests/resources/data/collections.yml
deleted file mode 100644
index 8c30a37..0000000
--- a/collection_manager/tests/resources/data/collections.yml
+++ /dev/null
@@ -1,11 +0,0 @@
-avhrr-oi-analysed-sst:
-  id: avhrr-oi-analysed-sst
-  path: ../tests/resources/data/avhrr_oi/*.nc
-  variable: analysed_sst
-  priority: 9
-
-avhrr-oi-analysed-sst2:
-  id: avhrr-oi-analysed-sst2
-  path: ../tests/resources/data/avhrr_oi/*.nc
-  variable: analysed_sst
-  priority: 1
diff --git a/collection_manager/tests/resources/data/dataset_config_file_ok.yml 
b/collection_manager/tests/resources/data/dataset_config_file_ok.yml
deleted file mode 100644
index 66bb883..0000000
--- a/collection_manager/tests/resources/data/dataset_config_file_ok.yml
+++ /dev/null
@@ -1,44 +0,0 @@
-# Tile Slicer Config
-ningester:
-    tile_slicer: sliceFileByTilesDesired
-    sliceFileByTilesDesired:
-      tilesDesired: 1296
-      timeDimension: time
-      dimensions:
-        - lat
-        - lon
----
-# Tile processors configuration
-ningester:
-  tile_processors:
-    - pythonChainProcessor
-    - generateTileId
-    - addDatasetName
-  pythonChainProcessor:
-    enabled:
-    processor_list:
-      -
-        name: GridReadingProcessor
-        config:
-          latitude: lat
-          longitude: lon
-          time: time
-          variable_to_read: analysed_sst
-      -
-        name: EmptyTileFilter
-      -
-        name: TileSummarizingProcessor
-        config:
-          stored_var_name: analysed_sst
-  generateTileId:
-    enabled:
-    salt: analysed_sst
-  addDatasetName:
-    enabled:
-    datasetName: avhrr-oi-analysed-sst
----
-# Tile writer configuration
-ningester:
-  tile_writer:
-    data_store: cassandraStore
-    metadata_store: solrStore
diff --git a/collection_manager/tests/history_manager/__init__.py 
b/collection_manager/tests/services/history_manager/__init__.py
similarity index 100%
rename from collection_manager/tests/history_manager/__init__.py
rename to collection_manager/tests/services/history_manager/__init__.py
diff --git 
a/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
 
b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
new file mode 100644
index 0000000..d2ad45c
--- /dev/null
+++ 
b/collection_manager/tests/services/history_manager/test_FileIngestionHistory.py
@@ -0,0 +1,56 @@
+import os
+import pathlib
+import tempfile
+import unittest
+
+from collection_manager.services.history_manager import FileIngestionHistory, 
md5sum_from_filepath
+
+DATASET_ID = "zobi_la_mouche"
+
+
class TestFileIngestionHistory(unittest.TestCase):
    """Unit tests for FileIngestionHistory using a throwaway on-disk history directory."""

    def test_get_md5sum(self):
        """A record pushed under a name can be read back by that name."""
        with tempfile.TemporaryDirectory() as history_dir:
            ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
            ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
            result = ingestion_history._get_signature("blue")
            self.assertEqual(result, "12weeukrhbwerqu7wier")

    def test_get_missing_md5sum(self):
        """Looking up a name that was never pushed yields None."""
        with tempfile.TemporaryDirectory() as history_dir:
            ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
            ingestion_history._push_record("blue", "12weeukrhbwerqu7wier")
            result = ingestion_history._get_signature("green")
            self.assertIsNone(result)

    def test_already_ingested(self):
        """A file pushed with the md5 signature function is reported as already ingested."""
        with tempfile.TemporaryDirectory() as history_dir:
            ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
            # Use this test module itself as the "granule" file.
            current_file_path = pathlib.Path(__file__)
            ingestion_history.push(str(current_file_path))
            self.assertTrue(ingestion_history.already_ingested(str(current_file_path)))

    def test_already_ingested_with_latest_modification_signature(self):
        """A file pushed with an mtime-based signature is reported as already ingested.

        NOTE(review): renamed from the original misspelling "modifcation".
        """
        with tempfile.TemporaryDirectory() as history_dir:
            ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, os.path.getmtime)
            current_file_path = pathlib.Path(__file__)
            ingestion_history.push(str(current_file_path))
            self.assertTrue(ingestion_history.already_ingested(str(current_file_path)))

    def test_already_ingested_is_false(self):
        """A file that was never pushed is not reported as ingested."""
        with tempfile.TemporaryDirectory() as history_dir:
            ingestion_history = FileIngestionHistory(history_dir, DATASET_ID, md5sum_from_filepath)
            current_file_path = pathlib.Path(__file__)
            self.assertFalse(ingestion_history.already_ingested(str(current_file_path)))
+
+
if __name__ == '__main__':
    # Allow running this test module directly (outside a test runner).
    unittest.main()
diff --git 
a/collection_manager/tests/history_manager/test_datasetingestionhistorysolr.py 
b/collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py
similarity index 94%
rename from 
collection_manager/tests/history_manager/test_datasetingestionhistorysolr.py
rename to 
collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py
index 57b2bd1..deab42d 100644
--- 
a/collection_manager/tests/history_manager/test_datasetingestionhistorysolr.py
+++ 
b/collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py
@@ -5,7 +5,8 @@ SOLR_URL = "http://localhost:8984/solr";
 DATASET_ID = "zobi_la_mouche"
 
 
-class DatasetIngestionHistorySolrTestCase(unittest.TestCase):
+# TODO: mock solr and fix these tests
+class TestSolrIngestionHistory(unittest.TestCase):
     @unittest.skip("does not work without a solr server for history_manager")
     def test_get(self):
         ingestion_history = SolrIngestionHistory(SOLR_URL, DATASET_ID)
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py 
b/collection_manager/tests/services/test_CollectionProcessor.py
index 73b67ba..7899e22 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -1,10 +1,11 @@
 import tempfile
-from unittest import mock
 import unittest
+from unittest import mock
 
 from collection_manager.entities import Collection
 from collection_manager.services import CollectionProcessor
-from collection_manager.services.history_manager import 
FileIngestionHistoryBuilder, FileIngestionHistory
+from collection_manager.services.history_manager import 
FileIngestionHistoryBuilder
+from collection_manager.services.history_manager import GranuleStatus
 
 
 class TestCollectionProcessor(unittest.TestCase):
@@ -18,15 +19,17 @@ class TestCollectionProcessor(unittest.TestCase):
     def test_file_supported_with_foo(self):
         
self.assertFalse(CollectionProcessor._file_supported("test_dir/test_granule.foo"))
 
-    def test_get_history_manager_returns_same_object(self):
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_get_history_manager_returns_same_object(self, mock_publisher):
         with tempfile.TemporaryDirectory() as history_dir:
-            collection_processor = CollectionProcessor(None, 
FileIngestionHistoryBuilder(history_dir))
+            collection_processor = CollectionProcessor(mock_publisher, 
FileIngestionHistoryBuilder(history_dir))
             history_manager = 
collection_processor._get_history_manager('dataset_id')
             
self.assertIs(collection_processor._get_history_manager('dataset_id'), 
history_manager)
 
-    def test_get_history_manager_returns_different_object(self):
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_get_history_manager_returns_different_object(self, 
mock_publisher):
         with tempfile.TemporaryDirectory() as history_dir:
-            collection_processor = CollectionProcessor(None, 
FileIngestionHistoryBuilder(history_dir))
+            collection_processor = CollectionProcessor(mock_publisher, 
FileIngestionHistoryBuilder(history_dir))
             history_manager = collection_processor._get_history_manager('foo')
             self.assertIsNot(collection_processor._get_history_manager('bar'), 
history_manager)
 
@@ -60,7 +63,106 @@ class TestCollectionProcessor(unittest.TestCase):
         filled = CollectionProcessor._fill_template(collection, template)
         self.assertEqual(filled, expected)
 
-    @mock.patch.object(FileIngestionHistory, 'push')
-    @mock.patch.object(FileIngestionHistory, 'get_granule_status')
-    def test_process_granule(self):
-        history_manager_builder = FileIngestionHistoryBuilder('/foo')
+    
@mock.patch('collection_manager.services.history_manager.FileIngestionHistory', 
autospec=True)
+    
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder',
 autospec=True)
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_process_granule_with_historical_granule(self, mock_publisher, 
mock_history_builder, mock_history):
+        mock_history.get_granule_status.return_value = 
GranuleStatus.DESIRED_HISTORICAL
+        mock_history_builder.build.return_value = mock_history
+
+        collection_processor = CollectionProcessor(mock_publisher, 
mock_history_builder)
+        collection = Collection(dataset_id="test_dataset",
+                                path="test_path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+
+        collection_processor.process_granule("test.nc", collection)
+
+        mock_publisher.publish_message.assert_called_with(body=mock.ANY, 
priority=1)
+        mock_history.push.assert_called()
+
+    
@mock.patch('collection_manager.services.history_manager.FileIngestionHistory', 
autospec=True)
+    
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder',
 autospec=True)
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_process_granule_with_forward_processing_granule(self, 
mock_publisher, mock_history_builder, mock_history):
+        mock_history.get_granule_status.return_value = 
GranuleStatus.DESIRED_FORWARD_PROCESSING
+        mock_history_builder.build.return_value = mock_history
+
+        collection_processor = CollectionProcessor(mock_publisher, 
mock_history_builder)
+        collection = Collection(dataset_id="test_dataset",
+                                path="test_path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+
+        collection_processor.process_granule("test.h5", collection)
+
+        mock_publisher.publish_message.assert_called_with(body=mock.ANY, 
priority=2)
+        mock_history.push.assert_called()
+
+    
@mock.patch('collection_manager.services.history_manager.FileIngestionHistory', 
autospec=True)
+    
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder',
 autospec=True)
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def 
test_process_granule_with_forward_processing_granule_and_no_priority(self, 
mock_publisher,
+                                                                             
mock_history_builder, mock_history):
+        mock_history.get_granule_status.return_value = 
GranuleStatus.DESIRED_FORWARD_PROCESSING
+        mock_history_builder.build.return_value = mock_history
+
+        collection_processor = CollectionProcessor(mock_publisher, 
mock_history_builder)
+        collection = Collection(dataset_id="test_dataset",
+                                path="test_path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                date_from=None,
+                                date_to=None)
+
+        collection_processor.process_granule("test.h5", collection)
+
+        mock_publisher.publish_message.assert_called_with(body=mock.ANY, 
priority=1)
+        mock_history.push.assert_called()
+
+    
@mock.patch('collection_manager.services.history_manager.FileIngestionHistory', 
autospec=True)
+    
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder',
 autospec=True)
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_process_granule_with_undesired_granule(self, mock_publisher, 
mock_history_builder, mock_history):
+        mock_history.get_granule_status.return_value = GranuleStatus.UNDESIRED
+        mock_history_builder.build.return_value = mock_history
+
+        collection_processor = CollectionProcessor(mock_publisher, 
mock_history_builder)
+        collection = Collection(dataset_id="test_dataset",
+                                path="test_path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+
+        collection_processor.process_granule("test.nc", collection)
+
+        mock_publisher.publish_message.assert_not_called()
+        mock_history.push.assert_not_called()
+
+    
@mock.patch('collection_manager.services.history_manager.FileIngestionHistory', 
autospec=True)
+    
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder',
 autospec=True)
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_process_granule_with_unsupported_file_type(self, mock_publisher, 
mock_history_builder, mock_history):
+        mock_history_builder.build.return_value = mock_history
+
+        collection_processor = CollectionProcessor(mock_publisher, 
mock_history_builder)
+        collection = Collection(dataset_id="test_dataset",
+                                path="test_path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+
+        collection_processor.process_granule("test.foo", collection)
+
+        mock_publisher.publish_message.assert_not_called()
+        mock_history.push.assert_not_called()
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py 
b/collection_manager/tests/services/test_CollectionWatcher.py
new file mode 100644
index 0000000..7ae25a1
--- /dev/null
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -0,0 +1,203 @@
+import os
+import tempfile
+import time
+import unittest
+from datetime import datetime
+from unittest.mock import Mock
+
+from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import YamlParsingError, 
CollectionConfigFileNotFoundError, \
+    RelativePathCollectionError, ConflictingPathCollectionError
+from collection_manager.services import CollectionWatcher
+
+
+class TestCollectionWatcher(unittest.TestCase):
+
+    def test_collections_returns_all_collections(self):
+        collection_watcher = CollectionWatcher('/foo', Mock(), Mock())
+        collection_watcher._collections_by_dir = {
+            "/foo": {
+                Collection("id1", "var1", "path1", 1, 2, datetime.now(), 
datetime.now()),
+                Collection("id2", "var2", "path2", 3, 4, datetime.now(), 
datetime.now()),
+            },
+            "/bar": {
+                Collection("id3", "var3", "path3", 5, 6, datetime.now(), 
datetime.now()),
+                Collection("id4", "var4", "path4", 7, 8, datetime.now(), 
datetime.now()),
+            }
+        }
+        flattened_collections = collection_watcher.collections()
+        self.assertEqual(len(flattened_collections), 4)
+
+    def test_load_collections_loads_all_collections(self):
+        collections_path = os.path.join(os.path.dirname(__file__), 
'../resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), 
Mock())
+        collection_watcher._load_collections()
+
+        self.assertEqual(len(collection_watcher._collections_by_dir), 2)
+        
self.assertEqual(len(collection_watcher._collections_by_dir['/opt/data/grace']),
 2)
+        
self.assertEqual(len(collection_watcher._collections_by_dir['/opt/data/avhrr']),
 1)
+
+    def test_load_collections_with_bad_yaml_syntax(self):
+        collections_path = os.path.join(os.path.dirname(__file__), 
'../resources/collections_bad.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), 
Mock())
+
+        self.assertRaises(YamlParsingError, 
collection_watcher._load_collections)
+
+    def test_load_collections_with_file_not_found(self):
+        collections_path = os.path.join(os.path.dirname(__file__), 
'../resources/does_not_exist.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), 
Mock())
+
+        self.assertRaises(CollectionConfigFileNotFoundError, 
collection_watcher._load_collections)
+
+    def test_get_updated_collections_returns_all_collections(self):
+        collections_path = os.path.join(os.path.dirname(__file__), 
'../resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), 
Mock())
+
+        updated_collections = collection_watcher._get_updated_collections()
+        self.assertSetEqual(updated_collections, 
collection_watcher.collections())
+
+    def test_get_updated_collections_returns_no_collections(self):
+        collections_path = os.path.join(os.path.dirname(__file__), 
'../resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), 
Mock())
+        collection_watcher._load_collections()
+        updated_collections = collection_watcher._get_updated_collections()
+
+        self.assertEqual(len(updated_collections), 0)
+
+    def test_get_updated_collections_returns_some_collections(self):
+        collections_path = os.path.join(os.path.dirname(__file__), 
'../resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), 
Mock())
+        collection_watcher._load_collections()
+
+        new_collections_path = os.path.join(os.path.dirname(__file__), 
'../resources/collections_alternate.yml')
+        collection_watcher._collections_path = new_collections_path
+        updated_collections = collection_watcher._get_updated_collections()
+
+        self.assertEqual(len(updated_collections), 1)
+
+    def test_validate_collection(self):
+        collections_path = os.path.join(os.path.dirname(__file__), 
'../resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), 
Mock())
+
+        collection = Collection(dataset_id="test_dataset",
+                                path="/absolute/path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        collection_watcher._validate_collection(collection)
+
+    def test_validate_collection_with_relative_path(self):
+        collections_path = os.path.join(os.path.dirname(__file__), 
'../resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), 
Mock())
+
+        collection = Collection(dataset_id="test_dataset",
+                                path="relative/path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        self.assertRaises(RelativePathCollectionError, 
collection_watcher._validate_collection, collection)
+
+    def test_validate_collection_with_conflicting_path(self):
+        collections_path = os.path.join(os.path.dirname(__file__), 
'/resources/collections.yml')
+        collection_watcher = CollectionWatcher(collections_path, Mock(), 
Mock())
+
+        collection = Collection(dataset_id="test_dataset",
+                                path="/resources/*.nc",
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        self.assertRaises(ConflictingPathCollectionError, 
collection_watcher._validate_collection, collection)
+
+    def test_collection_callback_is_called(self):
+        collections_config = tempfile.NamedTemporaryFile("w+b", buffering=0, 
delete=False)
+        granule_dir = tempfile.TemporaryDirectory()
+        collections_str = f"""collections:
+- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+  path: {granule_dir.name}
+  variable: lwe_thickness
+  priority: 1
+  forward-processing-priority: 5"""
+        collections_config.write(collections_str.encode("utf-8"))
+
+        collection_callback = Mock()
+        collection_watcher = CollectionWatcher(collections_config.name, 
collection_callback, Mock())
+        collection_watcher.start_watching()
+
+        collections_str = f"""
+- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+  path: {granule_dir.name}
+  variable: lwe_thickness
+  priority: 10
+  forward-processing-priority: 5
+        """
+        collections_config.write(collections_str.encode("utf-8"))
+        collections_config.close()
+
+        self.assert_called_within_timeout(collection_callback, timeout_sec=1, 
call_count=2)
+        granule_dir.cleanup()
+        os.remove(collections_config.name)
+
+    def test_granule_callback_is_called_on_new_file(self):
+        with tempfile.NamedTemporaryFile("w+b", buffering=0) as 
collections_config:
+            granule_dir = tempfile.TemporaryDirectory()
+            collections_str = f"""
+collections:
+- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+  path: {granule_dir.name}
+  variable: lwe_thickness
+  priority: 1
+  forward-processing-priority: 5
+            """
+            collections_config.write(collections_str.encode("utf-8"))
+
+            granule_callback = Mock()
+            collection_watcher = CollectionWatcher(collections_config.name, 
Mock(), granule_callback)
+            collection_watcher.start_watching()
+
+            new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+")
+
+            self.assert_called_within_timeout(granule_callback)
+
+            new_granule.close()
+            granule_dir.cleanup()
+
+    def test_granule_callback_is_called_on_modified_file(self):
+        with tempfile.NamedTemporaryFile("w+b", buffering=0) as 
collections_config:
+            granule_dir = tempfile.TemporaryDirectory()
+            collections_str = f"""
+collections:
+- id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+  path: {granule_dir.name}
+  variable: lwe_thickness
+  priority: 1
+  forward-processing-priority: 5
+            """
+            collections_config.write(collections_str.encode("utf-8"))
+            new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+")
+
+            granule_callback = Mock()
+            collection_watcher = CollectionWatcher(collections_config.name, 
Mock(), granule_callback)
+            collection_watcher.start_watching()
+
+            new_granule.write("hello world")
+            new_granule.close()
+
+            self.assert_called_within_timeout(granule_callback)
+
+            granule_dir.cleanup()
+
+    @staticmethod
+    def assert_called_within_timeout(mock_func, timeout_sec=1.0, call_count=1):
+        start = datetime.now()
+
+        while (datetime.now() - start).total_seconds() < timeout_sec:
+            time.sleep(0.01)
+            if mock_func.call_count >= call_count:
+                return
+        raise AssertionError(f"{mock_func} did not reach {call_count} calls 
called within {timeout_sec} sec")

Reply via email to