This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch rabbitmq-fix in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit bddc7c541f85fa4ee69adb0be3a003c87c465aeb Author: Eamon Ford <[email protected]> AuthorDate: Fri Jul 10 10:14:05 2020 -0500 the healthchecks now raise exceptions if they rail --- .../granule_ingester/consumer/Consumer.py | 12 ++++++------ .../granule_ingester/exceptions/Exceptions.py | 18 +++++++++++++++++- .../granule_ingester/exceptions/__init__.py | 6 +++++- granule_ingester/granule_ingester/main.py | 20 ++++++++++---------- .../granule_ingester/writers/CassandraStore.py | 9 +++++---- .../granule_ingester/writers/DataStore.py | 1 + .../granule_ingester/writers/SolrStore.py | 4 ++-- 7 files changed, 46 insertions(+), 24 deletions(-) diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py index 439415f..59db4e8 100644 --- a/granule_ingester/granule_ingester/consumer/Consumer.py +++ b/granule_ingester/granule_ingester/consumer/Consumer.py @@ -16,8 +16,9 @@ import logging import aio_pika -import sys -from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, ConnectionErrorRabbitMQ + +from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQConnectionError, \ + RabbitMQFailedHealthCheckError from granule_ingester.healthcheck import HealthCheck from granule_ingester.pipeline import Pipeline @@ -48,8 +49,8 @@ class Consumer(HealthCheck): await connection.close() return True except: - logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string)) - return False + raise RabbitMQFailedHealthCheckError(f"Cannot connect to RabbitMQ! " + f"Connection string was {self._connection_string}") async def _get_connection(self) -> aio_pika.Connection: return await aio_pika.connect_robust(self._connection_string) @@ -97,8 +98,7 @@ class Consumer(HealthCheck): except aio_pika.exceptions.MessageProcessError: # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ # connection has died, and attempting to close the queue will only raise another exception. - raise ConnectionErrorRabbitMQ("Lost connection to RabbitMQ while processing a granule.") + raise RabbitMQConnectionError("Lost connection to RabbitMQ while processing a granule.") except Exception as e: queue_iter.close() raise e - diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py index 0dda59e..6e7d89a 100644 --- a/granule_ingester/granule_ingester/exceptions/Exceptions.py +++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py @@ -10,5 +10,21 @@ class TileProcessingError(Exception): pass -class ConnectionErrorRabbitMQ(Exception): +class RabbitMQConnectionError(Exception): + pass + + +class FailedHealthCheckError(Exception): + pass + + +class CassandraFailedHealthCheckError(FailedHealthCheckError): + pass + + +class SolrFailedHealthCheckError(FailedHealthCheckError): + pass + + +class RabbitMQFailedHealthCheckError(FailedHealthCheckError): pass diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py index d0c2344..2ba1b4a 100644 --- a/granule_ingester/granule_ingester/exceptions/__init__.py +++ b/granule_ingester/granule_ingester/exceptions/__init__.py @@ -1,4 +1,8 @@ -from .Exceptions import ConnectionErrorRabbitMQ +from .Exceptions import CassandraFailedHealthCheckError +from .Exceptions import FailedHealthCheckError from .Exceptions import PipelineBuildingError from .Exceptions import PipelineRunningError +from .Exceptions import RabbitMQConnectionError +from .Exceptions import RabbitMQFailedHealthCheckError +from .Exceptions import SolrFailedHealthCheckError from .Exceptions import TileProcessingError diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py index 6e1a289..2754c7f 100644 --- a/granule_ingester/granule_ingester/main.py +++ b/granule_ingester/granule_ingester/main.py @@ -15,6 +15,7 @@ import argparse import asyncio +from granule_ingester.exceptions import FailedHealthCheckError import logging from functools import partial from typing import List @@ -106,16 +107,15 @@ async def main(): data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port), metadata_store_factory=partial(solr_factory, solr_host_and_port)) try: - if await run_health_checks( - [CassandraStore(cassandra_contact_points, cassandra_port), - SolrStore(solr_host_and_port), - consumer]): - async with consumer: - logger.info("All external dependencies have passed the health checks. Now listening to message queue.") - await consumer.start_consuming() - else: - logger.error("Quitting because not all dependencies passed the health checks.") - sys.exit(1) + await run_health_checks([CassandraStore(cassandra_contact_points, cassandra_port), + SolrStore(solr_host_and_port), + consumer]) + async with consumer: + logger.info("All external dependencies have passed the health checks. Now listening to message queue.") + await consumer.start_consuming() + except FailedHealthCheckError as e: + logger.error(f"Quitting because not all dependencies passed the health checks: {e}") + sys.exit(1) except Exception as e: logger.exception(f"Shutting down because of an unrecoverable error:\n{e}") sys.exit(1) diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py index ffac63d..530871d 100644 --- a/granule_ingester/granule_ingester/writers/CassandraStore.py +++ b/granule_ingester/granule_ingester/writers/CassandraStore.py @@ -23,6 +23,7 @@ from cassandra.cqlengine import columns from cassandra.cqlengine.models import Model from nexusproto.DataTile_pb2 import NexusTile, TileData +from granule_ingester.exceptions import CassandraFailedHealthCheckError from granule_ingester.writers.DataStore import DataStore logging.getLogger('cassandra').setLevel(logging.INFO) @@ -47,9 +48,8 @@ class CassandraStore(DataStore): session = self._get_session() session.shutdown() return True - except: - logger.error("Cannot connect to Cassandra!") - return False + except Exception: + raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!") def _get_session(self) -> Session: cluster = Cluster(contact_points=self._contact_points, port=self._port) @@ -69,7 +69,8 @@ class CassandraStore(DataStore): tile_id = uuid.UUID(tile.summary.tile_id) serialized_tile_data = TileData.SerializeToString(tile.tile) prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)") - await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)]) + await type(self)._execute_query_async(self._session, prepared_query, + [tile_id, bytearray(serialized_tile_data)]) except NoHostAvailable as e: logger.error(f"Cannot connect to Cassandra to save tile {tile.summary.tile_id}") diff --git a/granule_ingester/granule_ingester/writers/DataStore.py b/granule_ingester/granule_ingester/writers/DataStore.py index 889d41e..a64399b 100644 --- a/granule_ingester/granule_ingester/writers/DataStore.py +++ b/granule_ingester/granule_ingester/writers/DataStore.py @@ -7,6 +7,7 @@ from granule_ingester.healthcheck import HealthCheck class DataStore(HealthCheck, ABC): + @abstractmethod def save_data(self, nexus_tile: nexusproto.NexusTile) -> None: pass diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py index 9d6a7f0..6baad28 100644 --- a/granule_ingester/granule_ingester/writers/SolrStore.py +++ b/granule_ingester/granule_ingester/writers/SolrStore.py @@ -26,7 +26,7 @@ from nexusproto.DataTile_pb2 import * from tenacity import * from granule_ingester.writers.MetadataStore import MetadataStore - +from granule_ingester.exceptions import SolrFailedHealthCheckError logger = logging.getLogger(__name__) @@ -56,7 +56,7 @@ class SolrStore(MetadataStore): else: logger.error("Solr health check returned status {}.".format(response.status)) except aiohttp.ClientConnectionError as e: - logger.error("Cannot connect to Solr!") + raise SolrFailedHealthCheckError("Cannot connect to to Solr!") return False
