This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch ascending_latitudes
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


commit e42ef8b143c7c4539ad24ee2441b557c46ca3b31
Author:     Eamon Ford <[email protected]>
AuthorDate: Tue Aug 25 10:47:56 2020 -0700

    SDAP-282, SDAP-284: Support configuring dimension names and projection during ingestion, support hierarchical directory structures (#19)
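With this change, a collection entry in collections.yml no longer carries a single 'variable' key; it instead declares the target projection, a dimensionNames mapping, and the slice step sizes used by the slicer. For reference, a representative entry adapted from the test resources updated in this commit (the dataset id and path are illustrative):

    collections:
      - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
        path: /opt/data/avhrr/*.nc
        priority: 1
        projection: Grid
        dimensionNames:
          latitude: lat
          longitude: lon
          time: time
          variable: analysed_sst
        slices:
          time: 1
          lat: 30
          lon: 30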
---
 .../collection_manager/entities/Collection.py      | 10 ++-
 .../resources/dataset_config_template.yml          | 20 ------
 .../services/CollectionProcessor.py                | 46 ++++++++-----
 .../services/CollectionWatcher.py                  | 15 +++--
 .../services/MessagePublisher.py                   |  2 +-
 collection_manager/setup.py                        |  1 -
 .../tests/entities/test_Collection.py              | 60 ++++++++++++++---
 collection_manager/tests/resources/collections.yml | 36 +++++++++-
 .../tests/resources/collections_alternate.yml      | 34 +++++++++-
 .../tests/services/test_CollectionProcessor.py     | 78 ++++++++++++++--------
 .../tests/services/test_CollectionWatcher.py       | 63 ++++++++++++++---
 config_operator/tests/resources/collections.yml    | 11 ++-
 .../consumer/{Consumer.py => MessageConsumer.py}   |  4 +-
 .../granule_ingester/consumer/__init__.py          |  2 +-
 .../granule_ingester/exceptions/Exceptions.py      |  7 +-
 .../granule_ingester/exceptions/__init__.py        | 19 +++---
 .../granule_loaders/GranuleLoader.py               |  9 ++-
 granule_ingester/granule_ingester/main.py          | 22 +++---
 .../granule_ingester/pipeline/Modules.py           | 17 +++--
 .../granule_ingester/pipeline/Pipeline.py          | 11 ++-
 .../reading_processors/EccoReadingProcessor.py     |  8 +--
 .../reading_processors/GridReadingProcessor.py     |  8 +--
 .../reading_processors/SwathReadingProcessor.py    |  6 +-
 .../reading_processors/TileReadingProcessor.py     | 10 +--
 .../TimeSeriesReadingProcessor.py                  |  8 +--
 .../config_files/ingestion_config_testfile.yaml    |  4 +-
 granule_ingester/tests/pipeline/test_Pipeline.py   |  7 +-
 .../test_EccoReadingProcessor.py                   |  4 +-
 .../test_GridReadingProcessor.py                   |  2 +-
 .../test_SwathReadingProcessor.py                  |  4 +-
 30 files changed, 358 insertions(+), 170 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index 3976b6d..031a3a9 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -11,7 +11,9 @@ from collection_manager.entities.exceptions import MissingValueCollectionError
 @dataclass(frozen=True)
 class Collection:
     dataset_id: str
-    variable: str
+    projection: str
+    dimension_names: frozenset
+    slices: frozenset
     path: str
     historical_priority: int
     forward_processing_priority: Optional[int] = None
@@ -25,7 +27,9 @@ class Collection:
         date_from = datetime.fromisoformat(properties['from']) if 'from' in properties else None
 
         collection = Collection(dataset_id=properties['id'],
-                                variable=properties['variable'],
+                                projection=properties['projection'],
+                                dimension_names=frozenset(properties['dimensionNames'].items()),
+                                slices=frozenset(properties['slices'].items()),
                                 path=properties['path'],
                                 historical_priority=properties['priority'],
                                 forward_processing_priority=properties.get('forward-processing-priority', None),
@@ -51,4 +55,4 @@ class Collection:
         return fnmatch(file_path, self.path)
 
     def files_owned(self) -> List[str]:
-        return glob(self.path)
+        return glob(self.path, recursive=True)
diff --git a/collection_manager/collection_manager/resources/dataset_config_template.yml b/collection_manager/collection_manager/resources/dataset_config_template.yml
deleted file mode 100644
index d35a527..0000000
--- a/collection_manager/collection_manager/resources/dataset_config_template.yml
+++ /dev/null
@@ -1,20 +0,0 @@
-granule:
-  resource: {{granule}}
-slicer:
-  name: sliceFileByStepSize
-  dimension_step_sizes:
-    time: 1
-    lat: 30
-    lon: 30
-processors:
-  - name: GridReadingProcessor
-    latitude: lat
-    longitude: lon
-    time: time
-    variable_to_read: {{variable}}
-  - name: emptyTileFilter
-  - name: kelvinToCelsius
-  - name: tileSummary
-    dataset_name: {{dataset_id}}
-  - name: generateTileId
-
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index ac61586..f08ade9 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -1,8 +1,7 @@
 import logging
 import os.path
 from typing import Dict
-
-import pystache
+import yaml
 
 from collection_manager.entities import Collection
 from collection_manager.services import MessagePublisher
@@ -11,8 +10,7 @@ from collection_manager.services.history_manager.IngestionHistory import Ingesti
 
 logger = logging.getLogger(__name__)
 
-SUPPORTED_FILE_EXTENSIONS = ['.nc', '.h5']
-MESSAGE_TEMPLATE = os.path.join(os.path.dirname(__file__), '../resources/dataset_config_template.yml')
+SUPPORTED_FILE_EXTENSIONS = ['.nc', '.nc4', '.h5']
 
 
 class CollectionProcessor:
@@ -22,9 +20,6 @@ class CollectionProcessor:
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}
 
-        with open(MESSAGE_TEMPLATE, 'r') as config_template_file:
-            self._config_template = config_template_file.read()
-
     async def process_collection(self, collection: Collection):
         """
         Given a Collection, detect new granules that need to be ingested and publish RabbitMQ messages for each.
@@ -63,7 +58,7 @@ class CollectionProcessor:
                              f"collection '{collection.dataset_id}'. Skipping.")
                 return
 
-        dataset_config = self._fill_template(granule, collection, config_template=self._config_template)
+        dataset_config = self._generate_ingestion_message(granule, collection)
         await self._publisher.publish_message(body=dataset_config, priority=use_priority)
         await history_manager.push(granule)
 
@@ -78,13 +73,28 @@
         return self._history_manager_cache[dataset_id]
 
     @staticmethod
-    def _fill_template(granule_path: str, collection: Collection, config_template: str) -> str:
-        renderer = pystache.Renderer()
-        config_content = renderer.render(config_template,
-                                         {
-                                             'granule': granule_path,
-                                             'dataset_id': collection.dataset_id,
-                                             'variable': collection.variable
-                                         })
-        logger.debug(f"Templated dataset config:\n{config_content}")
-        return config_content
+    def _generate_ingestion_message(granule_path: str, collection: Collection) -> str:
+        config_dict = {
+            'granule': {
+                'resource': granule_path
+            },
+            'slicer': {
+                'name': 'sliceFileByStepSize',
+                'dimension_step_sizes': dict(collection.slices)
+            },
+            'processors': [
+                {
+                    'name': collection.projection,
+                    **dict(collection.dimension_names),
+                },
+                {'name': 'emptyTileFilter'},
+                {
+                    'name': 'tileSummary',
+                    'dataset_name': collection.dataset_id
+                },
+                {'name': 'generateTileId'}
+            ]
+        }
+        config_str = yaml.dump(config_dict)
+        logger.debug(f"Templated dataset config:\n{config_str}")
+        return config_str
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 54c8877..1c7c1be 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -50,7 +50,7 @@
                                      func=self._reload_and_reschedule)
         self._observer.start()
 
-    def collections(self) -> Set[Collection]:
+    def _collections(self) -> Set[Collection]:
         """
         Return a set of all Collections being watched.
         :return: A set of Collections
@@ -96,9 +96,9 @@
                             "next file modification.")
 
     def _get_updated_collections(self) -> Set[Collection]:
-        old_collections = self.collections()
+        old_collections = self._collections()
         self._load_collections()
-        return self.collections() - old_collections
+        return self._collections() - old_collections
 
     async def _reload_and_reschedule(self):
         try:
@@ -128,7 +128,7 @@
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
-                self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
+                self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
             except (FileNotFoundError, NotADirectoryError):
                 bad_collection_names = ' and '.join([col.dataset_id for col in collections])
                 logger.error(f"Granule directory {directory} does not exist. Ignoring {bad_collection_names}.")
@@ -169,8 +169,11 @@ class _GranuleEventHandler(FileSystemEventHandler):
     def on_created(self, event):
         super().on_created(event)
         for collection in self._collections_for_dir:
-            if collection.owns_file(event.src_path):
-                self._loop.create_task(self._callback(event.src_path, collection))
+            try:
+                if collection.owns_file(event.src_path):
+                    self._loop.create_task(self._callback(event.src_path, collection))
+            except IsADirectoryError:
+                pass
 
     def on_modified(self, event):
         super().on_modified(event)
diff --git a/collection_manager/collection_manager/services/MessagePublisher.py b/collection_manager/collection_manager/services/MessagePublisher.py
index 75803d1..483fff8 100644
--- a/collection_manager/collection_manager/services/MessagePublisher.py
+++ b/collection_manager/collection_manager/services/MessagePublisher.py
@@ -25,7 +25,7 @@
         """
         self._connection = await connect_robust(self._connection_string)
         self._channel = await self._connection.channel()
-        await self._channel.declare_queue(self._queue, durable=True)
+        await self._channel.declare_queue(self._queue, durable=True, arguments={'x-max-priority': 10})
 
     @retry(wait=wait_fixed(5), reraise=True, stop=stop_after_attempt(4))
     async def publish_message(self, body: str, priority: int = None):
diff --git a/collection_manager/setup.py b/collection_manager/setup.py
index 1542486..0616d0f 100644
--- a/collection_manager/setup.py
+++ b/collection_manager/setup.py
@@ -31,6 +31,5 @@ setuptools.setup(
     ],
     python_requires='>=3.6',
     include_package_data=True,
-    data_files=[('.collection_manager/resources/', ['collection_manager/resources/dataset_config_template.yml'])],
     install_requires=pip_requirements
 )
diff --git a/collection_manager/tests/entities/test_Collection.py b/collection_manager/tests/entities/test_Collection.py
index 46506d4..7e56c9d 100644
--- a/collection_manager/tests/entities/test_Collection.py
+++ b/collection_manager/tests/entities/test_Collection.py
@@ -12,7 +12,9 @@ class TestCollection(unittest.TestCase):
         directory = os.path.join(os.path.dirname(__file__), "../resources/data")
         collection = Collection(dataset_id="test_dataset",
                                 path=directory,
-                                variable="test_variable",
+                                projection="Grid",
+                                slices={},
+                                dimension_names={},
                                 historical_priority=1,
Collection(dataset_id="test_dataset", path=directory, - variable="test_variable", + projection="Grid", + slices={}, + dimension_names={}, historical_priority=1, forward_processing_priority=2, date_from=None, @@ -69,7 +79,9 @@ class TestCollection(unittest.TestCase): pattern = os.path.join(directory, "test_*.nc") collection = Collection(dataset_id="test_dataset", path=pattern, - variable="test_variable", + projection="Grid", + slices={}, + dimension_names={}, historical_priority=1, forward_processing_priority=2, date_from=None, @@ -82,7 +94,9 @@ class TestCollection(unittest.TestCase): pattern = os.path.join(directory, "test_*.nc") collection = Collection(dataset_id="test_dataset", path=pattern, - variable="test_variable", + projection="Grid", + slices={}, + dimension_names={}, historical_priority=1, forward_processing_priority=2, date_from=None, @@ -93,8 +107,14 @@ class TestCollection(unittest.TestCase): def test_from_dict(self): collection_dict = { 'id': 'test_id', - 'variable': 'test_var', 'path': '/some/path', + 'projection': 'Grid', + 'dimensionNames': { + 'latitude': 'lat', + 'longitude': 'lon', + 'variable': 'test_var' + }, + 'slices': {'lat': 30, 'lon': 30, 'time': 1}, 'priority': 1, 'forward-processing-priority': 2, 'from': '2020-01-01T00:00:00+00:00', @@ -102,7 +122,13 @@ class TestCollection(unittest.TestCase): } expected_collection = Collection(dataset_id='test_id', - variable='test_var', + projection="Grid", + slices=frozenset([('lat', 30), ('lon', 30), ('time', 1)]), + dimension_names=frozenset([ + ('latitude', 'lat'), + ('longitude', 'lon'), + ('variable', 'test_var') + ]), path='/some/path', historical_priority=1, forward_processing_priority=2, @@ -114,13 +140,25 @@ class TestCollection(unittest.TestCase): def test_from_dict_missing_optional_values(self): collection_dict = { 'id': 'test_id', - 'variable': 'test_var', + 'projection': 'Grid', + 'dimensionNames': { + 'latitude': 'lat', + 'longitude': 'lon', + 'variable': 'test_var' + }, + 'slices': {'lat': 30, 'lon': 30, 'time': 1}, 'path': '/some/path', 'priority': 3 } expected_collection = Collection(dataset_id='test_id', - variable='test_var', + projection="Grid", + slices=frozenset([('lat', 30), ('lon', 30), ('time', 1)]), + dimension_names=frozenset([ + ('latitude', 'lat'), + ('longitude', 'lon'), + ('variable', 'test_var') + ]), path='/some/path', historical_priority=3, forward_processing_priority=None, diff --git a/collection_manager/tests/resources/collections.yml b/collection_manager/tests/resources/collections.yml index 89524ec..44f795b 100644 --- a/collection_manager/tests/resources/collections.yml +++ b/collection_manager/tests/resources/collections.yml @@ -1,17 +1,47 @@ collections: - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND path: /opt/data/grace/*land*.nc - variable: lwe_thickness priority: 1 forward-processing-priority: 5 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: lwe_thickness + slices: + time: 1 + lat: 30 + lon: 30 + - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN path: /opt/data/grace/*ocean*.nc - variable: lwe_thickness priority: 2 forward-processing-priority: 6 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: lwe_thickness + slices: + time: 1 + lat: 30 + lon: 30 + - id: AVHRR_OI-NCEI-L4-GLOB-v2.0 path: /opt/data/avhrr/*.nc - variable: analysed_sst priority: 1 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: analysed_sst + slices: + time: 1 + lat: 30 + lon: 30 
+ diff --git a/collection_manager/tests/resources/collections_alternate.yml b/collection_manager/tests/resources/collections_alternate.yml index 3d7da95..f9dabda 100644 --- a/collection_manager/tests/resources/collections_alternate.yml +++ b/collection_manager/tests/resources/collections_alternate.yml @@ -1,17 +1,45 @@ collections: - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND path: /opt/data/grace/*land*.nc - variable: lwe_thickness priority: 1 forward-processing-priority: 5 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: lwe_thickness + slices: + time: 1 + lat: 30 + lon: 30 - id: ID_CHANGED path: /opt/data/grace/*ocean*.nc - variable: lwe_thickness priority: 2 forward-processing-priority: 6 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: lwe_thickness + slices: + time: 1 + lat: 30 + lon: 30 - id: AVHRR_OI-NCEI-L4-GLOB-v2.0 path: /opt/data/avhrr/*.nc - variable: analysed_sst priority: 1 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: analysed_sst + slices: + time: 1 + lat: 30 + lon: 30 + diff --git a/collection_manager/tests/services/test_CollectionProcessor.py b/collection_manager/tests/services/test_CollectionProcessor.py index a7059d6..939b5d1 100644 --- a/collection_manager/tests/services/test_CollectionProcessor.py +++ b/collection_manager/tests/services/test_CollectionProcessor.py @@ -1,4 +1,5 @@ import tempfile +import yaml import unittest from unittest import mock @@ -35,34 +36,47 @@ class TestCollectionProcessor(unittest.TestCase): self.assertIsNot(collection_processor._get_history_manager('bar'), history_manager) def test_fill_template(self): - template = """ - granule: - resource: {{granule}} - processors: - - name: GridReadingProcessor - variable_to_read: {{variable}} - - name: tileSummary - dataset_name: {{dataset_id}} - """ - - expected = """ - granule: - resource: /granules/test_granule.nc - processors: - - name: GridReadingProcessor - variable_to_read: test_variable - - name: tileSummary - dataset_name: test_dataset - """ + expected = { + 'granule': { + 'resource': '/granules/test_granule.nc' + }, + 'processors': [ + { + 'latitude': 'lat', + 'longitude': 'lon', + 'name': 'Grid', + 'variable': 'test_var' + }, + {'name': 'emptyTileFilter'}, + {'dataset_name': 'test_dataset', 'name': 'tileSummary'}, + {'name': 'generateTileId'} + ], + 'slicer': { + 'dimension_step_sizes': { + 'lat': 30, + 'lon': 30, + 'time': 1 + }, + 'name': 'sliceFileByStepSize' + } + } collection = Collection(dataset_id="test_dataset", path="/granules/test*.nc", - variable="test_variable", + projection="Grid", + slices=frozenset([('lat', 30), ('lon', 30), ('time', 1)]), + dimension_names=frozenset([ + ('latitude', 'lat'), + ('longitude', 'lon'), + ('variable', 'test_var') + ]), historical_priority=1, forward_processing_priority=2, date_from=None, date_to=None) - filled = CollectionProcessor._fill_template("/granules/test_granule.nc", collection, template) - self.assertEqual(filled, expected) + filled = CollectionProcessor._generate_ingestion_message("/granules/test_granule.nc", collection) + generated_yaml = yaml.load(filled, Loader=yaml.FullLoader) + + self.assertEqual(expected, generated_yaml) @async_test @mock.patch('collection_manager.services.history_manager.FileIngestionHistory', new_callable=AsyncMock) @@ -75,7 +89,9 @@ class TestCollectionProcessor(unittest.TestCase): collection_processor = CollectionProcessor(mock_publisher, mock_history_builder) collection 
= Collection(dataset_id="test_dataset", path="test_path", - variable="test_variable", + projection="Grid", + slices=frozenset(), + dimension_names=frozenset(), historical_priority=1, forward_processing_priority=2, date_from=None, @@ -100,7 +116,9 @@ class TestCollectionProcessor(unittest.TestCase): collection_processor = CollectionProcessor(mock_publisher, mock_history_builder) collection = Collection(dataset_id="test_dataset", path="test_path", - variable="test_variable", + projection="Grid", + slices=frozenset(), + dimension_names=frozenset(), historical_priority=1, forward_processing_priority=2, date_from=None, @@ -123,7 +141,9 @@ class TestCollectionProcessor(unittest.TestCase): collection_processor = CollectionProcessor(mock_publisher, mock_history_builder) collection = Collection(dataset_id="test_dataset", path="test_path", - variable="test_variable", + projection="Grid", + slices=frozenset(), + dimension_names=frozenset(), historical_priority=1, date_from=None, date_to=None) @@ -144,7 +164,9 @@ class TestCollectionProcessor(unittest.TestCase): collection_processor = CollectionProcessor(mock_publisher, mock_history_builder) collection = Collection(dataset_id="test_dataset", path="test_path", - variable="test_variable", + projection="Grid", + slices=frozenset(), + dimension_names=frozenset(), historical_priority=1, forward_processing_priority=2, date_from=None, @@ -165,7 +187,9 @@ class TestCollectionProcessor(unittest.TestCase): collection_processor = CollectionProcessor(mock_publisher, mock_history_builder) collection = Collection(dataset_id="test_dataset", path="test_path", - variable="test_variable", + projection="Grid", + slices=frozenset(), + dimension_names=frozenset(), historical_priority=1, forward_processing_priority=2, date_from=None, diff --git a/collection_manager/tests/services/test_CollectionWatcher.py b/collection_manager/tests/services/test_CollectionWatcher.py index c9a75c0..e6bf15f 100644 --- a/collection_manager/tests/services/test_CollectionWatcher.py +++ b/collection_manager/tests/services/test_CollectionWatcher.py @@ -25,7 +25,7 @@ class TestCollectionWatcher(unittest.TestCase): Collection("id4", "var4", "path4", 7, 8, datetime.now(), datetime.now()), } } - flattened_collections = collection_watcher.collections() + flattened_collections = collection_watcher._collections() self.assertEqual(len(flattened_collections), 4) def test_load_collections_loads_all_collections(self): @@ -60,7 +60,7 @@ class TestCollectionWatcher(unittest.TestCase): collection_watcher = CollectionWatcher(collections_path, Mock(), Mock()) updated_collections = collection_watcher._get_updated_collections() - self.assertSetEqual(updated_collections, collection_watcher.collections()) + self.assertSetEqual(updated_collections, collection_watcher._collections()) def test_get_updated_collections_returns_no_collections(self): collections_path = os.path.join(os.path.dirname(__file__), '../resources/collections.yml') @@ -87,7 +87,9 @@ class TestCollectionWatcher(unittest.TestCase): collection = Collection(dataset_id="test_dataset", path="/absolute/path", - variable="test_variable", + projection="Grid", + slices=frozenset(), + dimension_names=frozenset(), historical_priority=1, forward_processing_priority=2, date_from=None, @@ -100,7 +102,9 @@ class TestCollectionWatcher(unittest.TestCase): collection = Collection(dataset_id="test_dataset", path="relative/path", - variable="test_variable", + projection="Grid", + slices=frozenset(), + dimension_names=frozenset(), historical_priority=1, 
forward_processing_priority=2, date_from=None, @@ -113,7 +117,9 @@ class TestCollectionWatcher(unittest.TestCase): collection = Collection(dataset_id="test_dataset", path="/resources/*.nc", - variable="test_variable", + projection="Grid", + slices=frozenset(), + dimension_names=frozenset(), historical_priority=1, forward_processing_priority=2, date_from=None, @@ -127,9 +133,19 @@ class TestCollectionWatcher(unittest.TestCase): collections_str = f"""collections: - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND path: {granule_dir.name} - variable: lwe_thickness priority: 1 - forward-processing-priority: 5""" + forward-processing-priority: 5 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: lwe_thickness + slices: + time: 1 + lat: 30 + lon: 30 + """ collections_config.write(collections_str.encode("utf-8")) collection_callback = AsyncMock() @@ -143,9 +159,18 @@ class TestCollectionWatcher(unittest.TestCase): collections_str = f""" - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND path: {granule_dir.name} - variable: lwe_thickness priority: 10 forward-processing-priority: 5 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: lwe_thickness + slices: + time: 1 + lat: 30 + lon: 30 """ collections_config.write(collections_str.encode("utf-8")) @@ -163,9 +188,18 @@ class TestCollectionWatcher(unittest.TestCase): collections: - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND path: {granule_dir.name} - variable: lwe_thickness priority: 1 forward-processing-priority: 5 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: lwe_thickness + slices: + time: 1 + lat: 30 + lon: 30 """ collections_config.write(collections_str.encode("utf-8")) @@ -187,9 +221,18 @@ collections: collections: - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND path: {granule_dir.name} - variable: lwe_thickness priority: 1 forward-processing-priority: 5 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: lwe_thickness + slices: + time: 1 + lat: 30 + lon: 30 """ collections_config.write(collections_str.encode("utf-8")) new_granule = open(os.path.join(granule_dir.name, 'test.nc'), "w+") diff --git a/config_operator/tests/resources/collections.yml b/config_operator/tests/resources/collections.yml index 42d2fbc..3414c5a 100644 --- a/config_operator/tests/resources/collections.yml +++ b/config_operator/tests/resources/collections.yml @@ -5,5 +5,14 @@ avhrr-oi-analysed-sst: avhrr-oi-analysed-sst2: path: resources/history_manager/data/avhrr_oi/*.nc - variable: analysed_sst priority: 1 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: analysed_sst + slices: + time: 1 + lat: 30 + lon: 30 diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/MessageConsumer.py similarity index 98% rename from granule_ingester/granule_ingester/consumer/Consumer.py rename to granule_ingester/granule_ingester/consumer/MessageConsumer.py index 6c72837..4d6c07b 100644 --- a/granule_ingester/granule_ingester/consumer/Consumer.py +++ b/granule_ingester/granule_ingester/consumer/MessageConsumer.py @@ -25,7 +25,7 @@ from granule_ingester.pipeline import Pipeline logger = logging.getLogger(__name__) -class Consumer(HealthCheck): +class MessageConsumer(HealthCheck): def __init__(self, rabbitmq_host, @@ -95,7 +95,7 @@ class Consumer(HealthCheck): async def start_consuming(self, pipeline_max_concurrency=16): 
channel = await self._connection.channel() await channel.set_qos(prefetch_count=1) - queue = await channel.declare_queue(self._rabbitmq_queue, durable=True) + queue = await channel.declare_queue(self._rabbitmq_queue, durable=True, arguments={'x-max-priority': 10}) queue_iter = queue.iterator() async for message in queue_iter: try: diff --git a/granule_ingester/granule_ingester/consumer/__init__.py b/granule_ingester/granule_ingester/consumer/__init__.py index 35d075b..bb782d5 100644 --- a/granule_ingester/granule_ingester/consumer/__init__.py +++ b/granule_ingester/granule_ingester/consumer/__init__.py @@ -1 +1 @@ -from granule_ingester.consumer.Consumer import Consumer +from granule_ingester.consumer.MessageConsumer import MessageConsumer diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py index c648b99..fdd03e5 100644 --- a/granule_ingester/granule_ingester/exceptions/Exceptions.py +++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py @@ -6,7 +6,11 @@ class PipelineRunningError(Exception): pass -class TileProcessingError(Exception): +class TileProcessingError(PipelineRunningError): + pass + + +class GranuleLoadingError(PipelineRunningError): pass @@ -21,6 +25,7 @@ class RabbitMQLostConnectionError(LostConnectionError): class CassandraLostConnectionError(LostConnectionError): pass + class SolrLostConnectionError(LostConnectionError): pass diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py index ea0969f..f2429b1 100644 --- a/granule_ingester/granule_ingester/exceptions/__init__.py +++ b/granule_ingester/granule_ingester/exceptions/__init__.py @@ -1,11 +1,8 @@ -from .Exceptions import CassandraFailedHealthCheckError -from .Exceptions import CassandraLostConnectionError -from .Exceptions import FailedHealthCheckError -from .Exceptions import LostConnectionError -from .Exceptions import PipelineBuildingError -from .Exceptions import PipelineRunningError -from .Exceptions import RabbitMQFailedHealthCheckError -from .Exceptions import RabbitMQLostConnectionError -from .Exceptions import SolrFailedHealthCheckError -from .Exceptions import SolrLostConnectionError -from .Exceptions import TileProcessingError +from .Exceptions import (CassandraFailedHealthCheckError, + CassandraLostConnectionError, FailedHealthCheckError, + GranuleLoadingError, LostConnectionError, + PipelineBuildingError, PipelineRunningError, + RabbitMQFailedHealthCheckError, + RabbitMQLostConnectionError, + SolrFailedHealthCheckError, SolrLostConnectionError, + TileProcessingError) diff --git a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py index c28ffbb..6377de0 100644 --- a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py +++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py @@ -21,6 +21,8 @@ from urllib import parse import aioboto3 import xarray as xr +from granule_ingester.exceptions import GranuleLoadingError + logger = logging.getLogger(__name__) @@ -52,7 +54,12 @@ class GranuleLoader: raise RuntimeError("Granule path scheme '{}' is not supported.".format(resource_url.scheme)) granule_name = os.path.basename(self._resource) - return xr.open_dataset(file_path, lock=False), granule_name + try: + return xr.open_dataset(file_path, lock=False), granule_name + except FileNotFoundError: + raise GranuleLoadingError(f"The granule file 
{self._resource} does not exist.") + except Exception: + raise GranuleLoadingError(f"The granule {self._resource} is not a valid NetCDF file.") @staticmethod async def _download_s3_file(url: str): diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py index 15390fd..b5a429c 100644 --- a/granule_ingester/granule_ingester/main.py +++ b/granule_ingester/granule_ingester/main.py @@ -20,7 +20,7 @@ import sys from functools import partial from typing import List -from granule_ingester.consumer import Consumer +from granule_ingester.consumer import MessageConsumer from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError from granule_ingester.healthcheck import HealthCheck from granule_ingester.writers import CassandraStore, SolrStore @@ -116,16 +116,16 @@ async def main(loop): solr_host_and_port = args.solr_host_and_port zk_host_and_port = args.zk_host_and_port - consumer = Consumer(rabbitmq_host=args.rabbitmq_host, - rabbitmq_username=args.rabbitmq_username, - rabbitmq_password=args.rabbitmq_password, - rabbitmq_queue=args.rabbitmq_queue, - data_store_factory=partial(cassandra_factory, - cassandra_contact_points, - cassandra_port, - cassandra_username, - cassandra_password), - metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port)) + consumer = MessageConsumer(rabbitmq_host=args.rabbitmq_host, + rabbitmq_username=args.rabbitmq_username, + rabbitmq_password=args.rabbitmq_password, + rabbitmq_queue=args.rabbitmq_queue, + data_store_factory=partial(cassandra_factory, + cassandra_contact_points, + cassandra_port, + cassandra_username, + cassandra_password), + metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port)) try: solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port) await run_health_checks([CassandraStore(cassandra_contact_points, diff --git a/granule_ingester/granule_ingester/pipeline/Modules.py b/granule_ingester/granule_ingester/pipeline/Modules.py index 2cf2245..d1950dc 100644 --- a/granule_ingester/granule_ingester/pipeline/Modules.py +++ b/granule_ingester/granule_ingester/pipeline/Modules.py @@ -1,14 +1,19 @@ -from granule_ingester.processors import * -from granule_ingester.processors.reading_processors import * -from granule_ingester.slicers import * -from granule_ingester.granule_loaders import * +from granule_ingester.processors import GenerateTileId, TileSummarizingProcessor, EmptyTileFilter, KelvinToCelsius +from granule_ingester.processors.reading_processors import (EccoReadingProcessor, + GridReadingProcessor, + SwathReadingProcessor, + TimeSeriesReadingProcessor) +from granule_ingester.slicers import SliceFileByStepSize +from granule_ingester.granule_loaders import GranuleLoader modules = { "granule": GranuleLoader, "sliceFileByStepSize": SliceFileByStepSize, "generateTileId": GenerateTileId, - "EccoReadingProcessor": EccoReadingProcessor, - "GridReadingProcessor": GridReadingProcessor, + "ECCO": EccoReadingProcessor, + "Grid": GridReadingProcessor, + "TimeSeries": TimeSeriesReadingProcessor, + "Swath": SwathReadingProcessor, "tileSummary": TileSummarizingProcessor, "emptyTileFilter": EmptyTileFilter, "kelvinToCelsius": KelvinToCelsius diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py index dabca81..86bf9c8 100644 --- a/granule_ingester/granule_ingester/pipeline/Pipeline.py +++ 
b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -153,10 +153,13 @@ class Pipeline: metadata_store_factory, tile_processors, max_concurrency) + except PipelineBuildingError: + raise except KeyError as e: raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.") - except Exception: - raise PipelineBuildingError("Cannot build pipeline.") + except Exception as e: + logger.exception(e) + raise PipelineBuildingError(f"Cannot build pipeline because of the following error: {e}") @classmethod def _parse_module(cls, module_config: dict, module_mappings: dict): @@ -166,7 +169,9 @@ class Pipeline: logger.debug("Loaded processor {}.".format(module_class)) processor_module = module_class(**module_config) except KeyError: - raise RuntimeError("'{}' is not a valid processor.".format(module_name)) + raise PipelineBuildingError(f"'{module_name}' is not a valid processor.") + except Exception as e: + raise PipelineBuildingError(f"Parsing module '{module_name}' failed because of the following error: {e}") return processor_module diff --git a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py index 1876013..8cc24d0 100644 --- a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py +++ b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py @@ -10,14 +10,14 @@ from granule_ingester.processors.reading_processors.TileReadingProcessor import class EccoReadingProcessor(TileReadingProcessor): def __init__(self, - variable_to_read, + variable, latitude, longitude, tile, depth=None, time=None, **kwargs): - super().__init__(variable_to_read, latitude, longitude, **kwargs) + super().__init__(variable, latitude, longitude, **kwargs) self.depth = depth self.time = time @@ -31,8 +31,8 @@ class EccoReadingProcessor(TileReadingProcessor): lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN) lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN) - data_subset = ds[self.variable_to_read][ - type(self)._slices_for_variable(ds[self.variable_to_read], dimensions_to_slices)] + data_subset = ds[self.variable][ + type(self)._slices_for_variable(ds[self.variable], dimensions_to_slices)] data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN) new_tile.tile = ds[self.tile][dimensions_to_slices[self.tile].start].item() diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py index 4354f9e..1ba76a2 100644 --- a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py +++ b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py @@ -9,8 +9,8 @@ from granule_ingester.processors.reading_processors.TileReadingProcessor import class GridReadingProcessor(TileReadingProcessor): - def __init__(self, variable_to_read, latitude, longitude, depth=None, time=None, **kwargs): - super().__init__(variable_to_read, latitude, longitude, **kwargs) + def __init__(self, variable, latitude, longitude, depth=None, time=None, **kwargs): + super().__init__(variable, latitude, longitude, **kwargs) self.depth = depth self.time = time @@ -22,8 +22,8 @@ class GridReadingProcessor(TileReadingProcessor): lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN) lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN) - 
data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read], - dimensions_to_slices)] + data_subset = ds[self.variable][type(self)._slices_for_variable(ds[self.variable], + dimensions_to_slices)] data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN) if self.depth: diff --git a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py index fec28ca..5b8072a 100644 --- a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py +++ b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py @@ -9,8 +9,8 @@ from granule_ingester.processors.reading_processors.TileReadingProcessor import class SwathReadingProcessor(TileReadingProcessor): - def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs): - super().__init__(variable_to_read, latitude, longitude, **kwargs) + def __init__(self, variable, latitude, longitude, time, depth=None, **kwargs): + super().__init__(variable, latitude, longitude, **kwargs) self.depth = depth self.time = time @@ -25,7 +25,7 @@ class SwathReadingProcessor(TileReadingProcessor): time_subset = ds[self.time][type(self)._slices_for_variable(ds[self.time], dimensions_to_slices)] time_subset = np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.NaN) - data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read], + data_subset = ds[self.variable][type(self)._slices_for_variable(ds[self.variable], dimensions_to_slices)] data_subset = np.ma.filled(data_subset, np.NaN) diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py index 8b69ad2..aa70db3 100644 --- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py +++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py @@ -27,8 +27,8 @@ from granule_ingester.processors.TileProcessor import TileProcessor class TileReadingProcessor(TileProcessor, ABC): - def __init__(self, variable_to_read: str, latitude: str, longitude: str, *args, **kwargs): - self.variable_to_read = variable_to_read + def __init__(self, variable: str, latitude: str, longitude: str, *args, **kwargs): + self.variable = variable self.latitude = latitude self.longitude = longitude @@ -38,11 +38,11 @@ class TileReadingProcessor(TileProcessor, ABC): output_tile = nexusproto.NexusTile() output_tile.CopyFrom(tile) - output_tile.summary.data_var_name = self.variable_to_read + output_tile.summary.data_var_name = self.variable return self._generate_tile(dataset, dimensions_to_slices, output_tile) - except Exception: - raise TileProcessingError("Could not generate tiles from the granule.") + except Exception as e: + raise TileProcessingError(f"Could not generate tiles from the granule because of the following error: {e}.") @abstractmethod def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile): diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py index 2831c0c..c4aae25 100644 --- a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py +++ 
b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py @@ -9,8 +9,8 @@ from granule_ingester.processors.reading_processors.TileReadingProcessor import class TimeSeriesReadingProcessor(TileReadingProcessor): - def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs): - super().__init__(variable_to_read, latitude, longitude, **kwargs) + def __init__(self, variable, latitude, longitude, time, depth=None, **kwargs): + super().__init__(variable, latitude, longitude, **kwargs) self.depth = depth self.time = time @@ -23,8 +23,8 @@ class TimeSeriesReadingProcessor(TileReadingProcessor): lat_subset = np.ma.filled(lat_subset, np.NaN) lon_subset = np.ma.filled(lon_subset, np.NaN) - data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read], - dimensions_to_slices)] + data_subset = ds[self.variable][type(self)._slices_for_variable(ds[self.variable], + dimensions_to_slices)] data_subset = np.ma.filled(data_subset, np.NaN) if self.depth: diff --git a/granule_ingester/tests/config_files/ingestion_config_testfile.yaml b/granule_ingester/tests/config_files/ingestion_config_testfile.yaml index 9af889d..63df51a 100644 --- a/granule_ingester/tests/config_files/ingestion_config_testfile.yaml +++ b/granule_ingester/tests/config_files/ingestion_config_testfile.yaml @@ -7,11 +7,11 @@ slicer: lat: 33 lon: 26 processors: - - name: EccoReadingProcessor + - name: ECCO latitude: YC longitude: XC time: time depth: Z tile: tile - variable_to_read: THETA + variable: THETA - name: generateTileId diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py index 34e66c6..27ec72b 100644 --- a/granule_ingester/tests/pipeline/test_Pipeline.py +++ b/granule_ingester/tests/pipeline/test_Pipeline.py @@ -6,8 +6,9 @@ from nexusproto import DataTile_pb2 as nexusproto from granule_ingester.pipeline.Pipeline import Pipeline from granule_ingester.processors import GenerateTileId from granule_ingester.processors.reading_processors import EccoReadingProcessor -from granule_ingester.slicers.SliceFileByStepSize import * +from granule_ingester.slicers.SliceFileByStepSize import SliceFileByStepSize from granule_ingester.writers import DataStore, MetadataStore +from granule_ingester.exceptions import PipelineBuildingError class TestPipeline(unittest.TestCase): @@ -70,7 +71,7 @@ class TestPipeline(unittest.TestCase): "name": "MockModule", "bogus_param": True } - self.assertRaises(TypeError, Pipeline._parse_module, module_config, module_mappings) + self.assertRaises(PipelineBuildingError, Pipeline._parse_module, module_config, module_mappings) def test_parse_module_with_missing_parameters(self): module_mappings = {"MockModule": TestPipeline.MockProcessorWithParams} @@ -78,7 +79,7 @@ class TestPipeline(unittest.TestCase): "name": "MockModule" } - self.assertRaises(TypeError, Pipeline._parse_module, module_config, module_mappings) + self.assertRaises(PipelineBuildingError, Pipeline._parse_module, module_config, module_mappings) def test_process_tile(self): # class MockIdProcessor: diff --git a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py index f2e9f29..03d5054 100644 --- a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py +++ b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py @@ -10,7 +10,7 @@ from granule_ingester.processors.reading_processors 
import EccoReadingProcessor class TestEccoReadingProcessor(unittest.TestCase): def test_generate_tile(self): - reading_processor = EccoReadingProcessor(variable_to_read='OBP', + reading_processor = EccoReadingProcessor(variable='OBP', latitude='YC', longitude='XC', time='time', @@ -40,7 +40,7 @@ class TestEccoReadingProcessor(unittest.TestCase): self.assertEqual(output_tile.tile.ecco_tile.longitude.shape, [15, 7]) def test_generate_tile_with_dims_out_of_order(self): - reading_processor = EccoReadingProcessor(variable_to_read='OBP', + reading_processor = EccoReadingProcessor(variable='OBP', latitude='YC', longitude='XC', time='time', diff --git a/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py index aec3ae8..31cb547 100644 --- a/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py +++ b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py @@ -209,7 +209,7 @@ class TestReadInterpEccoData(unittest.TestCase): time='time') def test_read_indexed_ecco(self): - reading_processor = GridReadingProcessor(variable_to_read='OBP', + reading_processor = GridReadingProcessor(variable='OBP', latitude='latitude', longitude='longitude', time='time') diff --git a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py index 55ac4fc..db623f5 100644 --- a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py +++ b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py @@ -24,7 +24,7 @@ from granule_ingester.processors.reading_processors import SwathReadingProcessor class TestReadAscatbData(unittest.TestCase): def test_read_not_empty_ascatb(self): - reading_processor = SwathReadingProcessor(variable_to_read='wind_speed', + reading_processor = SwathReadingProcessor(variable='wind_speed', latitude='lat', longitude='lon', time='time') @@ -50,7 +50,7 @@ class TestReadAscatbData(unittest.TestCase): class TestReadSmapData(unittest.TestCase): def test_read_not_empty_smap(self): reading_processor = SwathReadingProcessor( - variable_to_read='smap_sss', + variable='smap_sss', latitude='lat', longitude='lon', time='row_time')
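A note on the new message format: CollectionProcessor now builds the RabbitMQ payload with yaml.dump instead of rendering a pystache template. Based on the expected dictionary in test_CollectionProcessor, the published message for a Grid collection looks approximately like the following (PyYAML sorts mapping keys alphabetically by default, so the exact line order may differ):

    granule:
      resource: /granules/test_granule.nc
    processors:
    - latitude: lat
      longitude: lon
      name: Grid
      variable: test_var
    - name: emptyTileFilter
    - dataset_name: test_dataset
      name: tileSummary
    - name: generateTileId
    slicer:
      dimension_step_sizes:
        lat: 30
        lon: 30
        time: 1
      name: sliceFileByStepSize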

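An implementation detail worth noting: Collection is a frozen dataclass that is stored in sets (CollectionWatcher._collections returns Set[Collection]), and plain dicts are unhashable, so the new dimensionNames and slices mappings are carried as frozensets of (key, value) pairs and converted back to dicts when the ingestion message is built. A minimal sketch of the round trip, with illustrative values:

    from dataclasses import dataclass

    @dataclass(frozen=True)
    class Example:
        # frozenset of (key, value) tuples; hashable, unlike a dict
        dimension_names: frozenset

    names = {'latitude': 'lat', 'longitude': 'lon', 'variable': 'analysed_sst'}
    example = Example(dimension_names=frozenset(names.items()))

    assert dict(example.dimension_names) == names  # recover the mapping
    assert example in {example}                    # usable as a set member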
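Also note that the queue is now declared with arguments={'x-max-priority': 10} on both sides: MessagePublisher in the collection manager and MessageConsumer in the granule ingester. RabbitMQ fixes a queue's arguments at first declaration and rejects a later declaration with different arguments (PRECONDITION_FAILED), so a queue left over from a deployment without the priority argument must be deleted, and the two services must stay in agreement. A minimal aio-pika sketch of the shared declaration (the URL and queue name are illustrative):

    import asyncio
    from aio_pika import connect_robust

    async def declare_priority_queue():
        # Both publisher and consumer must pass identical queue arguments here.
        connection = await connect_robust('amqp://guest:guest@localhost/')
        channel = await connection.channel()
        await channel.declare_queue('nexus',
                                    durable=True,
                                    arguments={'x-max-priority': 10})
        await connection.close()

    asyncio.run(declare_priority_queue())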