This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 385f5b83a169bf723608512fb98045b53f8dd5f2 Author: Eamon Ford <[email protected]> AuthorDate: Tue Jul 14 17:36:08 2020 -0700 error handling --- granule_ingester/granule_ingester/main.py | 8 ++--- .../granule_ingester/pipeline/Pipeline.py | 35 +++++++++++++--------- .../granule_ingester/writers/CassandraStore.py | 2 +- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py index b9d475b..45877c2 100644 --- a/granule_ingester/granule_ingester/main.py +++ b/granule_ingester/granule_ingester/main.py @@ -46,7 +46,7 @@ async def run_health_checks(dependencies: List[HealthCheck]): return True -async def main(): +async def main(loop): parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process ' 'and ingest a granule for each message that comes through.') parser.add_argument('--rabbitmq_host', @@ -115,14 +115,14 @@ async def main(): await consumer.start_consuming() except FailedHealthCheckError as e: logger.error(f"Quitting because not all dependencies passed the health checks: {e}") - sys.exit(1) except LostConnectionError as e: logger.error(f"{e} Any messages that were being processed have been re-queued. 
Quitting.") - sys.exit(1) except Exception as e: logger.exception(f"Shutting down because of an unrecoverable error:\n{e}") + finally: sys.exit(1) if __name__ == '__main__': - asyncio.run(main()) + loop = asyncio.get_event_loop() + loop.run_until_complete(main(loop)) diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py index 14dc032..f1aa021 100644 --- a/granule_ingester/granule_ingester/pipeline/Pipeline.py +++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -19,15 +19,15 @@ import time from multiprocessing import Manager from typing import List -import aiomultiprocess import xarray as xr import yaml +from aiomultiprocess import Pool from aiomultiprocess.types import ProxyException from nexusproto import DataTile_pb2 as nexusproto from tblib import pickling_support from yaml.scanner import ScannerError -from granule_ingester.exceptions import PipelineBuildingError +from granule_ingester.exceptions import PipelineBuildingError, LostConnectionError from granule_ingester.granule_loaders import GranuleLoader from granule_ingester.pipeline.Modules import modules as processor_module_mappings from granule_ingester.processors.TileProcessor import TileProcessor @@ -36,7 +36,9 @@ from granule_ingester.writers import DataStore, MetadataStore logger = logging.getLogger(__name__) -MAX_QUEUE_SIZE = 2 ** 15 - 1 +# The aiomultiprocessing library has a bug where it never closes out the pool if there are more than a certain +# number of items to process. The exact number is unknown, but 2**8-1 is safe. 
+MAX_CHUNK_SIZE = 2 ** 8 - 1 _worker_data_store: DataStore = None _worker_metadata_store: MetadataStore = None @@ -97,9 +99,12 @@ class Pipeline: self._data_store_factory = data_store_factory self._metadata_store_factory = metadata_store_factory - # Create a SyncManager Namespace so that we can to communicate exceptions from the + # Create a SyncManager so that we can to communicate exceptions from the # worker processes back to the main process. - self._shared_memory = Manager().Namespace() + self._manager = Manager() + + def __del__(self): + self._manager.shutdown() @classmethod def from_string(cls, config_str: str, data_store_factory, metadata_store_factory): @@ -158,26 +163,28 @@ class Pipeline: async def run(self): async with self._granule_loader as (dataset, granule_name): start = time.perf_counter() - async with aiomultiprocess.Pool(initializer=_init_worker, - initargs=(self._tile_processors, - dataset, - self._data_store_factory, - self._metadata_store_factory, - self._shared_memory)) as pool: + + shared_memory = self._manager.Namespace() + async with Pool(initializer=_init_worker, + initargs=(self._tile_processors, + dataset, + self._data_store_factory, + self._metadata_store_factory, + shared_memory)) as pool: serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in self._slicer.generate_tiles(dataset, granule_name)] # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that # a queue can't have more than 2**15-1 tasks. So, we have to batch it. 
- for chunk in self._chunk_list(serialized_tiles, MAX_QUEUE_SIZE): + for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE): try: await pool.map(_process_tile_in_worker, chunk) except ProxyException: pool.terminate() - raise pickle.loads(self._shared_memory.error) + raise pickle.loads(shared_memory.error) end = time.perf_counter() logger.info("Pipeline finished in {} seconds".format(end - start)) @staticmethod - def _chunk_list(items, chunk_size): + def _chunk_list(items, chunk_size: int): return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)] diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py index 791911e..6b2cf32 100644 --- a/granule_ingester/granule_ingester/writers/CassandraStore.py +++ b/granule_ingester/granule_ingester/writers/CassandraStore.py @@ -76,7 +76,7 @@ class CassandraStore(DataStore): prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)") await self._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)]) - except Exception: + except NoHostAvailable: raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.") @staticmethod
