This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch rabbitmq-fix in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 016452da1fec6c27bb5e664c7cdd6f1c694ea474 Author: Eamon Ford <[email protected]> AuthorDate: Wed Jul 8 20:16:29 2020 -0500 better error handling --- .../granule_ingester/consumer/Consumer.py | 7 +++- .../granule_ingester/exceptions/Exceptions.py | 2 ++ .../granule_ingester/exceptions/__init__.py | 1 + .../granule_ingester/pipeline/Pipeline.py | 39 +++++++++++----------- granule_ingester/tests/pipeline/test_Pipeline.py | 9 ++--- 5 files changed, 34 insertions(+), 24 deletions(-) diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py index 75d347a..31454c1 100644 --- a/granule_ingester/granule_ingester/consumer/Consumer.py +++ b/granule_ingester/granule_ingester/consumer/Consumer.py @@ -19,6 +19,7 @@ import aio_pika from granule_ingester.healthcheck import HealthCheck from granule_ingester.pipeline import Pipeline +from granule_ingester.exceptions import PipelineBuildingError logger = logging.getLogger(__name__) @@ -74,9 +75,13 @@ class Consumer(HealthCheck): metadata_store_factory=metadata_store_factory) await pipeline.run() message.ack() + except PipelineBuildingError as e: + message.reject() + logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped " + f"from RabbitMQ. The exception was:\n{e}") except Exception as e: message.reject(requeue=True) - logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e)) + logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}") async def start_consuming(self): channel = await self._connection.channel() diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py new file mode 100644 index 0000000..4c03e48 --- /dev/null +++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py @@ -0,0 +1,2 @@ +class PipelineBuildingError(Exception): + pass \ No newline at end of file diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py new file mode 100644 index 0000000..a36b19a --- /dev/null +++ b/granule_ingester/granule_ingester/exceptions/__init__.py @@ -0,0 +1 @@ +from .Exceptions import PipelineBuildingError \ No newline at end of file diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py index 8f2dd6f..f872e4d 100644 --- a/granule_ingester/granule_ingester/pipeline/Pipeline.py +++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -17,10 +17,11 @@ import logging import time from typing import List - +from granule_ingester.exceptions import PipelineBuildingError import aiomultiprocess import xarray as xr import yaml +from yaml.scanner import ScannerError from nexusproto import DataTile_pb2 as nexusproto from granule_ingester.granule_loaders import GranuleLoader @@ -90,38 +91,38 @@ class Pipeline: @classmethod def from_string(cls, config_str: str, data_store_factory, metadata_store_factory): - config = yaml.load(config_str, yaml.FullLoader) - return cls._build_pipeline(config, - data_store_factory, - metadata_store_factory, - processor_module_mappings) - - @classmethod - def from_file(cls, config_path: str, data_store_factory, metadata_store_factory): - with open(config_path) as config_file: - config = yaml.load(config_file, yaml.FullLoader) + try: + config = yaml.load(config_str, yaml.FullLoader) return cls._build_pipeline(config, data_store_factory, metadata_store_factory, processor_module_mappings) + except yaml.scanner.ScannerError: + raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.") + @classmethod def _build_pipeline(cls, config: dict, data_store_factory, metadata_store_factory, module_mappings: dict): - granule_loader = GranuleLoader(**config['granule']) + try: + granule_loader = GranuleLoader(**config['granule']) - slicer_config = config['slicer'] - slicer = cls._parse_module(slicer_config, module_mappings) + slicer_config = config['slicer'] + slicer = cls._parse_module(slicer_config, module_mappings) - tile_processors = [] - for processor_config in config['processors']: - module = cls._parse_module(processor_config, module_mappings) - tile_processors.append(module) + tile_processors = [] + for processor_config in config['processors']: + module = cls._parse_module(processor_config, module_mappings) + tile_processors.append(module) - return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors) + return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors) + except KeyError as e: + raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.") + except Exception: + raise PipelineBuildingError("Cannot build pipeline.") @classmethod def _parse_module(cls, module_config: dict, module_mappings: dict): diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py index c18bf8b..34e66c6 100644 --- a/granule_ingester/tests/pipeline/test_Pipeline.py +++ b/granule_ingester/tests/pipeline/test_Pipeline.py @@ -29,10 +29,11 @@ class TestPipeline(unittest.TestCase): pass relative_path = "../config_files/ingestion_config_testfile.yaml" - file_path = os.path.join(os.path.dirname(__file__), relative_path) - pipeline = Pipeline.from_file(config_path=str(file_path), - data_store_factory=MockDataStore, - metadata_store_factory=MockMetadataStore) + with open(os.path.join(os.path.dirname(__file__), relative_path)) as file: + yaml_str = file.read() + pipeline = Pipeline.from_string(config_str=yaml_str, + data_store_factory=MockDataStore, + metadata_store_factory=MockMetadataStore) self.assertEqual(pipeline._data_store_factory, MockDataStore) self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)
