This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch tests
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/tests by this push:
new 5a1d9d5 wip tests
5a1d9d5 is described below
commit 5a1d9d533e3bb3d7bac003205a0544b35ef80e7e
Author: Eamon Ford <[email protected]>
AuthorDate: Thu Jun 11 16:27:09 2020 -0700
wip tests
---
.../entities/exceptions/Exceptions.py | 10 +++
.../entities/exceptions/__init__.py | 1 +
collection_manager/collection_manager/main.py | 58 ++++++++++-------
.../services/CollectionWatcher.py | 72 +++++++++++++---------
collection_manager/tests/resources/collections.yml | 17 +++++
.../tests/resources/collections_bad.yml | 17 +++++
.../tests/resources/data/collections.yml | 11 ----
.../resources/data/dataset_config_file_ok.yml | 44 -------------
.../tests/services/test_CollectionWatcher.py | 68 ++++++++++++++++++++
9 files changed, 193 insertions(+), 105 deletions(-)
diff --git a/collection_manager/collection_manager/entities/exceptions/Exceptions.py b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
new file mode 100644
index 0000000..9321371
--- /dev/null
+++ b/collection_manager/collection_manager/entities/exceptions/Exceptions.py
@@ -0,0 +1,10 @@
+class RelativePathError(Exception):
+ pass
+
+
+class YamlParsingError(Exception):
+ pass
+
+
+class CollectionConfigFileNotFoundError(Exception):
+ pass
diff --git a/collection_manager/collection_manager/entities/exceptions/__init__.py b/collection_manager/collection_manager/entities/exceptions/__init__.py
new file mode 100644
index 0000000..1674a4e
--- /dev/null
+++ b/collection_manager/collection_manager/entities/exceptions/__init__.py
@@ -0,0 +1 @@
+from .Exceptions import *
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index fa4d7ce..a10446f 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -1,5 +1,6 @@
import argparse
import logging
+import os
import time
from collection_manager.services import CollectionProcessor,
CollectionWatcher, MessagePublisher
@@ -7,7 +8,13 @@ from collection_manager.services.history_manager import
SolrIngestionHistoryBuil
logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").setLevel(logging.WARNING)
-logger = logging.getLogger(__name__)
+logger = logging.getLogger("collection_manager")
+
+
+def check_path(path) -> str:
+ if not os.path.isabs(path):
+ raise argparse.ArgumentError("Paths must be absolute.")
+ return path
def get_args() -> argparse.Namespace:
@@ -16,7 +23,7 @@ def get_args() -> argparse.Namespace:
help="refresh interval in seconds to check for new or
updated granules",
default=300)
parser.add_argument("--collections",
- help="path to collections configuration file",
+ help="Absolute path to collections configuration file",
required=True)
parser.add_argument('--rabbitmq_host',
default='localhost',
@@ -36,32 +43,39 @@ def get_args() -> argparse.Namespace:
help='Name of the RabbitMQ queue to consume from.
(Default: "nexus")')
history_group = parser.add_mutually_exclusive_group(required=True)
history_group.add_argument("--history-path",
- help="path to ingestion history local
directory")
+ help="Absolute path to ingestion history local
directory")
history_group.add_argument("--history-url",
- help="url to ingestion history solr database")
+ help="URL to ingestion history solr database")
+
return parser.parse_args()
def main():
- options = get_args()
- if options.history_path:
- history_manager_builder =
FileIngestionHistoryBuilder(history_path=options.history_path)
- else:
- history_manager_builder =
SolrIngestionHistoryBuilder(solr_url=options.history_url)
- publisher = MessagePublisher(host=options.rabbitmq_host,
- username=options.rabbitmq_username,
- password=options.rabbitmq_password,
- queue=options.rabbitmq_queue)
- publisher.connect()
- collection_processor = CollectionProcessor(message_publisher=publisher,
-
history_manager_builder=history_manager_builder)
- collection_watcher =
CollectionWatcher(collections_path=options.collections,
-
collection_updated_callback=collection_processor.process_collection,
-
granule_updated_callback=collection_processor.process_granule)
+ try:
+ options = get_args()
+
+ if options.history_path:
+ history_manager_builder =
FileIngestionHistoryBuilder(history_path=options.history_path)
+ else:
+ history_manager_builder =
SolrIngestionHistoryBuilder(solr_url=options.history_url)
+ publisher = MessagePublisher(host=options.rabbitmq_host,
+ username=options.rabbitmq_username,
+ password=options.rabbitmq_password,
+ queue=options.rabbitmq_queue)
+ publisher.connect()
+ collection_processor = CollectionProcessor(message_publisher=publisher,
+
history_manager_builder=history_manager_builder)
+ collection_watcher =
CollectionWatcher(collections_path=options.collections,
+
collection_updated_callback=collection_processor.process_collection,
+
granule_updated_callback=collection_processor.process_granule)
+
+ collection_watcher.start_watching()
+ while True:
+ time.sleep(1)
- collection_watcher.start_watching()
- while True:
- time.sleep(1)
+ except Exception as e:
+ logger.error(e)
+ return
if __name__ == "__main__":
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index b1fca64..5261166 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,7 +1,7 @@
import logging
import os
from collections import defaultdict
-from typing import List, Dict, Callable, Set
+from typing import Dict, Callable, Set
import yaml
from watchdog.events import FileSystemEventHandler
@@ -9,6 +9,8 @@ from watchdog.observers import Observer
from yaml.scanner import ScannerError
from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import RelativePathError,
YamlParsingError, \
+ CollectionConfigFileNotFoundError
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@@ -19,10 +21,13 @@ class CollectionWatcher:
collections_path: str,
collection_updated_callback: Callable[[Collection], any],
granule_updated_callback: Callable[[str, Collection], any]):
+ if not os.path.isabs(collections_path):
+ raise RelativePathError("Collections path must be an absolute
path.")
+
self._collections_path = collections_path
self._collection_updated_callback = collection_updated_callback
self._granule_updated_callback = granule_updated_callback
-
+
self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
self._observer = Observer()
@@ -37,46 +42,57 @@ class CollectionWatcher:
self._observer.start()
self._refresh()
- def collections(self) -> List[Collection]:
+ def collections(self) -> Set[Collection]:
"""
- Return a list of all Collections being watched.
- :return: A list of Collections
+ Return a set of all Collections being watched.
+ :return: A set of Collections
"""
- return [collection for collections in
self._collections_by_dir.values() for collection in collections]
+ return {collection for collections in
self._collections_by_dir.values() for collection in collections}
+
+ def _collection_is_valid(self, collection: Collection):
+ directory = collection.directory()
+
+ if not os.path.isabs(directory):
+ logger.error(f"Relative paths are not allowed for the 'path'
property of a collection. "
+ f"Ignoring collection '{collection.dataset_id}' until
its path is fixed.")
+ return False
+ if directory == os.path.dirname(self._collections_path):
+ logger.error(f"Collection {collection.dataset_id} uses granule
directory {collection.path} "
+ f"which is the same directory as the collection
configuration file, "
+ f"{self._collections_path}. The granules need to be
in their own directory. "
+ f"Ignoring collection {collection.dataset_id} for
now.")
+ return False
+ return True
def _load_collections(self):
try:
with open(self._collections_path, 'r') as f:
collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
self._collections_by_dir.clear()
- for _, collection_dict in collections_yaml.items():
+ for collection_dict in collections_yaml['collections']:
collection = Collection.from_dict(collection_dict)
- directory = collection.directory()
- if directory == os.path.dirname(self._collections_path):
- logger.error(f"Collection {collection.dataset_id} uses
granule directory {collection.path} "
- f"which is the same directory as the
collection configuration file, "
- f"{self._collections_path}. The granules need
to be in their own directory. "
- f"Ignoring collection {collection.dataset_id}
for now.")
- else:
- self._collections_by_dir[directory].add(collection)
-
+ if self._collection_is_valid(collection):
+
self._collections_by_dir[collection.directory()].add(collection)
except FileNotFoundError:
- logger.error(f"Collection configuration file not found at
{self._collections_path}.")
+ raise CollectionConfigFileNotFoundError("The collection config
file could not be found at "
+
f"{self._collections_path}")
except yaml.scanner.ScannerError:
- logger.error(f"Bad YAML syntax in collection configuration file.
Will attempt to reload collections "
- f"after the next configuration change.")
-
- def _refresh(self):
- for collection in self._get_updated_collections():
- self._collection_updated_callback(collection)
+ raise YamlParsingError("Bad YAML syntax in collection
configuration file. Will attempt to reload "
+ "collections after the next configuration
change.")
- self._observer.unschedule_all()
- self._schedule_watches()
-
- def _get_updated_collections(self) -> List[Collection]:
+ def _get_updated_collections(self) -> Set[Collection]:
old_collections = self.collections()
self._load_collections()
- return list(set(self.collections()) - set(old_collections))
+ return self.collections() - old_collections
+
+ def _refresh(self):
+ try:
+ for collection in self._get_updated_collections():
+ self._collection_updated_callback(collection)
+ self._observer.unschedule_all()
+ self._schedule_watches()
+ except YamlParsingError as e:
+ logger.error(e)
def _schedule_watches(self):
for directory, collections in self._collections_by_dir.items():
diff --git a/collection_manager/tests/resources/collections.yml b/collection_manager/tests/resources/collections.yml
new file mode 100644
index 0000000..89524ec
--- /dev/null
+++ b/collection_manager/tests/resources/collections.yml
@@ -0,0 +1,17 @@
+collections:
+ - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+ path: /opt/data/grace/*land*.nc
+ variable: lwe_thickness
+ priority: 1
+ forward-processing-priority: 5
+
+ - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN
+ path: /opt/data/grace/*ocean*.nc
+ variable: lwe_thickness
+ priority: 2
+ forward-processing-priority: 6
+
+ - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
+ path: /opt/data/avhrr/*.nc
+ variable: analysed_sst
+ priority: 1
diff --git a/collection_manager/tests/resources/collections_bad.yml b/collection_manager/tests/resources/collections_bad.yml
new file mode 100644
index 0000000..cac6a32
--- /dev/null
+++ b/collection_manager/tests/resources/collections_bad.yml
@@ -0,0 +1,17 @@
+collections:
+ - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+ path: /opt/data/grace/*land*.nc
+ variable: lwe_thickness
+ priority: 1
+ forward-processing-priority: 5
+BAD SYNTAX!
+ - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN
+ path: /opt/data/grace/*ocean*.nc
+ variable: lwe_thickness
+ priority: 2
+ forward-processing-priority: 6
+
+ - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
+ path: /opt/data/avhrr/*.nc
+ variable: analysed_sst
+ priority: 1
diff --git a/collection_manager/tests/resources/data/collections.yml b/collection_manager/tests/resources/data/collections.yml
deleted file mode 100644
index 8c30a37..0000000
--- a/collection_manager/tests/resources/data/collections.yml
+++ /dev/null
@@ -1,11 +0,0 @@
-avhrr-oi-analysed-sst:
- id: avhrr-oi-analysed-sst
- path: ../tests/resources/data/avhrr_oi/*.nc
- variable: analysed_sst
- priority: 9
-
-avhrr-oi-analysed-sst2:
- id: avhrr-oi-analysed-sst2
- path: ../tests/resources/data/avhrr_oi/*.nc
- variable: analysed_sst
- priority: 1
diff --git a/collection_manager/tests/resources/data/dataset_config_file_ok.yml b/collection_manager/tests/resources/data/dataset_config_file_ok.yml
deleted file mode 100644
index 66bb883..0000000
--- a/collection_manager/tests/resources/data/dataset_config_file_ok.yml
+++ /dev/null
@@ -1,44 +0,0 @@
-# Tile Slicer Config
-ningester:
- tile_slicer: sliceFileByTilesDesired
- sliceFileByTilesDesired:
- tilesDesired: 1296
- timeDimension: time
- dimensions:
- - lat
- - lon
----
-# Tile processors configuration
-ningester:
- tile_processors:
- - pythonChainProcessor
- - generateTileId
- - addDatasetName
- pythonChainProcessor:
- enabled:
- processor_list:
- -
- name: GridReadingProcessor
- config:
- latitude: lat
- longitude: lon
- time: time
- variable_to_read: analysed_sst
- -
- name: EmptyTileFilter
- -
- name: TileSummarizingProcessor
- config:
- stored_var_name: analysed_sst
- generateTileId:
- enabled:
- salt: analysed_sst
- addDatasetName:
- enabled:
- datasetName: avhrr-oi-analysed-sst
----
-# Tile writer configuration
-ningester:
- tile_writer:
- data_store: cassandraStore
- metadata_store: solrStore
diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py
new file mode 100644
index 0000000..1b86be0
--- /dev/null
+++ b/collection_manager/tests/services/test_CollectionWatcher.py
@@ -0,0 +1,68 @@
+import os
+import unittest
+from datetime import datetime
+from unittest.mock import Mock
+
+from collection_manager.entities import Collection
+from collection_manager.entities.exceptions import YamlParsingError,
CollectionConfigFileNotFoundError
+from collection_manager.services import CollectionWatcher
+
+
+class TestCollectionWatcher(unittest.TestCase):
+
+ def test_collections_returns_all_collections(self):
+ collection_watcher = CollectionWatcher('/foo', Mock(), Mock())
+ collection_watcher._collections_by_dir = {
+ "/foo": {
+ Collection("id1", "var1", "path1", 1, 2, datetime.now(),
datetime.now()),
+ Collection("id2", "var2", "path2", 3, 4, datetime.now(),
datetime.now()),
+ },
+ "/bar": {
+ Collection("id3", "var3", "path3", 5, 6, datetime.now(),
datetime.now()),
+ Collection("id4", "var4", "path4", 7, 8, datetime.now(),
datetime.now()),
+ }
+ }
+ flattened_collections = collection_watcher.collections()
+ self.assertEquals(len(flattened_collections), 4)
+
+ def test_load_collections_loads_all_collections(self):
+ collections_path = os.path.join(os.path.dirname(__file__),
'../resources/collections.yml')
+ collection_watcher = CollectionWatcher(collections_path, Mock(),
Mock())
+ collection_watcher._load_collections()
+
+ self.assertEquals(len(collection_watcher._collections_by_dir), 2)
+
self.assertEquals(len(collection_watcher._collections_by_dir['/opt/data/grace']),
2)
+
self.assertEquals(len(collection_watcher._collections_by_dir['/opt/data/avhrr']),
1)
+
+ def test_load_collections_with_bad_yaml_syntax(self):
+ collections_path = os.path.join(os.path.dirname(__file__),
'../resources/collections_bad.yml')
+ collection_watcher = CollectionWatcher(collections_path, Mock(),
Mock())
+
+ self.assertRaises(YamlParsingError,
collection_watcher._load_collections)
+
+ def test_load_collections_with_file_not_found(self):
+ collections_path = os.path.join(os.path.dirname(__file__),
'../resources/does_not_exist.yml')
+ collection_watcher = CollectionWatcher(collections_path, Mock(),
Mock())
+
+ self.assertRaises(CollectionConfigFileNotFoundError,
collection_watcher._load_collections)
+
+ def test_get_updated_collections_returns_all_collections(self):
+ collections_path = os.path.join(os.path.dirname(__file__),
'../resources/collections.yml')
+ collection_watcher = CollectionWatcher(collections_path, Mock(),
Mock())
+
+ updated_collections = collection_watcher._get_updated_collections()
+ self.assertSetEqual(updated_collections,
collection_watcher.collections())
+
+ def test_get_updated_collections_returns_no_collections(self):
+ collections_path = os.path.join(os.path.dirname(__file__),
'../resources/collections.yml')
+ collection_watcher = CollectionWatcher(collections_path, Mock(),
Mock())
+ collection_watcher._load_collections()
+ updated_collections = collection_watcher._get_updated_collections()
+
+ self.assertEquals(len(updated_collections), 0)
+
+ def test_get_updated_collections_returns_some_collections(self):
+ ...
+
+ def test_schedule_watches(self):
+ ...