This is an automated email from the ASF dual-hosted git repository. tloubrieu pushed a commit to branch ascending_latitudes in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 2c991c5edd260c57e2402664f79a15b77acf6967 Author: Eamon Ford <[email protected]> AuthorDate: Mon Jun 22 16:51:58 2020 -0700 SDAP-254, SDAP-255, SDAP-256: Fix bug where ingestion history is not saved, fix bug where messages published to RabbitMQ are incorrect, fix bug where bad collection config file crashes app (#3) --- .../entities/exceptions/Exceptions.py | 2 +- .../collection_manager/entities/exceptions/__init__.py | 2 +- collection_manager/collection_manager/main.py | 6 +++++- .../collection_manager/services/CollectionProcessor.py | 6 +++--- .../collection_manager/services/CollectionWatcher.py | 18 +++++++++++++----- .../services/history_manager/FileIngestionHistory.py | 4 ++-- .../services/history_manager/IngestionHistory.py | 6 ++++++ .../services/history_manager/SolrIngestionHistory.py | 3 +-- ...{collections_bad.yml => collections_bad_schema.yml} | 4 ++-- ...{collections_bad.yml => collections_bad_syntax.yml} | 0 .../tests/services/test_CollectionProcessor.py | 6 +++--- .../tests/services/test_CollectionWatcher.py | 12 +++++++++--- 12 files changed, 46 insertions(+), 23 deletions(-) diff --git a/collection_manager/collection_manager/entities/exceptions/Exceptions.py b/collection_manager/collection_manager/entities/exceptions/Exceptions.py index 8e63d24..c18c4c8 100644 --- a/collection_manager/collection_manager/entities/exceptions/Exceptions.py +++ b/collection_manager/collection_manager/entities/exceptions/Exceptions.py @@ -2,7 +2,7 @@ class RelativePathError(Exception): pass -class YamlParsingError(Exception): +class CollectionConfigParsingError(Exception): pass diff --git a/collection_manager/collection_manager/entities/exceptions/__init__.py b/collection_manager/collection_manager/entities/exceptions/__init__.py index 9a22c16..7fac507 100644 --- a/collection_manager/collection_manager/entities/exceptions/__init__.py +++ b/collection_manager/collection_manager/entities/exceptions/__init__.py @@ -3,4 +3,4 @@ from .Exceptions import ConflictingPathCollectionError from .Exceptions import MissingValueCollectionError from .Exceptions import RelativePathCollectionError from .Exceptions import RelativePathError -from .Exceptions import YamlParsingError +from .Exceptions import CollectionConfigParsingError diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py index a10446f..bc2d356 100644 --- a/collection_manager/collection_manager/main.py +++ b/collection_manager/collection_manager/main.py @@ -70,8 +70,12 @@ def main(): granule_updated_callback=collection_processor.process_granule) collection_watcher.start_watching() + while True: - time.sleep(1) + try: + time.sleep(1) + except KeyboardInterrupt: + return except Exception as e: logger.error(e) diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py index a81390b..232cdee 100644 --- a/collection_manager/collection_manager/services/CollectionProcessor.py +++ b/collection_manager/collection_manager/services/CollectionProcessor.py @@ -63,7 +63,7 @@ class CollectionProcessor: f"time range for collection '{collection.dataset_id}'. Skipping.") return - dataset_config = self._fill_template(collection, config_template=self._config_template) + dataset_config = self._fill_template(granule, collection, config_template=self._config_template) self._publisher.publish_message(body=dataset_config, priority=use_priority) history_manager.push(granule) @@ -78,11 +78,11 @@ class CollectionProcessor: return self._history_manager_cache[dataset_id] @staticmethod - def _fill_template(collection: Collection, config_template: str) -> str: + def _fill_template(granule_path: str, collection: Collection, config_template: str) -> str: renderer = pystache.Renderer() config_content = renderer.render(config_template, { - 'granule': collection.path, + 'granule': granule_path, 'dataset_id': collection.dataset_id, 'variable': collection.variable }) diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index 6bbe7d9..a3c3bf7 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -9,7 +9,7 @@ from watchdog.observers import Observer from yaml.scanner import ScannerError from collection_manager.entities import Collection -from collection_manager.entities.exceptions import RelativePathError, YamlParsingError, \ +from collection_manager.entities.exceptions import RelativePathError, CollectionConfigParsingError, \ CollectionConfigFileNotFoundError, MissingValueCollectionError, ConflictingPathCollectionError, \ RelativePathCollectionError @@ -84,8 +84,12 @@ class CollectionWatcher: raise CollectionConfigFileNotFoundError("The collection config file could not be found at " f"{self._collections_path}") except yaml.scanner.ScannerError: - raise YamlParsingError("Bad YAML syntax in collection configuration file. Will attempt to reload " - "collections after the next configuration change.") + raise CollectionConfigParsingError("Bad YAML syntax in collection configuration file. Will attempt " + "to reload collections after the next configuration change.") + except KeyError: + raise CollectionConfigParsingError("The collections configuration YAML file does not conform to the " + "proper schema. Will attempt to reload collections config after the " + "next file modification.") def _get_updated_collections(self) -> Set[Collection]: old_collections = self.collections() @@ -98,7 +102,7 @@ class CollectionWatcher: self._collection_updated_callback(collection) self._unschedule_watches() self._schedule_watches() - except YamlParsingError as e: + except CollectionConfigParsingError as e: logger.error(e) def _unschedule_watches(self): @@ -111,7 +115,11 @@ class CollectionWatcher: granule_event_handler = _GranuleEventHandler(self._granule_updated_callback, collections) # Note: the Watchdog library does not schedule a new watch # if one is already scheduled for the same directory - self._granule_watches.add(self._observer.schedule(granule_event_handler, directory)) + try: + self._granule_watches.add(self._observer.schedule(granule_event_handler, directory)) + except (FileNotFoundError, NotADirectoryError): + bad_collection_names = ' and '.join([col.dataset_id for col in collections]) + logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.") class _CollectionEventHandler(FileSystemEventHandler): diff --git a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py index 0a92317..50f2170 100644 --- a/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/FileIngestionHistory.py @@ -20,6 +20,7 @@ class FileIngestionHistoryBuilder(IngestionHistoryBuilder): signature_fun=self._signature_fun) +# TODO: clean this up, better tests class FileIngestionHistory(IngestionHistory): def __init__(self, history_path: str, dataset_id: str, signature_fun=None): @@ -55,7 +56,6 @@ class FileIngestionHistory(IngestionHistory): def __del__(self): self._history_file.close() self._purge() - self._save_latest_timestamp() del self._history_dict def reset_cache(self): @@ -91,8 +91,8 @@ class FileIngestionHistory(IngestionHistory): def _push_record(self, file_name, signature): self._history_dict[file_name] = signature + self._history_file.write(f'{file_name},{signature}\n') - return None def _get_signature(self, file_name): return self._history_dict.get(file_name, None) diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py index b14b409..d92cb24 100644 --- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py @@ -53,6 +53,8 @@ class IngestionHistory(ABC): else: self._latest_ingested_file_update = max(self._latest_ingested_file_update, os.path.getmtime(file_path)) + self._save_latest_timestamp() + def latest_ingested_mtime(self) -> Optional[datetime]: """ Return the modified time of the most recently modified file that was ingested. @@ -100,6 +102,10 @@ class IngestionHistory(ABC): return GranuleStatus.UNDESIRED @abstractmethod + def _save_latest_timestamp(self): + pass + + @abstractmethod def _push_record(self, file_name, signature): pass diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py index 2d0438f..1ae7156 100644 --- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py +++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py @@ -44,7 +44,6 @@ class SolrIngestionHistory(IngestionHistory): raise DatasetIngestionHistorySolrException(f"solr instance unreachable {solr_url}") def __del__(self): - self._push_latest_ingested_date() self._req_session.close() def _push_record(self, file_name, signature): @@ -58,7 +57,7 @@ class SolrIngestionHistory(IngestionHistory): self._solr_granules.commit() return None - def _push_latest_ingested_date(self): + def _save_latest_timestamp(self): if self._solr_datasets: self._solr_datasets.delete(q=f"id:{self._dataset_id}") self._solr_datasets.add([{ diff --git a/collection_manager/tests/resources/collections_bad.yml b/collection_manager/tests/resources/collections_bad_schema.yml similarity index 94% copy from collection_manager/tests/resources/collections_bad.yml copy to collection_manager/tests/resources/collections_bad_schema.yml index cac6a32..37c6ad3 100644 --- a/collection_manager/tests/resources/collections_bad.yml +++ b/collection_manager/tests/resources/collections_bad_schema.yml @@ -1,10 +1,10 @@ -collections: +bad_key: - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND path: /opt/data/grace/*land*.nc variable: lwe_thickness priority: 1 forward-processing-priority: 5 -BAD SYNTAX! + - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN path: /opt/data/grace/*ocean*.nc variable: lwe_thickness diff --git a/collection_manager/tests/resources/collections_bad.yml b/collection_manager/tests/resources/collections_bad_syntax.yml similarity index 100% rename from collection_manager/tests/resources/collections_bad.yml rename to collection_manager/tests/resources/collections_bad_syntax.yml diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py index 7899e22..56d5393 100644 --- a/collection_manager/tests/services/test_CollectionProcessor.py +++ b/collection_manager/tests/services/test_CollectionProcessor.py @@ -46,7 +46,7 @@ class TestCollectionProcessor(unittest.TestCase): expected = """ granule: - resource: test_path + resource: /granules/test_granule.nc processors: - name: GridReadingProcessor variable_to_read: test_variable @@ -54,13 +54,13 @@ class TestCollectionProcessor(unittest.TestCase): dataset_name: test_dataset """ collection = Collection(dataset_id="test_dataset", - path="test_path", + path="/granules/test*.nc", variable="test_variable", historical_priority=1, forward_processing_priority=2, date_from=None, date_to=None) - filled = CollectionProcessor._fill_template(collection, template) + filled = CollectionProcessor._fill_template("/granules/test_granule.nc", collection, template) self.assertEqual(filled, expected) @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', autospec=True) diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py index 7ae25a1..8c6ab5f 100644 --- a/collection_manager/tests/services/test_CollectionWatcher.py +++ b/collection_manager/tests/services/test_CollectionWatcher.py @@ -6,7 +6,7 @@ from datetime import datetime from unittest.mock import Mock from collection_manager.entities import Collection -from collection_manager.entities.exceptions import YamlParsingError, CollectionConfigFileNotFoundError, \ +from collection_manager.entities.exceptions import CollectionConfigParsingError, CollectionConfigFileNotFoundError, \ RelativePathCollectionError, ConflictingPathCollectionError from collection_manager.services import CollectionWatcher @@ -38,10 +38,16 @@ class TestCollectionWatcher(unittest.TestCase): self.assertEqual(len(collection_watcher._collections_by_dir['/opt/data/avhrr']), 1) def test_load_collections_with_bad_yaml_syntax(self): - collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections_bad.yml') + collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections_bad_syntax.yml') collection_watcher = CollectionWatcher(collections_path, Mock(), Mock()) - self.assertRaises(YamlParsingError, collection_watcher._load_collections) + self.assertRaises(CollectionConfigParsingError, collection_watcher._load_collections) + + def test_load_collections_with_bad_schema(self): + collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections_bad_schema.yml') + collection_watcher = CollectionWatcher(collections_path, Mock(), Mock()) + + self.assertRaises(CollectionConfigParsingError, collection_watcher._load_collections) def test_load_collections_with_file_not_found(self): collections_path = os.path.join(os.path.dirname(__file__), '../resources/does_not_exist.yml')
