This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 6b2e6a0b005e1295cbafa74d553dfdd09ca9550d
Author: Eamon Ford <[email protected]>
AuthorDate: Thu Jul 9 17:15:03 2020 -0500

    error handling
---
 .../granule_ingester/consumer/Consumer.py          | 30 ++++++++++++++--------
 granule_ingester/granule_ingester/main.py          |  1 +
 .../granule_ingester/pipeline/Pipeline.py          |  4 +++
 3 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index fadfe75..439415f 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -16,8 +16,8 @@
 import logging
 
 import aio_pika
-
-from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError
+import sys
+from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, ConnectionErrorRabbitMQ
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.pipeline import Pipeline
 
@@ -40,7 +40,7 @@ class Consumer(HealthCheck):
         self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
                                                                                password=rabbitmq_password,
                                                                                host=rabbitmq_host)
-        self._connection = None
+        self._connection: aio_pika.Connection = None
 
     async def health_check(self) -> bool:
         try:
@@ -51,7 +51,7 @@ class Consumer(HealthCheck):
             logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string))
             return False
 
-    async def _get_connection(self):
+    async def _get_connection(self) -> aio_pika.Connection:
         return await aio_pika.connect_robust(self._connection_string)
 
     async def __aenter__(self):
@@ -74,23 +74,31 @@ class Consumer(HealthCheck):
                                             data_store_factory=data_store_factory,
                                             metadata_store_factory=metadata_store_factory)
             await pipeline.run()
-            message.ack()
+            await message.ack()
         except PipelineBuildingError as e:
-            message.reject()
+            await message.reject()
             logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
                              f"from RabbitMQ. The exception was:\n{e}")
         except PipelineRunningError as e:
-            message.reject()
+            await message.reject()
             logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
         except Exception as e:
-            message.reject(requeue=True)
+            await message.reject(requeue=True)
             logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")
 
     async def start_consuming(self):
         channel = await self._connection.channel()
         await channel.set_qos(prefetch_count=1)
         queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
-
-        async with queue.iterator() as queue_iter:
-            async for message in queue_iter:
+        queue_iter = queue.iterator()
+        async for message in queue_iter:
+            try:
                 await self._received_message(message, self._data_store_factory, self._metadata_store_factory)
+            except aio_pika.exceptions.MessageProcessError:
+                # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
+                # connection has died, and attempting to close the queue will only raise another exception.
+                raise ConnectionErrorRabbitMQ("Lost connection to RabbitMQ while processing a granule.")
+            except Exception as e:
+                queue_iter.close()
+                raise e
+
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 9010e33..a7a66c6 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -23,6 +23,7 @@ from granule_ingester.consumer import Consumer
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.writers import CassandraStore
 from granule_ingester.writers import SolrStore
+import sys
 
 
 def cassandra_factory(contact_points, port, username, password):
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index c7b5d6a..e52d99f 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -22,6 +22,7 @@ import aiomultiprocess
 import xarray as xr
 import yaml
 from aiomultiprocess.types import ProxyException
+from cassandra.cluster import NoHostAvailable
 from nexusproto import DataTile_pb2 as nexusproto
 from yaml.scanner import ScannerError
 
@@ -66,8 +67,11 @@ async def _process_tile_in_worker(serialized_input_tile: str):
     processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile)
 
     if processed_tile:
+        # try:
         await _worker_data_store.save_data(processed_tile)
         await _worker_metadata_store.save_metadata(processed_tile)
+        # except NoHostAvailable as e:
+        #     logger.error(f"Could not save tile {processed_tile.tile.tile_id} to Cassandra")
 
 
 def _recurse(processor_list: List[TileProcessor],

Reply via email to