This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 3afb12c159ef309decf21669597b03969270db74
Author: Eamon Ford <[email protected]>
AuthorDate: Tue Jul 14 14:08:05 2020 -0700

    exc handling
---
 granule_ingester/granule_ingester/consumer/Consumer.py     | 11 +++++++----
 granule_ingester/granule_ingester/exceptions/Exceptions.py |  8 +++++---
 granule_ingester/granule_ingester/exceptions/__init__.py   |  1 -
 granule_ingester/granule_ingester/main.py                  |  2 +-
 4 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index 59db4e8..d40b54c 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -17,8 +17,8 @@ import logging
 
 import aio_pika
 
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQConnectionError, \
-    RabbitMQFailedHealthCheckError
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \
+    RabbitMQFailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
 
@@ -83,6 +83,9 @@ class Consumer(HealthCheck):
         except PipelineRunningError as e:
             await message.reject()
             logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
+        except LostConnectionError:
+            # Let main() handle this
+            raise
         except Exception as e:
             await message.reject(requeue=True)
             logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
@@ -98,7 +101,7 @@ class Consumer(HealthCheck):
             except aio_pika.exceptions.MessageProcessError:
                 # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
                 # connection has died, and attempting to close the queue will only raise another exception.
-                raise RabbitMQConnectionError("Lost connection to RabbitMQ while processing a granule.")
+                raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
             except Exception as e:
-                queue_iter.close()
+                await queue_iter.close()
                 raise e
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index 7741ca6..0304b9b 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -25,7 +25,11 @@ class SolrLostConnectionError(LostConnectionError):
     pass
 
 
-class CassandraConnectionError(Exception):
+class RabbitMQLostConnectionError(LostConnectionError):
+    pass
+
+
+class CassandraLostConnectionError(LostConnectionError):
     pass
 
 
@@ -43,5 +47,3 @@ class SolrFailedHealthCheckError(FailedHealthCheckError):
 
 class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
     pass
-
-
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index 838ccff..ea0969f 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -1,4 +1,3 @@
-from .Exceptions import CassandraConnectionError
 from .Exceptions import CassandraFailedHealthCheckError
 from .Exceptions import CassandraLostConnectionError
 from .Exceptions import FailedHealthCheckError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 86bc569..bb9ad40 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,7 +15,7 @@
 
 import argparse
 import asyncio
-from granule_ingester.exceptions import FailedHealthCheckError
+from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 import logging
 from functools import partial
 from typing import List

Reply via email to