This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch rabbitmq-fix in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 1f7aacf36015c1664d64dfbf916e385918e57d20 Author: Eamon Ford <[email protected]> AuthorDate: Thu Jul 9 17:15:03 2020 -0500 error handling --- .../granule_ingester/consumer/Consumer.py | 30 ++++++++++++++-------- .../granule_ingester/exceptions/Exceptions.py | 4 +++ .../granule_ingester/exceptions/__init__.py | 3 ++- granule_ingester/granule_ingester/main.py | 24 ++++++++++------- .../granule_ingester/pipeline/Pipeline.py | 4 +++ .../granule_ingester/writers/CassandraStore.py | 13 ++++++---- 6 files changed, 52 insertions(+), 26 deletions(-) diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py index fadfe75..439415f 100644 --- a/granule_ingester/granule_ingester/consumer/Consumer.py +++ b/granule_ingester/granule_ingester/consumer/Consumer.py @@ -16,8 +16,8 @@ import logging import aio_pika - -from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError +import sys +from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, ConnectionErrorRabbitMQ from granule_ingester.healthcheck import HealthCheck from granule_ingester.pipeline import Pipeline @@ -40,7 +40,7 @@ class Consumer(HealthCheck): self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username, password=rabbitmq_password, host=rabbitmq_host) - self._connection = None + self._connection: aio_pika.Connection = None async def health_check(self) -> bool: try: @@ -51,7 +51,7 @@ class Consumer(HealthCheck): logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string)) return False - async def _get_connection(self): + async def _get_connection(self) -> aio_pika.Connection: return await aio_pika.connect_robust(self._connection_string) async def __aenter__(self): @@ -74,23 +74,31 @@ class Consumer(HealthCheck): data_store_factory=data_store_factory, metadata_store_factory=metadata_store_factory) await pipeline.run() - message.ack() + await message.ack() except PipelineBuildingError as e: - message.reject() + await message.reject() logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped " f"from RabbitMQ. The exception was:\n{e}") except PipelineRunningError as e: - message.reject() + await message.reject() logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}") except Exception as e: - message.reject(requeue=True) + await message.reject(requeue=True) logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}") async def start_consuming(self): channel = await self._connection.channel() await channel.set_qos(prefetch_count=1) queue = await channel.declare_queue(self._rabbitmq_queue, durable=True) - - async with queue.iterator() as queue_iter: - async for message in queue_iter: + queue_iter = queue.iterator() + async for message in queue_iter: + try: await self._received_message(message, self._data_store_factory, self._metadata_store_factory) + except aio_pika.exceptions.MessageProcessError: + # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ + # connection has died, and attempting to close the queue will only raise another exception. + raise ConnectionErrorRabbitMQ("Lost connection to RabbitMQ while processing a granule.") + except Exception as e: + queue_iter.close() + raise e + diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py index 8c25532..0dda59e 100644 --- a/granule_ingester/granule_ingester/exceptions/Exceptions.py +++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py @@ -8,3 +8,7 @@ class PipelineRunningError(Exception): class TileProcessingError(Exception): pass + + +class ConnectionErrorRabbitMQ(Exception): + pass diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py index 71607c2..d0c2344 100644 --- a/granule_ingester/granule_ingester/exceptions/__init__.py +++ b/granule_ingester/granule_ingester/exceptions/__init__.py @@ -1,3 +1,4 @@ -from .Exceptions import TileProcessingError +from .Exceptions import ConnectionErrorRabbitMQ from .Exceptions import PipelineBuildingError from .Exceptions import PipelineRunningError +from .Exceptions import TileProcessingError diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py index 5a8fc2d..6e1a289 100644 --- a/granule_ingester/granule_ingester/main.py +++ b/granule_ingester/granule_ingester/main.py @@ -23,6 +23,7 @@ from granule_ingester.consumer import Consumer from granule_ingester.healthcheck import HealthCheck from granule_ingester.writers import CassandraStore from granule_ingester.writers import SolrStore +import sys def cassandra_factory(contact_points, port): @@ -104,15 +105,20 @@ async def main(): rabbitmq_queue=args.rabbitmq_queue, data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port), metadata_store_factory=partial(solr_factory, solr_host_and_port)) - if await run_health_checks( - [CassandraStore(cassandra_contact_points, cassandra_port), - SolrStore(solr_host_and_port), - consumer]): - async with consumer: - logger.info("All external dependencies have passed the health checks. Now listening to message queue.") - await consumer.start_consuming() - else: - logger.error("Quitting because not all dependencies passed the health checks.") + try: + if await run_health_checks( + [CassandraStore(cassandra_contact_points, cassandra_port), + SolrStore(solr_host_and_port), + consumer]): + async with consumer: + logger.info("All external dependencies have passed the health checks. Now listening to message queue.") + await consumer.start_consuming() + else: + logger.error("Quitting because not all dependencies passed the health checks.") + sys.exit(1) + except Exception as e: + logger.exception(f"Shutting down because of an unrecoverable error:\n{e}") + sys.exit(1) if __name__ == '__main__': diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py index c7b5d6a..e52d99f 100644 --- a/granule_ingester/granule_ingester/pipeline/Pipeline.py +++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -22,6 +22,7 @@ import aiomultiprocess import xarray as xr import yaml from aiomultiprocess.types import ProxyException +from cassandra.cluster import NoHostAvailable from nexusproto import DataTile_pb2 as nexusproto from yaml.scanner import ScannerError @@ -66,8 +67,11 @@ async def _process_tile_in_worker(serialized_input_tile: str): processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile) if processed_tile: + # try: await _worker_data_store.save_data(processed_tile) await _worker_metadata_store.save_metadata(processed_tile) + # except NoHostAvailable as e: + # logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra") def _recurse(processor_list: List[TileProcessor], diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py index 7a9f146..ffac63d 100644 --- a/granule_ingester/granule_ingester/writers/CassandraStore.py +++ b/granule_ingester/granule_ingester/writers/CassandraStore.py @@ -18,7 +18,7 @@ import asyncio import logging import uuid -from cassandra.cluster import Cluster, Session +from cassandra.cluster import Cluster, Session, NoHostAvailable from cassandra.cqlengine import columns from cassandra.cqlengine.models import Model from nexusproto.DataTile_pb2 import NexusTile, TileData @@ -65,10 +65,13 @@ class CassandraStore(DataStore): self._session.shutdown() async def save_data(self, tile: NexusTile) -> None: - tile_id = uuid.UUID(tile.summary.tile_id) - serialized_tile_data = TileData.SerializeToString(tile.tile) - prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)") - await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)]) + try: + tile_id = uuid.UUID(tile.summary.tile_id) + serialized_tile_data = TileData.SerializeToString(tile.tile) + prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)") + await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)]) + except NoHostAvailable as e: + logger.error(f"Cannot connect to Cassandra to save tile {tile.summary.tile_id}") @staticmethod async def _execute_query_async(session: Session, query, parameters=None):
