This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch ascending_latitudes
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit a9a0629405199049a822e938f46d43b575448629 Author: Eamon Ford <[email protected]> AuthorDate: Tue Aug 4 15:16:40 2020 -0700 SDAP-271 Cassandra authentication support (#11) --- granule_ingester/docker/entrypoint.sh | 2 ++ .../granule_ingester/exceptions/Exceptions.py | 41 ++++++++++++++++++++++ .../granule_ingester/exceptions/__init__.py | 11 ++++++ granule_ingester/granule_ingester/main.py | 30 ++++++++++++---- .../granule_ingester/writers/CassandraStore.py | 39 ++++++++++++++------ 5 files changed, 106 insertions(+), 17 deletions(-) diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh index e6f7262..b703ee3 100644 --- a/granule_ingester/docker/entrypoint.sh +++ b/granule_ingester/docker/entrypoint.sh @@ -7,4 +7,6 @@ python /sdap/granule_ingester/main.py \ $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \ $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \ $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \ + $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \ + $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \ $([[ ! 
-z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT) diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py new file mode 100644 index 0000000..c648b99 --- /dev/null +++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py @@ -0,0 +1,41 @@ +class PipelineBuildingError(Exception): + pass + + +class PipelineRunningError(Exception): + pass + + +class TileProcessingError(Exception): + pass + + +class LostConnectionError(Exception): + pass + + +class RabbitMQLostConnectionError(LostConnectionError): + pass + + +class CassandraLostConnectionError(LostConnectionError): + pass + +class SolrLostConnectionError(LostConnectionError): + pass + + +class FailedHealthCheckError(Exception): + pass + + +class CassandraFailedHealthCheckError(FailedHealthCheckError): + pass + + +class SolrFailedHealthCheckError(FailedHealthCheckError): + pass + + +class RabbitMQFailedHealthCheckError(FailedHealthCheckError): + pass diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py new file mode 100644 index 0000000..ea0969f --- /dev/null +++ b/granule_ingester/granule_ingester/exceptions/__init__.py @@ -0,0 +1,11 @@ +from .Exceptions import CassandraFailedHealthCheckError +from .Exceptions import CassandraLostConnectionError +from .Exceptions import FailedHealthCheckError +from .Exceptions import LostConnectionError +from .Exceptions import PipelineBuildingError +from .Exceptions import PipelineRunningError +from .Exceptions import RabbitMQFailedHealthCheckError +from .Exceptions import RabbitMQLostConnectionError +from .Exceptions import SolrFailedHealthCheckError +from .Exceptions import SolrLostConnectionError +from .Exceptions import TileProcessingError diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py index 5a8fc2d..9010e33 100644 --- 
a/granule_ingester/granule_ingester/main.py +++ b/granule_ingester/granule_ingester/main.py @@ -25,8 +25,8 @@ from granule_ingester.writers import CassandraStore from granule_ingester.writers import SolrStore -def cassandra_factory(contact_points, port): - store = CassandraStore(contact_points, port) +def cassandra_factory(contact_points, port, username, password): + store = CassandraStore(contact_points=contact_points, port=port, username=username, password=password) store.connect() return store @@ -72,6 +72,14 @@ async def main(): default=9042, metavar="PORT", help='Cassandra port. (Default: 9042)') + parser.add_argument('--cassandra_username', + metavar="USERNAME", + default=None, + help='Cassandra username. Optional.') + parser.add_argument('--cassandra_password', + metavar="PASSWORD", + default=None, + help='Cassandra password. Optional.') parser.add_argument('--solr_host_and_port', default='http://localhost:8983', metavar='HOST:PORT', @@ -94,6 +102,8 @@ async def main(): config_values_str = "\n".join(["{} = {}".format(arg, getattr(args, arg)) for arg in vars(args)]) logger.info("Using configuration values:\n{}".format(config_values_str)) + cassandra_username = args.cassandra_username + cassandra_password = args.cassandra_password cassandra_contact_points = args.cassandra_contact_points cassandra_port = args.cassandra_port solr_host_and_port = args.solr_host_and_port @@ -102,12 +112,18 @@ async def main(): rabbitmq_username=args.rabbitmq_username, rabbitmq_password=args.rabbitmq_password, rabbitmq_queue=args.rabbitmq_queue, - data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port), + data_store_factory=partial(cassandra_factory, + cassandra_contact_points, + cassandra_port, + cassandra_username, + cassandra_password), metadata_store_factory=partial(solr_factory, solr_host_and_port)) - if await run_health_checks( - [CassandraStore(cassandra_contact_points, cassandra_port), - SolrStore(solr_host_and_port), - consumer]): + if await 
run_health_checks([CassandraStore(cassandra_contact_points, + cassandra_port, + cassandra_username, + cassandra_password), + SolrStore(solr_host_and_port), + consumer]): async with consumer: logger.info("All external dependencies have passed the health checks. Now listening to message queue.") await consumer.start_consuming() diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py index 7a9f146..cb5232b 100644 --- a/granule_ingester/granule_ingester/writers/CassandraStore.py +++ b/granule_ingester/granule_ingester/writers/CassandraStore.py @@ -18,11 +18,14 @@ import asyncio import logging import uuid -from cassandra.cluster import Cluster, Session +from cassandra.auth import PlainTextAuthProvider +from cassandra.cluster import Cluster, Session, NoHostAvailable from cassandra.cqlengine import columns from cassandra.cqlengine.models import Model +from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy from nexusproto.DataTile_pb2 import NexusTile, TileData +from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError from granule_ingester.writers.DataStore import DataStore logging.getLogger('cassandra').setLevel(logging.INFO) @@ -37,8 +40,10 @@ class TileModel(Model): class CassandraStore(DataStore): - def __init__(self, contact_points=None, port=9042): + def __init__(self, contact_points=None, port=9042, username=None, password=None): self._contact_points = contact_points + self._username = username + self._password = password self._port = port self._session = None @@ -47,12 +52,22 @@ class CassandraStore(DataStore): session = self._get_session() session.shutdown() return True - except: - logger.error("Cannot connect to Cassandra!") - return False + except Exception: + raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!") def _get_session(self) -> Session: - cluster = Cluster(contact_points=self._contact_points, 
port=self._port) + + if self._username and self._password: + auth_provider = PlainTextAuthProvider(username=self._username, password=self._password) + else: + auth_provider = None + + cluster = Cluster(contact_points=self._contact_points, + port=self._port, + # load_balancing_policy= + reconnection_policy=ConstantReconnectionPolicy(delay=5.0), + default_retry_policy=RetryPolicy(), + auth_provider=auth_provider) session = cluster.connect() session.set_keyspace('nexustiles') return session @@ -65,10 +80,14 @@ class CassandraStore(DataStore): self._session.shutdown() async def save_data(self, tile: NexusTile) -> None: - tile_id = uuid.UUID(tile.summary.tile_id) - serialized_tile_data = TileData.SerializeToString(tile.tile) - prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)") - await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)]) + try: + tile_id = uuid.UUID(tile.summary.tile_id) + serialized_tile_data = TileData.SerializeToString(tile.tile) + prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)") + await self._execute_query_async(self._session, prepared_query, + [tile_id, bytearray(serialized_tile_data)]) + except NoHostAvailable: + raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.") @staticmethod async def _execute_query_async(session: Session, query, parameters=None):
