This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch rabbitmq-fix in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 41755a5f1663085094c9d55de8c183748b65bb63 Author: Eamon Ford <[email protected]> AuthorDate: Tue Jul 14 14:08:05 2020 -0700 exc handling --- granule_ingester/granule_ingester/consumer/Consumer.py | 11 +++++++---- granule_ingester/granule_ingester/exceptions/Exceptions.py | 10 ++++++---- granule_ingester/granule_ingester/exceptions/__init__.py | 5 +++-- granule_ingester/granule_ingester/main.py | 5 ++++- granule_ingester/granule_ingester/writers/CassandraStore.py | 4 ++-- 5 files changed, 22 insertions(+), 13 deletions(-) diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py index 59db4e8..d40b54c 100644 --- a/granule_ingester/granule_ingester/consumer/Consumer.py +++ b/granule_ingester/granule_ingester/consumer/Consumer.py @@ -17,8 +17,8 @@ import logging import aio_pika -from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQConnectionError, \ - RabbitMQFailedHealthCheckError +from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \ + RabbitMQFailedHealthCheckError, LostConnectionError from granule_ingester.healthcheck import HealthCheck from granule_ingester.pipeline import Pipeline @@ -83,6 +83,9 @@ class Consumer(HealthCheck): except PipelineRunningError as e: await message.reject() logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}") + except LostConnectionError: + # Let main() handle this + raise except Exception as e: await message.reject(requeue=True) logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}") @@ -98,7 +101,7 @@ class Consumer(HealthCheck): except aio_pika.exceptions.MessageProcessError: # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ # connection has died, and attempting to close the queue will only raise another exception. - raise RabbitMQConnectionError("Lost connection to RabbitMQ while processing a granule.") + raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.") except Exception as e: - queue_iter.close() + await queue_iter.close() raise e diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py index f43bc2f..ca60608 100644 --- a/granule_ingester/granule_ingester/exceptions/Exceptions.py +++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py @@ -10,11 +10,15 @@ class TileProcessingError(Exception): pass -class RabbitMQConnectionError(Exception): +class LostConnectionError(Exception): pass -class CassandraConnectionError(Exception): +class RabbitMQLostConnectionError(LostConnectionError): + pass + + +class CassandraLostConnectionError(LostConnectionError): pass @@ -32,5 +36,3 @@ class SolrFailedHealthCheckError(FailedHealthCheckError): class RabbitMQFailedHealthCheckError(FailedHealthCheckError): pass - - diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py index 400c9bf..31cc5b8 100644 --- a/granule_ingester/granule_ingester/exceptions/__init__.py +++ b/granule_ingester/granule_ingester/exceptions/__init__.py @@ -1,9 +1,10 @@ -from .Exceptions import CassandraConnectionError from .Exceptions import CassandraFailedHealthCheckError +from .Exceptions import CassandraLostConnectionError from .Exceptions import FailedHealthCheckError +from .Exceptions import LostConnectionError from .Exceptions import PipelineBuildingError from .Exceptions import PipelineRunningError -from .Exceptions import RabbitMQConnectionError from .Exceptions import RabbitMQFailedHealthCheckError +from .Exceptions import RabbitMQLostConnectionError from .Exceptions import SolrFailedHealthCheckError from .Exceptions import TileProcessingError diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py index 2754c7f..b9d475b 100644 --- a/granule_ingester/granule_ingester/main.py +++ b/granule_ingester/granule_ingester/main.py @@ -15,7 +15,7 @@ import argparse import asyncio -from granule_ingester.exceptions import FailedHealthCheckError +from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError import logging from functools import partial from typing import List @@ -116,6 +116,9 @@ async def main(): except FailedHealthCheckError as e: logger.error(f"Quitting because not all dependencies passed the health checks: {e}") sys.exit(1) + except LostConnectionError as e: + logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.") + sys.exit(1) except Exception as e: logger.exception(f"Shutting down because of an unrecoverable error:\n{e}") sys.exit(1) diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py index fbb5a7d..791911e 100644 --- a/granule_ingester/granule_ingester/writers/CassandraStore.py +++ b/granule_ingester/granule_ingester/writers/CassandraStore.py @@ -24,7 +24,7 @@ from cassandra.cqlengine.models import Model from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy from nexusproto.DataTile_pb2 import NexusTile, TileData -from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraConnectionError +from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError from granule_ingester.writers.DataStore import DataStore logging.getLogger('cassandra').setLevel(logging.INFO) @@ -77,7 +77,7 @@ class CassandraStore(DataStore): await self._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)]) except Exception: - raise CassandraConnectionError(f"Cannot connect to Cassandra to save tile.") + raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.") @staticmethod async def _execute_query_async(session: Session, query, parameters=None):
