This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 7feb58d8099d6441b0b220998c1fddf27ee04195
Author:     Eamon Ford <[email protected]>
AuthorDate: Thu Jul 9 11:31:28 2020 -0500

    better exception handling
---
 .../granule_ingester/consumer/Consumer.py          |  5 ++++-
 .../granule_ingester/exceptions/Exceptions.py      | 10 +++++++++-
 .../granule_ingester/exceptions/__init__.py        |  4 +++-
 .../granule_ingester/pipeline/Pipeline.py          | 13 ++++++++++---
 .../reading_processors/TileReadingProcessor.py     | 20 +++++++++-----------
 granule_ingester/requirements.txt                  |  2 +-
 6 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 31454c1..fadfe75 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,9 +17,9 @@ import logging
 
 import aio_pika
 
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
-from granule_ingester.exceptions import PipelineBuildingError
 
 logger = logging.getLogger(__name__)
 
@@ -79,6 +79,9 @@ class Consumer(HealthCheck):
             message.reject()
             logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
                              f"from RabbitMQ. The exception was:\n{e}")
+        except PipelineRunningError as e:
+            message.reject()
+            logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
         except Exception as e:
             message.reject(requeue=True)
             logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 4c03e48..8c25532 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -1,2 +1,10 @@
 class PipelineBuildingError(Exception):
-    pass
\ No newline at end of file
+    pass
+
+
+class PipelineRunningError(Exception):
+    pass
+
+
+class TileProcessingError(Exception):
+    pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index a36b19a..71607c2 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1 +1,3 @@
-from .Exceptions import PipelineBuildingError
\ No newline at end of file
+from .Exceptions import TileProcessingError
+from .Exceptions import PipelineBuildingError
+from .Exceptions import PipelineRunningError
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index f872e4d..c7b5d6a 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -17,13 +17,15 @@ import logging
 import time
 from typing import List
 
-from granule_ingester.exceptions import PipelineBuildingError
+
 import aiomultiprocess
 import xarray as xr
 import yaml
-from yaml.scanner import ScannerError
+from aiomultiprocess.types import ProxyException
 from nexusproto import DataTile_pb2 as nexusproto
+from yaml.scanner import ScannerError
 
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -62,6 +64,7 @@ async def _process_tile_in_worker(serialized_input_tile: str):
     input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
     processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
+
     if processed_tile:
         await _worker_data_store.save_data(processed_tile)
         await _worker_metadata_store.save_metadata(processed_tile)
 
@@ -149,7 +152,11 @@ class Pipeline:
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to batch it.
                 for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
-                    await pool.map(_process_tile_in_worker, chunk)
+                    try:
+                        await pool.map(_process_tile_in_worker, chunk)
+                    except ProxyException:
+                        pool.terminate()
+                        raise PipelineRunningError("Running the pipeline failed and could not recover.")
 
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 14a44f5..8b69ad2 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -21,6 +21,7 @@ import numpy as np
 import xarray as xr
 from nexusproto import DataTile_pb2 as nexusproto
 
+from granule_ingester.exceptions import TileProcessingError
 from granule_ingester.processors.TileProcessor import TileProcessor
 
 
@@ -31,20 +32,17 @@ class TileReadingProcessor(TileProcessor, ABC):
         self.latitude = latitude
         self.longitude = longitude
 
-        # Common optional properties
-        self.temp_dir = None
-        self.metadata = None
-        # self.temp_dir = self.environ['TEMP_DIR']
-        # self.metadata = self.environ['META']
-
     def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
-        dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec)
+        try:
+            dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec)
 
-        output_tile = nexusproto.NexusTile()
-        output_tile.CopyFrom(tile)
-        output_tile.summary.data_var_name = self.variable_to_read
+            output_tile = nexusproto.NexusTile()
+            output_tile.CopyFrom(tile)
+            output_tile.summary.data_var_name = self.variable_to_read
 
-        return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+            return self._generate_tile(dataset, dimensions_to_slices, output_tile)
+        except Exception:
+            raise TileProcessingError("Could not generate tiles from the granule.")
 
     @abstractmethod
     def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 4d9d4cb..a6d64a2 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,3 +1,3 @@
 cassandra-driver==3.23.0
-aiomultiprocess
+aiomultiprocess==0.7.0
 aioboto3
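
Context for the Pipeline.py change above: with aiomultiprocess (0.7.0, as now pinned in requirements.txt), an exception raised inside a worker process surfaces in the parent as a ProxyException carrying the worker traceback as text, rather than as the original exception type. The commit therefore catches ProxyException around pool.map and wraps it in the new PipelineRunningError, which Consumer.py rejects without requeueing. Below is a minimal sketch of that behavior, not code from the repository; failing_worker is a hypothetical stand-in for _process_tile_in_worker.

# Sketch only: shows a worker-side error arriving as ProxyException, which is
# what the try/except added around pool.map in Pipeline.run() relies on.
import asyncio

import aiomultiprocess
from aiomultiprocess.types import ProxyException


async def failing_worker(item):
    # Hypothetical stand-in for _process_tile_in_worker: any tile-level failure.
    raise ValueError(f"could not process {item}")


async def main():
    async with aiomultiprocess.Pool() as pool:
        try:
            await pool.map(failing_worker, [1, 2, 3])
        except ProxyException as e:
            # The worker's ValueError reaches the parent only as a ProxyException;
            # the real pipeline terminates the pool and raises PipelineRunningError here.
            print(f"worker failure surfaced as: {e!r}")


if __name__ == "__main__":
    asyncio.run(main())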
