This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch rabbitmq-fix in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 6b2e6a0b005e1295cbafa74d553dfdd09ca9550d Author: Eamon Ford <[email protected]> AuthorDate: Thu Jul 9 17:15:03 2020 -0500 error handling --- .../granule_ingester/consumer/Consumer.py | 30 ++++++++++++++-------- granule_ingester/granule_ingester/main.py | 1 + .../granule_ingester/pipeline/Pipeline.py | 4 +++ 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py index fadfe75..439415f 100644 --- a/granule_ingester/granule_ingester/consumer/Consumer.py +++ b/granule_ingester/granule_ingester/consumer/Consumer.py @@ -16,8 +16,8 @@ import logging import aio_pika - -from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError +import sys +from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, ConnectionErrorRabbitMQ from granule_ingester.healthcheck import HealthCheck from granule_ingester.pipeline import Pipeline @@ -40,7 +40,7 @@ class Consumer(HealthCheck): self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username, password=rabbitmq_password, host=rabbitmq_host) - self._connection = None + self._connection: aio_pika.Connection = None async def health_check(self) -> bool: try: @@ -51,7 +51,7 @@ class Consumer(HealthCheck): logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string)) return False - async def _get_connection(self): + async def _get_connection(self) -> aio_pika.Connection: return await aio_pika.connect_robust(self._connection_string) async def __aenter__(self): @@ -74,23 +74,31 @@ class Consumer(HealthCheck): data_store_factory=data_store_factory, metadata_store_factory=metadata_store_factory) await pipeline.run() - message.ack() + await message.ack() except PipelineBuildingError as e: - message.reject() + await message.reject() logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped " f"from RabbitMQ. The exception was:\n{e}") except PipelineRunningError as e: - message.reject() + await message.reject() logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}") except Exception as e: - message.reject(requeue=True) + await message.reject(requeue=True) logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}") async def start_consuming(self): channel = await self._connection.channel() await channel.set_qos(prefetch_count=1) queue = await channel.declare_queue(self._rabbitmq_queue, durable=True) - - async with queue.iterator() as queue_iter: - async for message in queue_iter: + queue_iter = queue.iterator() + async for message in queue_iter: + try: await self._received_message(message, self._data_store_factory, self._metadata_store_factory) + except aio_pika.exceptions.MessageProcessError: + # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ + # connection has died, and attempting to close the queue will only raise another exception. + raise ConnectionErrorRabbitMQ("Lost connection to RabbitMQ while processing a granule.") + except Exception as e: + queue_iter.close() + raise e + diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py index 9010e33..a7a66c6 100644 --- a/granule_ingester/granule_ingester/main.py +++ b/granule_ingester/granule_ingester/main.py @@ -23,6 +23,7 @@ from granule_ingester.consumer import Consumer from granule_ingester.healthcheck import HealthCheck from granule_ingester.writers import CassandraStore from granule_ingester.writers import SolrStore +import sys def cassandra_factory(contact_points, port, username, password): diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py index c7b5d6a..e52d99f 100644 --- a/granule_ingester/granule_ingester/pipeline/Pipeline.py +++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -22,6 +22,7 @@ import aiomultiprocess import xarray as xr import yaml from aiomultiprocess.types import ProxyException +from cassandra.cluster import NoHostAvailable from nexusproto import DataTile_pb2 as nexusproto from yaml.scanner import ScannerError @@ -66,8 +67,11 @@ async def _process_tile_in_worker(serialized_input_tile: str): processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile) if processed_tile: + # try: await _worker_data_store.save_data(processed_tile) await _worker_metadata_store.save_metadata(processed_tile) + # except NoHostAvailable as e: + # logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra") def _recurse(processor_list: List[TileProcessor],
