This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch tests
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/tests by this push:
     new 475cbe2  tests
475cbe2 is described below

commit 475cbe24abbec551988d7fe68710d725d0f624ab
Author: Eamon Ford <[email protected]>
AuthorDate: Mon Jun 15 17:36:52 2020 -0700

    tests
---
 .../collection_manager/entities/Collection.py      |  39 +++---
 .../collection_manager/entities/__init__.py        |   3 +-
 .../entities/exceptions/Exceptions.py              |  20 +++
 .../services/CollectionProcessor.py                |   5 +-
 .../services/CollectionWatcher.py                  |  32 ++---
 .../tests/entities/test_Collection.py              | 139 +++++++++++++++++++++
 .../tests/services/test_CollectionProcessor.py     |  21 ++++
 .../tests/services/test_CollectionWatcher.py       |  15 +--
 8 files changed, 233 insertions(+), 41 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index bea5973..8bcfd0e 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -1,33 +1,38 @@
 import os
+from collection_manager.entities.exceptions.Exceptions import MissingValueCollectionError
+from dataclasses import dataclass
 from datetime import datetime
 from fnmatch import fnmatch
 from glob import glob
-from typing import NamedTuple, List
+from typing import List, Optional
 
 
-class Collection(NamedTuple):
+@dataclass(frozen=True)
+class Collection:
     dataset_id: str
     variable: str
     path: str
     historical_priority: int
-    forward_processing_priority: int
-    date_from: datetime
-    date_to: datetime
+    forward_processing_priority: Optional[int] = None
+    date_from: Optional[datetime] = None
+    date_to: Optional[datetime] = None
 
     @staticmethod
     def from_dict(properties: dict):
-        date_to = datetime.fromisoformat(properties['to']) if 'to' in properties else None
-        date_from = datetime.fromisoformat(properties['from']) if 'from' in properties else None
-
-        collection = Collection(dataset_id=properties['id'],
-                                variable=properties['variable'],
-                                path=properties['path'],
-                                historical_priority=properties['priority'],
-                                forward_processing_priority=properties.get('forward-processing-priority',
-                                                                           properties['priority']),
-                                date_to=date_to,
-                                date_from=date_from)
-        return collection
+        try:
+            date_to = datetime.fromisoformat(properties['to']) if 'to' in properties else None
+            date_from = datetime.fromisoformat(properties['from']) if 'from' in properties else None
+
+            collection = Collection(dataset_id=properties['id'],
+                                    variable=properties['variable'],
+                                    path=properties['path'],
+                                    historical_priority=properties['priority'],
+                                    forward_processing_priority=properties.get('forward-processing-priority', None),
+                                    date_to=date_to,
+                                    date_from=date_from)
+            return collection
+        except KeyError as e:
+            raise MissingValueCollectionError(missing_value=e.args[0])
 
     def directory(self):
         if os.path.isdir(self.path):
diff --git a/collection_manager/collection_manager/entities/__init__.py b/collection_manager/collection_manager/entities/__init__.py
index 9f30603..165341b 100644
--- a/collection_manager/collection_manager/entities/__init__.py
+++ b/collection_manager/collection_manager/entities/__init__.py
@@ -1,2 +1 @@
-
-from .Collection import Collection
\ No newline at end of file
+from .Collection import Collection
diff --git a/collection_manager/collection_manager/entities/exceptions/Exceptions.py b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
index 9321371..8e63d24 100644
--- a/collection_manager/collection_manager/entities/exceptions/Exceptions.py
+++ b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
@@ -8,3 +8,23 @@ class YamlParsingError(Exception):
 
 class CollectionConfigFileNotFoundError(Exception):
     pass
+
+
+class CollectionError(Exception):
+    def __init__(self, collection=None, message=None):
+        super().__init__(message)
+        self.collection = collection
+
+
+class MissingValueCollectionError(CollectionError):
+    def __init__(self, missing_value, collection=None, message=None):
+        super().__init__(collection, message)
+        self.missing_value = missing_value
+
+
+class ConflictingPathCollectionError(CollectionError):
+    pass
+
+
+class RelativePathCollectionError(CollectionError):
+    pass
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 54d35a5..a81390b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -50,7 +50,10 @@ class CollectionProcessor:
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
                         f"in collection '{collection.dataset_id}'.")
-            use_priority = collection.forward_processing_priority
+            if collection.forward_processing_priority is not None:
+                use_priority = collection.forward_processing_priority
+            else:
+                use_priority = collection.historical_priority
         elif granule_status is GranuleStatus.DESIRED_HISTORICAL:
             logger.info(f"New granule '{granule}' detected for historical ingestion in collection "
                         f"'{collection.dataset_id}'.")
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 65807c9..0fef980 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -10,7 +10,8 @@ from yaml.scanner import ScannerError
 
 from collection_manager.entities import Collection
 from collection_manager.entities.exceptions import RelativePathError, YamlParsingError, \
-    CollectionConfigFileNotFoundError
+    CollectionConfigFileNotFoundError, MissingValueCollectionError, ConflictingPathCollectionError, \
+    RelativePathCollectionError
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
@@ -52,20 +53,12 @@ class CollectionWatcher:
         """
         return {collection for collections in self._collections_by_dir.values() for collection in collections}
 
-    def _collection_is_valid(self, collection: Collection):
+    def _validate_collection(self, collection: Collection):
         directory = collection.directory()
-
         if not os.path.isabs(directory):
-            logger.error(f"Relative paths are not allowed for the 'path' property of a collection. "
-                         f"Ignoring collection '{collection.dataset_id}' until its path is fixed.")
-            return False
+            raise RelativePathCollectionError(collection=collection)
         if directory == os.path.dirname(self._collections_path):
-            logger.error(f"Collection {collection.dataset_id} has granule path {collection.path} "
-                         f"which uses same directory as the collection configuration file, "
-                         f"{self._collections_path}. The granules need to be in their own directory. "
-                         f"Ignoring collection {collection.dataset_id} for now.")
-            return False
-        return True
+            raise ConflictingPathCollectionError(collection=collection)
 
     def _load_collections(self):
         try:
@@ -73,9 +66,20 @@ class CollectionWatcher:
                 collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
             self._collections_by_dir.clear()
             for collection_dict in collections_yaml['collections']:
-                collection = Collection.from_dict(collection_dict)
-                if self._collection_is_valid(collection):
+                try:
+                    collection = Collection.from_dict(collection_dict)
+                    self._validate_collection(collection)
                     self._collections_by_dir[collection.directory()].add(collection)
+                except MissingValueCollectionError as e:
+                    logger.error(f"A collection is missing '{e.missing_value}'. Ignoring this collection for now.")
+                except RelativePathCollectionError as e:
+                    logger.error(f"Relative paths are not allowed for the 'path' property of a collection. "
+                                 f"Ignoring collection '{e.collection.dataset_id}' until its path is fixed.")
+                except ConflictingPathCollectionError as e:
+                    logger.error(f"Collection '{e.collection.dataset_id}' has granule path '{e.collection.path}' "
+                                 f"which uses same directory as the collection configuration file, "
+                                 f"'{self._collections_path}'. The granules need to be in their own directory. "
+                                 f"Ignoring collection '{e.collection.dataset_id}' for now.")
         except FileNotFoundError:
             raise CollectionConfigFileNotFoundError("The collection config file could not be found at "
                                                     f"{self._collections_path}")
diff --git a/collection_manager/tests/entities/test_Collection.py b/collection_manager/tests/entities/test_Collection.py
new file mode 100644
index 0000000..6b276a1
--- /dev/null
+++ b/collection_manager/tests/entities/test_Collection.py
@@ -0,0 +1,139 @@
+import os
+import unittest
+from datetime import datetime, timezone
+
+from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import MissingValueCollectionError
+
+
+class TestCollection(unittest.TestCase):
+
+    def test_directory_with_directory(self):
+        directory = os.path.join(os.path.dirname(__file__), "../resources/data")
+        collection = Collection(dataset_id="test_dataset",
+                                path=directory,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        self.assertEquals(directory, collection.directory())
+
+    def test_directory_with_pattern(self):
+        pattern = os.path.join(os.path.dirname(__file__), "../resources/data/*.nc")
+        collection = Collection(dataset_id="test_dataset",
+                                path=pattern,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        self.assertEquals(os.path.dirname(pattern), collection.directory())
+
+    def test_owns_file_raises_exception_with_directory(self):
+        directory = os.path.join(os.path.dirname(__file__), "../resources/data")
+        collection = Collection(dataset_id="test_dataset",
+                                path=directory,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        self.assertRaises(IsADirectoryError, collection.owns_file, directory)
+
+    def test_owns_file_matches(self):
+        directory = os.path.join(os.path.dirname(__file__), "../resources/data")
+        collection = Collection(dataset_id="test_dataset",
+                                path=directory,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        file_path = os.path.join(directory, "test_file.nc")
+        self.assertTrue(collection.owns_file(file_path))
+
+    def test_owns_file_does_not_match(self):
+        directory = os.path.join(os.path.dirname(__file__), "../resources/data")
+        collection = Collection(dataset_id="test_dataset",
+                                path=directory,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        self.assertFalse(collection.owns_file("test_file.nc"))
+
+    def test_owns_file_matches_pattern(self):
+        directory = os.path.join(os.path.dirname(__file__), "../resources/data")
+        pattern = os.path.join(directory, "test_*.nc")
+        collection = Collection(dataset_id="test_dataset",
+                                path=pattern,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        file_path = os.path.join(directory, "test_file.nc")
+        self.assertTrue(collection.owns_file(file_path))
+
+    def test_owns_file_does_not_match_pattern(self):
+        directory = os.path.join(os.path.dirname(__file__), "../resources/data")
+        pattern = os.path.join(directory, "test_*.nc")
+        collection = Collection(dataset_id="test_dataset",
+                                path=pattern,
+                                variable="test_variable",
+                                historical_priority=1,
+                                forward_processing_priority=2,
+                                date_from=None,
+                                date_to=None)
+        file_path = os.path.join(directory, "nonmatch.nc")
+        self.assertFalse(collection.owns_file(file_path))
+
+    def test_from_dict(self):
+        collection_dict = {
+            'id': 'test_id',
+            'variable': 'test_var',
+            'path': '/some/path',
+            'priority': 1,
+            'forward-processing-priority': 2,
+            'from': '2020-01-01T00:00:00+00:00',
+            'to': '2020-02-01T00:00:00+00:00'
+        }
+
+        expected_collection = Collection(dataset_id='test_id',
+                                         variable='test_var',
+                                         path='/some/path',
+                                         historical_priority=1,
+                                         forward_processing_priority=2,
+                                         date_from=datetime(2020, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
+                                         date_to=datetime(2020, 2, 1, 0, 0, 0, tzinfo=timezone.utc))
+
+        self.assertEquals(expected_collection, Collection.from_dict(collection_dict))
+
+    def test_from_dict_missing_optional_values(self):
+        collection_dict = {
+            'id': 'test_id',
+            'variable': 'test_var',
+            'path': '/some/path',
+            'priority': 3
+        }
+
+        expected_collection = Collection(dataset_id='test_id',
+                                         variable='test_var',
+                                         path='/some/path',
+                                         historical_priority=3,
+                                         forward_processing_priority=None,
+                                         date_from=None,
+                                         date_to=None)
+
+        self.assertEquals(expected_collection, Collection.from_dict(collection_dict))
+
+    def test_from_dict_missing_required_values(self):
+        collection_dict = {
+            'id': 'test_id',
+            'variable': 'test_var',
+            'path': '/some/path',
+        }
+
+        self.assertRaises(MissingValueCollectionError, Collection.from_dict, collection_dict)
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py
index c95821d..7899e22 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -108,6 +108,27 @@ class TestCollectionProcessor(unittest.TestCase):
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
     @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
     @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+    def test_process_granule_with_forward_processing_granule_and_no_priority(self, mock_publisher,
+                                                                             mock_history_builder, mock_history):
+        mock_history.get_granule_status.return_value = GranuleStatus.DESIRED_FORWARD_PROCESSING
+        mock_history_builder.build.return_value = mock_history
+
+        collection_processor = CollectionProcessor(mock_publisher, mock_history_builder)
+        collection = Collection(dataset_id="test_dataset",
+                                path="test_path",
+                                variable="test_variable",
+                                historical_priority=1,
+                                date_from=None,
+                                date_to=None)
+
+        collection_processor.process_granule("test.h5", collection)
+
+        mock_publisher.publish_message.assert_called_with(body=mock.ANY, priority=1)
+        mock_history.push.assert_called()
+
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True)
+    @mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder', autospec=True)
+    @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
     def test_process_granule_with_undesired_granule(self, mock_publisher, mock_history_builder, mock_history):
         mock_history.get_granule_status.return_value = GranuleStatus.UNDESIRED
         mock_history_builder.build.return_value = mock_history
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
index 09300b3..f89d619 100644
--- a/collection_manager/tests/services/test_CollectionWatcher.py
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -6,7 +6,8 @@ from datetime import datetime
 from unittest.mock import Mock
 
 from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import YamlParsingError, CollectionConfigFileNotFoundError
+from collection_manager.entities.exceptions import YamlParsingError, CollectionConfigFileNotFoundError, \
+    RelativePathCollectionError, ConflictingPathCollectionError
 from collection_manager.services import CollectionWatcher
 
 
@@ -74,7 +75,7 @@ class TestCollectionWatcher(unittest.TestCase):
 
         self.assertEquals(len(updated_collections), 1)
 
-    def test_collection_is_valid(self):
+    def test_validate_collection(self):
         collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml')
         collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
 
@@ -85,9 +86,9 @@ class TestCollectionWatcher(unittest.TestCase):
                                 forward_processing_priority=2,
                                 date_from=None,
                                 date_to=None)
-        self.assertTrue(collection_watcher._collection_is_valid(collection))
+        collection_watcher._validate_collection(collection)
 
-    def test_collection_is_valid_with_relative_path(self):
+    def test_validate_collection_with_relative_path(self):
         collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml')
         collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
 
@@ -98,9 +99,9 @@ class TestCollectionWatcher(unittest.TestCase):
                                 forward_processing_priority=2,
                                 date_from=None,
                                 date_to=None)
-        self.assertFalse(collection_watcher._collection_is_valid(collection))
+        self.assertRaises(RelativePathCollectionError, collection_watcher._validate_collection, collection)
 
-    def test_collection_is_valid_with_conflicting_path(self):
+    def test_validate_collection_with_conflicting_path(self):
         collections_path = os.path.join(os.path.dirname(__file__), '/resources/collections.yml')
         collection_watcher = CollectionWatcher(collections_path, Mock(), Mock())
 
@@ -111,7 +112,7 @@ class TestCollectionWatcher(unittest.TestCase):
                                 forward_processing_priority=2,
                                 date_from=None,
                                 date_to=None)
-        self.assertFalse(collection_watcher._collection_is_valid(collection))
+        self.assertRaises(ConflictingPathCollectionError, collection_watcher._validate_collection, collection)
 
     def test_collection_callback_is_called(self):
         with tempfile.NamedTemporaryFile("w+b", buffering=0) as collections_config:

Reply via email to