This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch tests
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/tests by this push:
     new 5a1d9d5  wip tests
5a1d9d5 is described below

commit 5a1d9d533e3bb3d7bac003205a0544b35ef80e7e
Author: Eamon Ford <[email protected]>
AuthorDate: Thu Jun 11 16:27:09 2020 -0700

    wip tests
---
 .../entities/exceptions/Exceptions.py              | 10 +++
 .../entities/exceptions/__init__.py                |  1 +
 collection_manager/collection_manager/main.py      | 58 ++++++++++-------
 .../services/CollectionWatcher.py                  | 72 +++++++++++++---------
 collection_manager/tests/resources/collections.yml | 17 +++++
 .../tests/resources/collections_bad.yml            | 17 +++++
 .../tests/resources/data/collections.yml           | 11 ----
 .../resources/data/dataset_config_file_ok.yml      | 44 -------------
 .../tests/services/test_CollectionWatcher.py       | 68 ++++++++++++++++++++
 9 files changed, 193 insertions(+), 105 deletions(-)

diff --git a/collection_manager/collection_manager/entities/exceptions/Exceptions.py b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
new file mode 100644
index 0000000..9321371
--- /dev/null
+++ b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
@@ -0,0 +1,14 @@
+class CollectionManagerError(Exception):
+    """Base class for the errors defined in this module."""
+
+
+class RelativePathError(CollectionManagerError):
+    """Raised when a path that is required to be absolute is relative."""
+
+
+class YamlParsingError(CollectionManagerError):
+    """Raised when the collections configuration file cannot be parsed."""
+
+
+class CollectionConfigFileNotFoundError(CollectionManagerError):
+    """Raised when the collections configuration file does not exist."""
diff --git a/collection_manager/collection_manager/entities/exceptions/__init__.py b/collection_manager/collection_manager/entities/exceptions/__init__.py
new file mode 100644
index 0000000..1674a4e
--- /dev/null
+++ b/collection_manager/collection_manager/entities/exceptions/__init__.py
@@ -0,0 +1,2 @@
+# Re-export the exception classes so callers can import them from this package directly.
+from .Exceptions import CollectionConfigFileNotFoundError, RelativePathError, YamlParsingError
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index fa4d7ce..a10446f 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -1,5 +1,6 @@
 import argparse
 import logging
+import os
 import time
 
 from collection_manager.services import CollectionProcessor, CollectionWatcher, MessagePublisher
@@ -7,7 +8,14 @@ from collection_manager.services.history_manager import SolrIngestionHistoryBuil
 
 logging.basicConfig(level=logging.INFO)
 logging.getLogger("pika").setLevel(logging.WARNING)
-logger = logging.getLogger(__name__)
+logger = logging.getLogger("collection_manager")
+
+
+def check_path(path) -> str:
+    """Argparse ``type=`` callable: return ``path`` unchanged, rejecting it if it is not absolute."""
+    if not os.path.isabs(path):
+        raise argparse.ArgumentTypeError("Paths must be absolute.")
+    return path
 
 
 def get_args() -> argparse.Namespace:
@@ -16,7 +24,8 @@ def get_args() -> argparse.Namespace:
                         help="refresh interval in seconds to check for new or updated granules",
                         default=300)
     parser.add_argument("--collections",
-                        help="path to collections configuration file",
+                        help="Absolute path to collections configuration file",
+                        type=check_path,
                         required=True)
     parser.add_argument('--rabbitmq_host',
                         default='localhost',
@@ -36,32 +45,42 @@ def get_args() -> argparse.Namespace:
                         help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')
     history_group = parser.add_mutually_exclusive_group(required=True)
     history_group.add_argument("--history-path",
-                               help="path to ingestion history local directory")
+                               help="Absolute path to ingestion history local directory",
+                               type=check_path)
     history_group.add_argument("--history-url",
-                               help="url to ingestion history solr database")
+                               help="URL to ingestion history solr database")
+
     return parser.parse_args()
 
 
 def main():
-    options = get_args()
-    if options.history_path:
-        history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path)
-    else:
-        history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url)
-    publisher = MessagePublisher(host=options.rabbitmq_host,
-                                 username=options.rabbitmq_username,
-                                 password=options.rabbitmq_password,
-                                 queue=options.rabbitmq_queue)
-    publisher.connect()
-    collection_processor = CollectionProcessor(message_publisher=publisher,
-                                               history_manager_builder=history_manager_builder)
-    collection_watcher = CollectionWatcher(collections_path=options.collections,
-                                           collection_updated_callback=collection_processor.process_collection,
-                                           granule_updated_callback=collection_processor.process_granule)
+    """Build the ingestion pipeline from the command-line options, then watch collections until stopped."""
+    try:
+        options = get_args()
+
+        if options.history_path:
+            history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path)
+        else:
+            history_manager_builder = SolrIngestionHistoryBuilder(solr_url=options.history_url)
+        publisher = MessagePublisher(host=options.rabbitmq_host,
+                                     username=options.rabbitmq_username,
+                                     password=options.rabbitmq_password,
+                                     queue=options.rabbitmq_queue)
+        publisher.connect()
+        collection_processor = CollectionProcessor(message_publisher=publisher,
+                                                   history_manager_builder=history_manager_builder)
+        collection_watcher = CollectionWatcher(collections_path=options.collections,
+                                               collection_updated_callback=collection_processor.process_collection,
+                                               granule_updated_callback=collection_processor.process_granule)
+
+        collection_watcher.start_watching()
+        while True:
+            time.sleep(1)
 
-    collection_watcher.start_watching()
-    while True:
-        time.sleep(1)
+    except Exception as e:
+        # Keep the traceback and exit non-zero so a process supervisor can see the failure.
+        logger.exception(e)
+        raise SystemExit(1)
 
 
 if __name__ == "__main__":
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index b1fca64..5261166 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,7 +1,7 @@
 import logging
 import os
 from collections import defaultdict
-from typing import List, Dict, Callable, Set
+from typing import Dict, Callable, Set
 
 import yaml
 from watchdog.events import FileSystemEventHandler
@@ -9,6 +9,8 @@ from watchdog.observers import Observer
 from yaml.scanner import ScannerError
 
 from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import RelativePathError, YamlParsingError, \
+    CollectionConfigFileNotFoundError
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
@@ -19,10 +21,13 @@ class CollectionWatcher:
                  collections_path: str,
                  collection_updated_callback: Callable[[Collection], any],
                  granule_updated_callback: Callable[[str, Collection], any]):
+        if not os.path.isabs(collections_path):
+            raise RelativePathError("Collections path must be an absolute path.")
+
         self._collections_path = collections_path
         self._collection_updated_callback = collection_updated_callback
         self._granule_updated_callback = granule_updated_callback
-        
+
         self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
         self._observer = Observer()
 
@@ -37,46 +42,75 @@ class CollectionWatcher:
         self._observer.start()
         self._refresh()
 
-    def collections(self) -> List[Collection]:
+    def collections(self) -> Set[Collection]:
         """
-        Return a list of all Collections being watched.
-        :return: A list of Collections
+        Return a set of all Collections being watched.
+        :return: A set of Collections
         """
-        return [collection for collections in self._collections_by_dir.values() for collection in collections]
+        return {collection for collections in self._collections_by_dir.values() for collection in collections}
+
+    def _collection_is_valid(self, collection: Collection) -> bool:
+        """Return True if the collection can be watched; otherwise log why not and return False."""
+        directory = collection.directory()
+
+        if not os.path.isabs(directory):
+            logger.error(f"Relative paths are not allowed for the 'path' property of a collection. "
+                         f"Ignoring collection '{collection.dataset_id}' until its path is fixed.")
+            return False
+        if directory == os.path.dirname(self._collections_path):
+            logger.error(f"Collection {collection.dataset_id} uses granule directory {collection.path} "
+                         f"which is the same directory as the collection configuration file, "
+                         f"{self._collections_path}. The granules need to be in their own directory. "
+                         f"Ignoring collection {collection.dataset_id} for now.")
+            return False
+        return True
 
     def _load_collections(self):
+        """
+        Load the collections configuration file into self._collections_by_dir.
+
+        Collections rejected by _collection_is_valid are logged and skipped. The existing mapping is only
+        replaced after every entry has been processed, so an exception part-way through leaves it unchanged.
+
+        :raises CollectionConfigFileNotFoundError: if the configuration file does not exist.
+        :raises YamlParsingError: if the file is not valid YAML or lacks a top-level 'collections' key.
+        """
         try:
             with open(self._collections_path, 'r') as f:
                 collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
-            self._collections_by_dir.clear()
-            for _, collection_dict in collections_yaml.items():
-                collection = Collection.from_dict(collection_dict)
-                directory = collection.directory()
-                if directory == os.path.dirname(self._collections_path):
-                    logger.error(f"Collection {collection.dataset_id} uses granule directory {collection.path} "
-                                 f"which is the same directory as the collection configuration file, "
-                                 f"{self._collections_path}. The granules need to be in their own directory. "
-                                 f"Ignoring collection {collection.dataset_id} for now.")
-                else:
-                    self._collections_by_dir[directory].add(collection)
-
         except FileNotFoundError:
-            logger.error(f"Collection configuration file not found at {self._collections_path}.")
-        except yaml.scanner.ScannerError:
-            logger.error(f"Bad YAML syntax in collection configuration file. Will attempt to reload collections "
-                         f"after the next configuration change.")
-
-    def _refresh(self):
-        for collection in self._get_updated_collections():
-            self._collection_updated_callback(collection)
+            raise CollectionConfigFileNotFoundError("The collection config file could not be found at "
+                                                    f"{self._collections_path}")
+        except yaml.YAMLError as e:
+            # Catch the YAMLError base class: ParserError and friends are not ScannerErrors.
+            raise YamlParsingError("Bad YAML syntax in collection configuration file. Will attempt to reload "
+                                   "collections after the next configuration change.") from e
+
+        if not isinstance(collections_yaml, dict) or 'collections' not in collections_yaml:
+            raise YamlParsingError("The collection configuration file must have a top-level 'collections' key.")
+
+        collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
+        for collection_dict in collections_yaml['collections'] or []:
+            collection = Collection.from_dict(collection_dict)
+            if self._collection_is_valid(collection):
+                collections_by_dir[collection.directory()].add(collection)
+        self._collections_by_dir.clear()
+        self._collections_by_dir.update(collections_by_dir)
 
-        self._observer.unschedule_all()
-        self._schedule_watches()
-
-    def _get_updated_collections(self) -> List[Collection]:
+    def _get_updated_collections(self) -> Set[Collection]:
         old_collections = self.collections()
         self._load_collections()
-        return list(set(self.collections()) - set(old_collections))
+        return self.collections() - old_collections
+
+    def _refresh(self):
+        """Reload collections, run the update callback on each newly loaded one, and reschedule watches."""
+        try:
+            for collection in self._get_updated_collections():
+                self._collection_updated_callback(collection)
+            self._observer.unschedule_all()
+            self._schedule_watches()
+        except YamlParsingError as e:
+            logger.error(e)
 
     def _schedule_watches(self):
         for directory, collections in self._collections_by_dir.items():
diff --git a/collection_manager/tests/resources/collections.yml b/collection_manager/tests/resources/collections.yml
new file mode 100644
index 0000000..89524ec
--- /dev/null
+++ b/collection_manager/tests/resources/collections.yml
@@ -0,0 +1,17 @@
+collections:
+  - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+    path: /opt/data/grace/*land*.nc
+    variable: lwe_thickness
+    priority: 1
+    forward-processing-priority: 5
+
+  - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN
+    path: /opt/data/grace/*ocean*.nc
+    variable: lwe_thickness
+    priority: 2
+    forward-processing-priority: 6
+
+  - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
+    path: /opt/data/avhrr/*.nc
+    variable: analysed_sst
+    priority: 1
diff --git a/collection_manager/tests/resources/collections_bad.yml b/collection_manager/tests/resources/collections_bad.yml
new file mode 100644
index 0000000..cac6a32
--- /dev/null
+++ b/collection_manager/tests/resources/collections_bad.yml
@@ -0,0 +1,17 @@
+collections:
+  - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+    path: /opt/data/grace/*land*.nc
+    variable: lwe_thickness
+    priority: 1
+    forward-processing-priority: 5
+BAD SYNTAX!
+  - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN
+    path: /opt/data/grace/*ocean*.nc
+    variable: lwe_thickness
+    priority: 2
+    forward-processing-priority: 6
+
+  - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
+    path: /opt/data/avhrr/*.nc
+    variable: analysed_sst
+    priority: 1
diff --git a/collection_manager/tests/resources/data/collections.yml b/collection_manager/tests/resources/data/collections.yml
deleted file mode 100644
index 8c30a37..0000000
--- a/collection_manager/tests/resources/data/collections.yml
+++ /dev/null
@@ -1,11 +0,0 @@
-avhrr-oi-analysed-sst:
-  id: avhrr-oi-analysed-sst
-  path: ../tests/resources/data/avhrr_oi/*.nc
-  variable: analysed_sst
-  priority: 9
-
-avhrr-oi-analysed-sst2:
-  id: avhrr-oi-analysed-sst2
-  path: ../tests/resources/data/avhrr_oi/*.nc
-  variable: analysed_sst
-  priority: 1
diff --git a/collection_manager/tests/resources/data/dataset_config_file_ok.yml b/collection_manager/tests/resources/data/dataset_config_file_ok.yml
deleted file mode 100644
index 66bb883..0000000
--- a/collection_manager/tests/resources/data/dataset_config_file_ok.yml
+++ /dev/null
@@ -1,44 +0,0 @@
-# Tile Slicer Config
-ningester:
-    tile_slicer: sliceFileByTilesDesired
-    sliceFileByTilesDesired:
-      tilesDesired: 1296
-      timeDimension: time
-      dimensions:
-        - lat
-        - lon
----
-# Tile processors configuration
-ningester:
-  tile_processors:
-    - pythonChainProcessor
-    - generateTileId
-    - addDatasetName
-  pythonChainProcessor:
-    enabled:
-    processor_list:
-      -
-        name: GridReadingProcessor
-        config:
-          latitude: lat
-          longitude: lon
-          time: time
-          variable_to_read: analysed_sst
-      -
-        name: EmptyTileFilter
-      -
-        name: TileSummarizingProcessor
-        config:
-          stored_var_name: analysed_sst
-  generateTileId:
-    enabled:
-    salt: analysed_sst
-  addDatasetName:
-    enabled:
-    datasetName: avhrr-oi-analysed-sst
----
-# Tile writer configuration
-ningester:
-  tile_writer:
-    data_store: cassandraStore
-    metadata_store: solrStore
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
new file mode 100644
index 0000000..1b86be0
--- /dev/null
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -0,0 +1,71 @@
+import os
+import unittest
+from datetime import datetime
+from unittest.mock import Mock
+
+from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import YamlParsingError, CollectionConfigFileNotFoundError
+from collection_manager.services import CollectionWatcher
+
+
+def _resource_path(name: str) -> str:
+    """Return the absolute path of a file in tests/resources; CollectionWatcher rejects relative paths."""
+    return os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'resources', name))
+
+
+class TestCollectionWatcher(unittest.TestCase):
+
+    def test_collections_returns_all_collections(self):
+        collection_watcher = CollectionWatcher('/foo', Mock(), Mock())
+        collection_watcher._collections_by_dir = {
+            "/foo": {
+                Collection("id1", "var1", "path1", 1, 2, datetime.now(), datetime.now()),
+                Collection("id2", "var2", "path2", 3, 4, datetime.now(), datetime.now()),
+            },
+            "/bar": {
+                Collection("id3", "var3", "path3", 5, 6, datetime.now(), datetime.now()),
+                Collection("id4", "var4", "path4", 7, 8, datetime.now(), datetime.now()),
+            }
+        }
+        flattened_collections = collection_watcher.collections()
+        self.assertEqual(len(flattened_collections), 4)
+
+    def test_load_collections_loads_all_collections(self):
+        collection_watcher = CollectionWatcher(_resource_path('collections.yml'), Mock(), Mock())
+        collection_watcher._load_collections()
+
+        self.assertEqual(len(collection_watcher._collections_by_dir), 2)
+        self.assertEqual(len(collection_watcher._collections_by_dir['/opt/data/grace']), 2)
+        self.assertEqual(len(collection_watcher._collections_by_dir['/opt/data/avhrr']), 1)
+
+    def test_load_collections_with_bad_yaml_syntax(self):
+        collection_watcher = CollectionWatcher(_resource_path('collections_bad.yml'), Mock(), Mock())
+
+        self.assertRaises(YamlParsingError, collection_watcher._load_collections)
+
+    def test_load_collections_with_file_not_found(self):
+        collection_watcher = CollectionWatcher(_resource_path('does_not_exist.yml'), Mock(), Mock())
+
+        self.assertRaises(CollectionConfigFileNotFoundError, collection_watcher._load_collections)
+
+    def test_get_updated_collections_returns_all_collections(self):
+        collection_watcher = CollectionWatcher(_resource_path('collections.yml'), Mock(), Mock())
+
+        updated_collections = collection_watcher._get_updated_collections()
+        self.assertEqual(len(updated_collections), 3)
+        self.assertSetEqual(updated_collections, collection_watcher.collections())
+
+    def test_get_updated_collections_returns_no_collections(self):
+        collection_watcher = CollectionWatcher(_resource_path('collections.yml'), Mock(), Mock())
+        collection_watcher._load_collections()
+        updated_collections = collection_watcher._get_updated_collections()
+
+        self.assertEqual(len(updated_collections), 0)
+
+    @unittest.skip("Not implemented yet")
+    def test_get_updated_collections_returns_some_collections(self):
+        ...
+
+    @unittest.skip("Not implemented yet")
+    def test_schedule_watches(self):
+        ...

Reply via email to