This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 3afb12c159ef309decf21669597b03969270db74 Author: Eamon Ford <[email protected]> AuthorDate: Tue Jul 14 14:08:05 2020 -0700 exc handling --- granule_ingester/granule_ingester/consumer/Consumer.py | 11 +++++++---- granule_ingester/granule_ingester/exceptions/Exceptions.py | 8 +++++--- granule_ingester/granule_ingester/exceptions/__init__.py | 1 - granule_ingester/granule_ingester/main.py | 2 +- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py index 59db4e8..d40b54c 100644 --- a/granule_ingester/granule_ingester/consumer/Consumer.py +++ b/granule_ingester/granule_ingester/consumer/Consumer.py @@ -17,8 +17,8 @@ import logging import aio_pika -from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQConnectionError, \ - RabbitMQFailedHealthCheckError +from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \ + RabbitMQFailedHealthCheckError, LostConnectionError from granule_ingester.healthcheck import HealthCheck from granule_ingester.pipeline import Pipeline @@ -83,6 +83,9 @@ class Consumer(HealthCheck): except PipelineRunningError as e: await message.reject() logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}") + except LostConnectionError: + # Let main() handle this + raise except Exception as e: await message.reject(requeue=True) logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}") @@ -98,7 +101,7 @@ class Consumer(HealthCheck): except aio_pika.exceptions.MessageProcessError: # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ # connection has died, and attempting to close the queue will only raise another exception. 
- raise RabbitMQConnectionError("Lost connection to RabbitMQ while processing a granule.") + raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.") except Exception as e: - queue_iter.close() + await queue_iter.close() raise e diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py index 7741ca6..0304b9b 100644 --- a/granule_ingester/granule_ingester/exceptions/Exceptions.py +++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py @@ -25,7 +25,11 @@ class SolrLostConnectionError(LostConnectionError): pass -class CassandraConnectionError(Exception): +class RabbitMQLostConnectionError(LostConnectionError): + pass + + +class CassandraLostConnectionError(LostConnectionError): pass @@ -43,5 +47,3 @@ class SolrFailedHealthCheckError(FailedHealthCheckError): class RabbitMQFailedHealthCheckError(FailedHealthCheckError): pass - - diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py index 838ccff..ea0969f 100644 --- a/granule_ingester/granule_ingester/exceptions/__init__.py +++ b/granule_ingester/granule_ingester/exceptions/__init__.py @@ -1,4 +1,3 @@ -from .Exceptions import CassandraConnectionError from .Exceptions import CassandraFailedHealthCheckError from .Exceptions import CassandraLostConnectionError from .Exceptions import FailedHealthCheckError diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py index 86bc569..bb9ad40 100644 --- a/granule_ingester/granule_ingester/main.py +++ b/granule_ingester/granule_ingester/main.py @@ -15,7 +15,7 @@ import argparse import asyncio -from granule_ingester.exceptions import FailedHealthCheckError +from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError import logging from functools import partial from typing import List
