This is an automated email from the ASF dual-hosted git repository. tloubrieu pushed a commit to branch ascending_latitudes in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit caa296af7976c046996f22677174574247630ca3 Author: Eamon Ford <[email protected]> AuthorDate: Wed Aug 5 19:28:07 2020 -0700 SDAP-277: Improved error handling in Granule Ingester (#15) Co-authored-by: Eamon Ford <[email protected]> --- granule_ingester/conda-requirements.txt | 2 +- .../granule_ingester/consumer/Consumer.py | 40 +++++-- .../granule_ingester/pipeline/Pipeline.py | 132 +++++++++++++-------- .../reading_processors/TileReadingProcessor.py | 20 ++-- granule_ingester/tests/pipeline/test_Pipeline.py | 9 +- 5 files changed, 130 insertions(+), 73 deletions(-) diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt index b2af149..fafd6f3 100644 --- a/granule_ingester/conda-requirements.txt +++ b/granule_ingester/conda-requirements.txt @@ -6,5 +6,5 @@ xarray pyyaml==5.3.1 requests==2.23.0 aiohttp==3.6.2 -aio-pika +aio-pika==6.6.1 tenacity diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py index 5df51fe..6c72837 100644 --- a/granule_ingester/granule_ingester/consumer/Consumer.py +++ b/granule_ingester/granule_ingester/consumer/Consumer.py @@ -17,6 +17,8 @@ import logging import aio_pika +from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \ + RabbitMQFailedHealthCheckError, LostConnectionError from granule_ingester.healthcheck import HealthCheck from granule_ingester.pipeline import Pipeline @@ -39,7 +41,7 @@ class Consumer(HealthCheck): self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username, password=rabbitmq_password, host=rabbitmq_host) - self._connection = None + self._connection: aio_pika.Connection = None async def health_check(self) -> bool: try: @@ -47,10 +49,10 @@ class Consumer(HealthCheck): await connection.close() return True except Exception: - logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string)) - return False + raise RabbitMQFailedHealthCheckError(f"Cannot connect to RabbitMQ! " + f"Connection string was {self._connection_string}") - async def _get_connection(self): + async def _get_connection(self) -> aio_pika.Connection: return await aio_pika.connect_robust(self._connection_string) async def __aenter__(self): @@ -75,19 +77,37 @@ class Consumer(HealthCheck): metadata_store_factory=metadata_store_factory, max_concurrency=pipeline_max_concurrency) await pipeline.run() - message.ack() + await message.ack() + except PipelineBuildingError as e: + await message.reject() + logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped " + f"from RabbitMQ. The exception was:\n{e}") + except PipelineRunningError as e: + await message.reject() + logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}") + except LostConnectionError: + # Let main() handle this + raise except Exception as e: - message.reject(requeue=True) - logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e)) + await message.reject(requeue=True) + logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}") async def start_consuming(self, pipeline_max_concurrency=16): channel = await self._connection.channel() await channel.set_qos(prefetch_count=1) queue = await channel.declare_queue(self._rabbitmq_queue, durable=True) - - async with queue.iterator() as queue_iter: - async for message in queue_iter: + queue_iter = queue.iterator() + async for message in queue_iter: + try: await self._received_message(message, self._data_store_factory, self._metadata_store_factory, pipeline_max_concurrency) + except aio_pika.exceptions.MessageProcessError: + # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ + # connection has died, and attempting to close the queue will only raise another exception. + raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.") + except Exception as e: + await queue_iter.close() + await channel.close() + raise e diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py index e1e53bf..dabca81 100644 --- a/granule_ingester/granule_ingester/pipeline/Pipeline.py +++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -13,38 +13,46 @@ # See the License for the specific language governing permissions and # limitations under the License. - import logging -import os +import pickle import time +from multiprocessing import Manager from typing import List import xarray as xr import yaml -import aiomultiprocess +from aiomultiprocess import Pool +from aiomultiprocess.types import ProxyException +from granule_ingester.exceptions import PipelineBuildingError from granule_ingester.granule_loaders import GranuleLoader -from granule_ingester.pipeline.Modules import modules as processor_module_mappings +from granule_ingester.pipeline.Modules import \ + modules as processor_module_mappings from granule_ingester.processors.TileProcessor import TileProcessor from granule_ingester.slicers import TileSlicer from granule_ingester.writers import DataStore, MetadataStore from nexusproto import DataTile_pb2 as nexusproto +from tblib import pickling_support logger = logging.getLogger(__name__) -MAX_QUEUE_SIZE = 2 ** 15 - 1 +# The aiomultiprocessing library has a bug where it never closes out the pool if there are more than a certain +# number of items to process. The exact number is unknown, but 2**8-1 is safe. +MAX_CHUNK_SIZE = 2 ** 8 - 1 _worker_data_store: DataStore = None _worker_metadata_store: MetadataStore = None _worker_processor_list: List[TileProcessor] = None _worker_dataset = None +_shared_memory = None -def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory): +def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory): global _worker_data_store global _worker_metadata_store global _worker_processor_list global _worker_dataset + global _shared_memory # _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process; # however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry. @@ -52,19 +60,21 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac _worker_metadata_store = metadata_store_factory() _worker_processor_list = processor_list _worker_dataset = dataset + _shared_memory = shared_memory async def _process_tile_in_worker(serialized_input_tile: str): - global _worker_data_store - global _worker_metadata_store - global _worker_processor_list - global _worker_dataset + try: + input_tile = nexusproto.NexusTile.FromString(serialized_input_tile) + processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile) - input_tile = nexusproto.NexusTile.FromString(serialized_input_tile) - processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile) - if processed_tile: - await _worker_data_store.save_data(processed_tile) - await _worker_metadata_store.save_metadata(processed_tile) + if processed_tile: + await _worker_data_store.save_data(processed_tile) + await _worker_metadata_store.save_metadata(processed_tile) + except Exception as e: + pickling_support.install(e) + _shared_memory.error = pickle.dumps(e) + raise def _recurse(processor_list: List[TileProcessor], @@ -91,25 +101,34 @@ class Pipeline: self._metadata_store_factory = metadata_store_factory self._max_concurrency = max_concurrency - @classmethod - def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16): - config = yaml.load(config_str, yaml.FullLoader) - return cls._build_pipeline(config, - data_store_factory, - metadata_store_factory, - processor_module_mappings, - max_concurrency) + # Create a SyncManager so that we can to communicate exceptions from the + # worker processes back to the main process. + self._manager = Manager() + + def __del__(self): + self._manager.shutdown() @classmethod - def from_file(cls, config_path: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16): - with open(config_path) as config_file: - config = yaml.load(config_file, yaml.FullLoader) + def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16): + try: + config = yaml.load(config_str, yaml.FullLoader) + cls._validate_config(config) return cls._build_pipeline(config, data_store_factory, metadata_store_factory, processor_module_mappings, max_concurrency) + except yaml.scanner.ScannerError: + raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.") + + # TODO: this method should validate the config against an actual schema definition + @staticmethod + def _validate_config(config: dict): + if type(config) is not dict: + raise PipelineBuildingError("Cannot build pipeline; the pipeline configuration that " + + "was received is not valid YAML.") + @classmethod def _build_pipeline(cls, config: dict, @@ -117,17 +136,27 @@ class Pipeline: metadata_store_factory, module_mappings: dict, max_concurrency: int): - granule_loader = GranuleLoader(**config['granule']) - - slicer_config = config['slicer'] - slicer = cls._parse_module(slicer_config, module_mappings) - - tile_processors = [] - for processor_config in config['processors']: - module = cls._parse_module(processor_config, module_mappings) - tile_processors.append(module) - - return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors, max_concurrency) + try: + granule_loader = GranuleLoader(**config['granule']) + + slicer_config = config['slicer'] + slicer = cls._parse_module(slicer_config, module_mappings) + + tile_processors = [] + for processor_config in config['processors']: + module = cls._parse_module(processor_config, module_mappings) + tile_processors.append(module) + + return cls(granule_loader, + slicer, + data_store_factory, + metadata_store_factory, + tile_processors, + max_concurrency) + except KeyError as e: + raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.") + except Exception: + raise PipelineBuildingError("Cannot build pipeline.") @classmethod def _parse_module(cls, module_config: dict, module_mappings: dict): @@ -144,23 +173,32 @@ class Pipeline: async def run(self): async with self._granule_loader as (dataset, granule_name): start = time.perf_counter() - async with aiomultiprocess.Pool(initializer=_init_worker, - initargs=(self._tile_processors, - dataset, - self._data_store_factory, - self._metadata_store_factory), - maxtasksperchild=self._max_concurrency, - childconcurrency=self._max_concurrency) as pool: + + shared_memory = self._manager.Namespace() + async with Pool(initializer=_init_worker, + initargs=(self._tile_processors, + dataset, + self._data_store_factory, + self._metadata_store_factory, + shared_memory), + maxtasksperchild=self._max_concurrency, + childconcurrency=self._max_concurrency) as pool: serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in self._slicer.generate_tiles(dataset, granule_name)] # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that # a queue can't have more than 2**15-1 tasks. So, we have to batch it. - for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE): - await pool.map(_process_tile_in_worker, chunk) + for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE): + try: + await pool.map(_process_tile_in_worker, chunk) + except ProxyException: + pool.terminate() + # Give the shared memory manager some time to write the exception + # await asyncio.sleep(1) + raise pickle.loads(shared_memory.error) end = time.perf_counter() logger.info("Pipeline finished in {} seconds".format(end - start)) @staticmethod - def _chunk_list(items, chunk_size): + def _chunk_list(items, chunk_size: int): return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)] diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py index 14a44f5..8b69ad2 100644 --- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py +++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py @@ -21,6 +21,7 @@ import numpy as np import xarray as xr from nexusproto import DataTile_pb2 as nexusproto +from granule_ingester.exceptions import TileProcessingError from granule_ingester.processors.TileProcessor import TileProcessor @@ -31,20 +32,17 @@ class TileReadingProcessor(TileProcessor, ABC): self.latitude = latitude self.longitude = longitude - # Common optional properties - self.temp_dir = None - self.metadata = None - # self.temp_dir = self.environ['TEMP_DIR'] - # self.metadata = self.environ['META'] - def process(self, tile, dataset: xr.Dataset, *args, **kwargs): - dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec) + try: + dimensions_to_slices = self._convert_spec_to_slices(tile.summary.section_spec) - output_tile = nexusproto.NexusTile() - output_tile.CopyFrom(tile) - output_tile.summary.data_var_name = self.variable_to_read + output_tile = nexusproto.NexusTile() + output_tile.CopyFrom(tile) + output_tile.summary.data_var_name = self.variable_to_read - return self._generate_tile(dataset, dimensions_to_slices, output_tile) + return self._generate_tile(dataset, dimensions_to_slices, output_tile) + except Exception: + raise TileProcessingError("Could not generate tiles from the granule.") @abstractmethod def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile): diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py index c18bf8b..34e66c6 100644 --- a/granule_ingester/tests/pipeline/test_Pipeline.py +++ b/granule_ingester/tests/pipeline/test_Pipeline.py @@ -29,10 +29,11 @@ class TestPipeline(unittest.TestCase): pass relative_path = "../config_files/ingestion_config_testfile.yaml" - file_path = os.path.join(os.path.dirname(__file__), relative_path) - pipeline = Pipeline.from_file(config_path=str(file_path), - data_store_factory=MockDataStore, - metadata_store_factory=MockMetadataStore) + with open(os.path.join(os.path.dirname(__file__), relative_path)) as file: + yaml_str = file.read() + pipeline = Pipeline.from_string(config_str=yaml_str, + data_store_factory=MockDataStore, + metadata_store_factory=MockMetadataStore) self.assertEqual(pipeline._data_store_factory, MockDataStore) self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore)
