This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch tests
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/tests by this push:
new 475cbe2 tests
475cbe2 is described below
commit 475cbe24abbec551988d7fe68710d725d0f624ab
Author: Eamon Ford <[email protected]>
AuthorDate: Mon Jun 15 17:36:52 2020 -0700
tests
---
.../collection_manager/entities/Collection.py | 39 +++---
.../collection_manager/entities/__init__.py | 3 +-
.../entities/exceptions/Exceptions.py | 20 +++
.../services/CollectionProcessor.py | 5 +-
.../services/CollectionWatcher.py | 32 ++---
.../tests/entities/test_Collection.py | 139 +++++++++++++++++++++
.../tests/services/test_CollectionProcessor.py | 21 ++++
.../tests/services/test_CollectionWatcher.py | 15 +--
8 files changed, 233 insertions(+), 41 deletions(-)
diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index bea5973..8bcfd0e 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -1,33 +1,38 @@
import os
+from collection_manager.entities.exceptions.Exceptions import
MissingValueCollectionError
+from dataclasses import dataclass
from datetime import datetime
from fnmatch import fnmatch
from glob import glob
-from typing import NamedTuple, List
+from typing import List, Optional
-class Collection(NamedTuple):
+@dataclass(frozen=True)
+class Collection:
dataset_id: str
variable: str
path: str
historical_priority: int
- forward_processing_priority: int
- date_from: datetime
- date_to: datetime
+ forward_processing_priority: Optional[int] = None
+ date_from: Optional[datetime] = None
+ date_to: Optional[datetime] = None
@staticmethod
def from_dict(properties: dict):
- date_to = datetime.fromisoformat(properties['to']) if 'to' in
properties else None
- date_from = datetime.fromisoformat(properties['from']) if 'from' in
properties else None
-
- collection = Collection(dataset_id=properties['id'],
- variable=properties['variable'],
- path=properties['path'],
- historical_priority=properties['priority'],
-
forward_processing_priority=properties.get('forward-processing-priority',
-
properties['priority']),
- date_to=date_to,
- date_from=date_from)
- return collection
+ try:
+ date_to = datetime.fromisoformat(properties['to']) if 'to' in
properties else None
+ date_from = datetime.fromisoformat(properties['from']) if 'from'
in properties else None
+
+ collection = Collection(dataset_id=properties['id'],
+ variable=properties['variable'],
+ path=properties['path'],
+ historical_priority=properties['priority'],
+
forward_processing_priority=properties.get('forward-processing-priority', None),
+ date_to=date_to,
+ date_from=date_from)
+ return collection
+ except KeyError as e:
+ raise MissingValueCollectionError(missing_value=e.args[0])
def directory(self):
if os.path.isdir(self.path):
diff --git a/collection_manager/collection_manager/entities/__init__.py b/collection_manager/collection_manager/entities/__init__.py
index 9f30603..165341b 100644
--- a/collection_manager/collection_manager/entities/__init__.py
+++ b/collection_manager/collection_manager/entities/__init__.py
@@ -1,2 +1 @@
-
-from .Collection import Collection
\ No newline at end of file
+from .Collection import Collection
diff --git a/collection_manager/collection_manager/entities/exceptions/Exceptions.py b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
index 9321371..8e63d24 100644
--- a/collection_manager/collection_manager/entities/exceptions/Exceptions.py
+++ b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
@@ -8,3 +8,23 @@ class YamlParsingError(Exception):
class CollectionConfigFileNotFoundError(Exception):
pass
+
+
+class CollectionError(Exception):
+ def __init__(self, collection=None, message=None):
+ super().__init__(message)
+ self.collection = collection
+
+
+class MissingValueCollectionError(CollectionError):
+ def __init__(self, missing_value, collection=None, message=None):
+ super().__init__(collection, message)
+ self.missing_value = missing_value
+
+
+class ConflictingPathCollectionError(CollectionError):
+ pass
+
+
+class RelativePathCollectionError(CollectionError):
+ pass
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 54d35a5..a81390b 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -50,7 +50,10 @@ class CollectionProcessor:
if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
logger.info(f"New granule '{granule}' detected for
forward-processing ingestion "
f"in collection '{collection.dataset_id}'.")
- use_priority = collection.forward_processing_priority
+ if collection.forward_processing_priority is not None:
+ use_priority = collection.forward_processing_priority
+ else:
+ use_priority = collection.historical_priority
elif granule_status is GranuleStatus.DESIRED_HISTORICAL:
logger.info(f"New granule '{granule}' detected for historical
ingestion in collection "
f"'{collection.dataset_id}'.")
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 65807c9..0fef980 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -10,7 +10,8 @@ from yaml.scanner import ScannerError
from collection_manager.entities import Collection
from collection_manager.entities.exceptions import RelativePathError,
YamlParsingError, \
- CollectionConfigFileNotFoundError
+ CollectionConfigFileNotFoundError, MissingValueCollectionError,
ConflictingPathCollectionError, \
+ RelativePathCollectionError
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@@ -52,20 +53,12 @@ class CollectionWatcher:
"""
return {collection for collections in
self._collections_by_dir.values() for collection in collections}
- def _collection_is_valid(self, collection: Collection):
+ def _validate_collection(self, collection: Collection):
directory = collection.directory()
-
if not os.path.isabs(directory):
- logger.error(f"Relative paths are not allowed for the 'path'
property of a collection. "
- f"Ignoring collection '{collection.dataset_id}' until
its path is fixed.")
- return False
+ raise RelativePathCollectionError(collection=collection)
if directory == os.path.dirname(self._collections_path):
- logger.error(f"Collection {collection.dataset_id} has granule path
{collection.path} "
- f"which uses same directory as the collection
configuration file, "
- f"{self._collections_path}. The granules need to be
in their own directory. "
- f"Ignoring collection {collection.dataset_id} for
now.")
- return False
- return True
+ raise ConflictingPathCollectionError(collection=collection)
def _load_collections(self):
try:
@@ -73,9 +66,20 @@ class CollectionWatcher:
collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
self._collections_by_dir.clear()
for collection_dict in collections_yaml['collections']:
- collection = Collection.from_dict(collection_dict)
- if self._collection_is_valid(collection):
+ try:
+ collection = Collection.from_dict(collection_dict)
+ self._validate_collection(collection)
self._collections_by_dir[collection.directory()].add(collection)
+ except MissingValueCollectionError as e:
+ logger.error(f"A collection is missing
'{e.missing_value}'. Ignoring this collection for now.")
+ except RelativePathCollectionError as e:
+ logger.error(f"Relative paths are not allowed for the
'path' property of a collection. "
+ f"Ignoring collection
'{e.collection.dataset_id}' until its path is fixed.")
+ except ConflictingPathCollectionError as e:
+ logger.error(f"Collection '{e.collection.dataset_id}' has
granule path '{e.collection.path}' "
+ f"which uses same directory as the collection
configuration file, "
+ f"'{self._collections_path}'. The granules
need to be in their own directory. "
+ f"Ignoring collection
'{e.collection.dataset_id}' for now.")
except FileNotFoundError:
raise CollectionConfigFileNotFoundError("The collection config
file could not be found at "
f"{self._collections_path}")
diff --git a/collection_manager/tests/entities/test_Collection.py b/collection_manager/tests/entities/test_Collection.py
new file mode 100644
index 0000000..6b276a1
--- /dev/null
+++ b/collection_manager/tests/entities/test_Collection.py
@@ -0,0 +1,139 @@
+import os
+import unittest
+from datetime import datetime, timezone
+
+from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import MissingValueCollectionError
+
+
+class TestCollection(unittest.TestCase):
+
+ def test_directory_with_directory(self):
+ directory = os.path.join(os.path.dirname(__file__),
"../resources/data")
+ collection = Collection(dataset_id="test_dataset",
+ path=directory,
+ variable="test_variable",
+ historical_priority=1,
+ forward_processing_priority=2,
+ date_from=None,
+ date_to=None)
+ self.assertEquals(directory, collection.directory())
+
+ def test_directory_with_pattern(self):
+ pattern = os.path.join(os.path.dirname(__file__),
"../resources/data/*.nc")
+ collection = Collection(dataset_id="test_dataset",
+ path=pattern,
+ variable="test_variable",
+ historical_priority=1,
+ forward_processing_priority=2,
+ date_from=None,
+ date_to=None)
+ self.assertEquals(os.path.dirname(pattern), collection.directory())
+
+ def test_owns_file_raises_exception_with_directory(self):
+ directory = os.path.join(os.path.dirname(__file__),
"../resources/data")
+ collection = Collection(dataset_id="test_dataset",
+ path=directory,
+ variable="test_variable",
+ historical_priority=1,
+ forward_processing_priority=2,
+ date_from=None,
+ date_to=None)
+ self.assertRaises(IsADirectoryError, collection.owns_file, directory)
+
+ def test_owns_file_matches(self):
+ directory = os.path.join(os.path.dirname(__file__),
"../resources/data")
+ collection = Collection(dataset_id="test_dataset",
+ path=directory,
+ variable="test_variable",
+ historical_priority=1,
+ forward_processing_priority=2,
+ date_from=None,
+ date_to=None)
+ file_path = os.path.join(directory, "test_file.nc")
+ self.assertTrue(collection.owns_file(file_path))
+
+ def test_owns_file_does_not_match(self):
+ directory = os.path.join(os.path.dirname(__file__),
"../resources/data")
+ collection = Collection(dataset_id="test_dataset",
+ path=directory,
+ variable="test_variable",
+ historical_priority=1,
+ forward_processing_priority=2,
+ date_from=None,
+ date_to=None)
+ self.assertFalse(collection.owns_file("test_file.nc"))
+
+ def test_owns_file_matches_pattern(self):
+ directory = os.path.join(os.path.dirname(__file__),
"../resources/data")
+ pattern = os.path.join(directory, "test_*.nc")
+ collection = Collection(dataset_id="test_dataset",
+ path=pattern,
+ variable="test_variable",
+ historical_priority=1,
+ forward_processing_priority=2,
+ date_from=None,
+ date_to=None)
+ file_path = os.path.join(directory, "test_file.nc")
+ self.assertTrue(collection.owns_file(file_path))
+
+ def test_owns_file_does_not_match_pattern(self):
+ directory = os.path.join(os.path.dirname(__file__),
"../resources/data")
+ pattern = os.path.join(directory, "test_*.nc")
+ collection = Collection(dataset_id="test_dataset",
+ path=pattern,
+ variable="test_variable",
+ historical_priority=1,
+ forward_processing_priority=2,
+ date_from=None,
+ date_to=None)
+ file_path = os.path.join(directory, "nonmatch.nc")
+ self.assertFalse(collection.owns_file(file_path))
+
+ def test_from_dict(self):
+ collection_dict = {
+ 'id': 'test_id',
+ 'variable': 'test_var',
+ 'path': '/some/path',
+ 'priority': 1,
+ 'forward-processing-priority': 2,
+ 'from': '2020-01-01T00:00:00+00:00',
+ 'to': '2020-02-01T00:00:00+00:00'
+ }
+
+ expected_collection = Collection(dataset_id='test_id',
+ variable='test_var',
+ path='/some/path',
+ historical_priority=1,
+ forward_processing_priority=2,
+ date_from=datetime(2020, 1, 1, 0, 0,
0, tzinfo=timezone.utc),
+ date_to=datetime(2020, 2, 1, 0, 0, 0,
tzinfo=timezone.utc))
+
+ self.assertEquals(expected_collection,
Collection.from_dict(collection_dict))
+
+ def test_from_dict_missing_optional_values(self):
+ collection_dict = {
+ 'id': 'test_id',
+ 'variable': 'test_var',
+ 'path': '/some/path',
+ 'priority': 3
+ }
+
+ expected_collection = Collection(dataset_id='test_id',
+ variable='test_var',
+ path='/some/path',
+ historical_priority=3,
+ forward_processing_priority=None,
+ date_from=None,
+ date_to=None)
+
+ self.assertEquals(expected_collection,
Collection.from_dict(collection_dict))
+
+ def test_from_dict_missing_required_values(self):
+ collection_dict = {
+ 'id': 'test_id',
+ 'variable': 'test_var',
+ 'path': '/some/path',
+ }
+
+ self.assertRaises(MissingValueCollectionError, Collection.from_dict,
collection_dict)
diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py
index c95821d..7899e22 100644
--- a/collection_manager/tests/services/test_CollectionProcessor.py
+++ b/collection_manager/tests/services/test_CollectionProcessor.py
@@ -108,6 +108,27 @@ class TestCollectionProcessor(unittest.TestCase):
@mock.patch('collection_manager.services.history_manager.FileIngestionHistory',
autospec=True)
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder',
autospec=True)
@mock.patch('collection_manager.services.MessagePublisher', autospec=True)
+ def
test_process_granule_with_forward_processing_granule_and_no_priority(self,
mock_publisher,
+
mock_history_builder, mock_history):
+ mock_history.get_granule_status.return_value =
GranuleStatus.DESIRED_FORWARD_PROCESSING
+ mock_history_builder.build.return_value = mock_history
+
+ collection_processor = CollectionProcessor(mock_publisher,
mock_history_builder)
+ collection = Collection(dataset_id="test_dataset",
+ path="test_path",
+ variable="test_variable",
+ historical_priority=1,
+ date_from=None,
+ date_to=None)
+
+ collection_processor.process_granule("test.h5", collection)
+
+ mock_publisher.publish_message.assert_called_with(body=mock.ANY,
priority=1)
+ mock_history.push.assert_called()
+
+
@mock.patch('collection_manager.services.history_manager.FileIngestionHistory',
autospec=True)
+
@mock.patch('collection_manager.services.history_manager.FileIngestionHistoryBuilder',
autospec=True)
+ @mock.patch('collection_manager.services.MessagePublisher', autospec=True)
def test_process_granule_with_undesired_granule(self, mock_publisher,
mock_history_builder, mock_history):
mock_history.get_granule_status.return_value = GranuleStatus.UNDESIRED
mock_history_builder.build.return_value = mock_history
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
index 09300b3..f89d619 100644
--- a/collection_manager/tests/services/test_CollectionWatcher.py
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -6,7 +6,8 @@ from datetime import datetime
from unittest.mock import Mock
from collection_manager.entities import Collection
-from collection_manager.entities.exceptions import YamlParsingError,
CollectionConfigFileNotFoundError
+from collection_manager.entities.exceptions import YamlParsingError,
CollectionConfigFileNotFoundError, \
+ RelativePathCollectionError, ConflictingPathCollectionError
from collection_manager.services import CollectionWatcher
@@ -74,7 +75,7 @@ class TestCollectionWatcher(unittest.TestCase):
self.assertEquals(len(updated_collections), 1)
- def test_collection_is_valid(self):
+ def test_validate_collection(self):
collections_path = os.path.join(os.path.dirname(__file__),
'../resources/collections.yml')
collection_watcher = CollectionWatcher(collections_path, Mock(),
Mock())
@@ -85,9 +86,9 @@ class TestCollectionWatcher(unittest.TestCase):
forward_processing_priority=2,
date_from=None,
date_to=None)
- self.assertTrue(collection_watcher._collection_is_valid(collection))
+ collection_watcher._validate_collection(collection)
- def test_collection_is_valid_with_relative_path(self):
+ def test_validate_collection_with_relative_path(self):
collections_path = os.path.join(os.path.dirname(__file__),
'../resources/collections.yml')
collection_watcher = CollectionWatcher(collections_path, Mock(),
Mock())
@@ -98,9 +99,9 @@ class TestCollectionWatcher(unittest.TestCase):
forward_processing_priority=2,
date_from=None,
date_to=None)
- self.assertFalse(collection_watcher._collection_is_valid(collection))
+ self.assertRaises(RelativePathCollectionError,
collection_watcher._validate_collection, collection)
- def test_collection_is_valid_with_conflicting_path(self):
+ def test_validate_collection_with_conflicting_path(self):
collections_path = os.path.join(os.path.dirname(__file__),
'/resources/collections.yml')
collection_watcher = CollectionWatcher(collections_path, Mock(),
Mock())
@@ -111,7 +112,7 @@ class TestCollectionWatcher(unittest.TestCase):
forward_processing_priority=2,
date_from=None,
date_to=None)
- self.assertFalse(collection_watcher._collection_is_valid(collection))
+ self.assertRaises(ConflictingPathCollectionError,
collection_watcher._validate_collection, collection)
def test_collection_callback_is_called(self):
with tempfile.NamedTemporaryFile("w+b", buffering=0) as
collections_config: